winlin

Merge branch 'srs.master'

@@ -532,78 +532,121 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket) @@ -532,78 +532,121 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket)
532 return ret; 532 return ret;
533 } 533 }
534 534
535 -int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet) 535 +int SrsProtocol::do_send_message(SrsMessage* msg)
536 { 536 {
537 int ret = ERROR_SUCCESS; 537 int ret = ERROR_SUCCESS;
538 538
539 - // always not NULL msg.  
540 - srs_assert(msg); 539 + // ignore empty message.
  540 + if (!msg->payload || msg->size <= 0) {
  541 + srs_info("ignore empty message.");
  542 + return ret;
  543 + }
541 544
542 // we donot use the complex basic header, 545 // we donot use the complex basic header,
543 // ensure the basic header is 1bytes. 546 // ensure the basic header is 1bytes.
544 if (msg->header.perfer_cid < 2) { 547 if (msg->header.perfer_cid < 2) {
545 - srs_warn("change the chunk_id=%d to default=%d", msg->header.perfer_cid, RTMP_CID_ProtocolControl); 548 + srs_warn("change the chunk_id=%d to default=%d",
  549 + msg->header.perfer_cid, RTMP_CID_ProtocolControl);
546 msg->header.perfer_cid = RTMP_CID_ProtocolControl; 550 msg->header.perfer_cid = RTMP_CID_ProtocolControl;
547 } 551 }
548 552
549 // p set to current write position, 553 // p set to current write position,
550 // it's ok when payload is NULL and size is 0. 554 // it's ok when payload is NULL and size is 0.
551 char* p = msg->payload; 555 char* p = msg->payload;
  556 + char* pend = msg->payload + msg->size;
  557 +
  558 + // always write the header event payload is empty.
  559 + while (p < pend) {
  560 + // always has header
  561 + int nbh = 0;
  562 + char* header = NULL;
  563 + generate_chunk_header(&msg->header, p == msg->payload, &nbh, &header);
  564 + srs_assert(nbh > 0);
  565 +
  566 + // header iov
  567 + iov[0].iov_base = header;
  568 + iov[0].iov_len = nbh;
  569 +
  570 + // payload iov
  571 + int payload_size = pend - p;
  572 + if (payload_size > out_chunk_size) {
  573 + payload_size = out_chunk_size;
  574 + }
  575 + iov[1].iov_base = p;
  576 + iov[1].iov_len = payload_size;
  577 +
  578 + // send by writev
  579 + // sendout header and payload by writev.
  580 + // decrease the sys invoke count to get higher performance.
  581 + if ((ret = skt->writev(iov, 2, NULL)) != ERROR_SUCCESS) {
  582 + srs_error("send with writev failed. ret=%d", ret);
  583 + return ret;
  584 + }
  585 +
  586 + // consume sendout bytes.
  587 + p += payload_size;
  588 + }
  589 +
  590 + return ret;
  591 +}
  592 +
  593 +void SrsProtocol::generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
  594 +{
  595 + char* cache = out_c0_cache;
  596 +
552 // to directly set the field. 597 // to directly set the field.
553 char* pp = NULL; 598 char* pp = NULL;
554 599
555 - // always write the header event payload is empty.  
556 - do {  
557 // generate the header. 600 // generate the header.
558 - char* pheader = out_header_cache; 601 + char* p = cache;
559 602
560 - if (p == msg->payload) { 603 + if (c0) {
561 // write new chunk stream header, fmt is 0 604 // write new chunk stream header, fmt is 0
562 - *pheader++ = 0x00 | (msg->header.perfer_cid & 0x3F); 605 + *p++ = 0x00 | (mh->perfer_cid & 0x3F);
563 606
564 // chunk message header, 11 bytes 607 // chunk message header, 11 bytes
565 // timestamp, 3bytes, big-endian 608 // timestamp, 3bytes, big-endian
566 - u_int32_t timestamp = (u_int32_t)msg->header.timestamp; 609 + u_int32_t timestamp = (u_int32_t)mh->timestamp;
567 if (timestamp < RTMP_EXTENDED_TIMESTAMP) { 610 if (timestamp < RTMP_EXTENDED_TIMESTAMP) {
568 pp = (char*)&timestamp; 611 pp = (char*)&timestamp;
569 - *pheader++ = pp[2];  
570 - *pheader++ = pp[1];  
571 - *pheader++ = pp[0]; 612 + *p++ = pp[2];
  613 + *p++ = pp[1];
  614 + *p++ = pp[0];
572 } else { 615 } else {
573 - *pheader++ = 0xFF;  
574 - *pheader++ = 0xFF;  
575 - *pheader++ = 0xFF; 616 + *p++ = 0xFF;
  617 + *p++ = 0xFF;
  618 + *p++ = 0xFF;
576 } 619 }
577 620
578 // message_length, 3bytes, big-endian 621 // message_length, 3bytes, big-endian
579 - pp = (char*)&msg->header.payload_length;  
580 - *pheader++ = pp[2];  
581 - *pheader++ = pp[1];  
582 - *pheader++ = pp[0]; 622 + pp = (char*)&mh->payload_length;
  623 + *p++ = pp[2];
  624 + *p++ = pp[1];
  625 + *p++ = pp[0];
583 626
584 // message_type, 1bytes 627 // message_type, 1bytes
585 - *pheader++ = msg->header.message_type; 628 + *p++ = mh->message_type;
586 629
587 // message_length, 3bytes, little-endian 630 // message_length, 3bytes, little-endian
588 - pp = (char*)&msg->header.stream_id;  
589 - *pheader++ = pp[0];  
590 - *pheader++ = pp[1];  
591 - *pheader++ = pp[2];  
592 - *pheader++ = pp[3]; 631 + pp = (char*)&mh->stream_id;
  632 + *p++ = pp[0];
  633 + *p++ = pp[1];
  634 + *p++ = pp[2];
  635 + *p++ = pp[3];
593 636
594 // chunk extended timestamp header, 0 or 4 bytes, big-endian 637 // chunk extended timestamp header, 0 or 4 bytes, big-endian
595 if(timestamp >= RTMP_EXTENDED_TIMESTAMP) { 638 if(timestamp >= RTMP_EXTENDED_TIMESTAMP) {
596 pp = (char*)&timestamp; 639 pp = (char*)&timestamp;
597 - *pheader++ = pp[3];  
598 - *pheader++ = pp[2];  
599 - *pheader++ = pp[1];  
600 - *pheader++ = pp[0]; 640 + *p++ = pp[3];
  641 + *p++ = pp[2];
  642 + *p++ = pp[1];
  643 + *p++ = pp[0];
601 } 644 }
602 } else { 645 } else {
603 // write no message header chunk stream, fmt is 3 646 // write no message header chunk stream, fmt is 3
604 // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, 647 // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header,
605 // SRS will rollback to 1B chunk header. 648 // SRS will rollback to 1B chunk header.
606 - *pheader++ = 0xC0 | (msg->header.perfer_cid & 0x3F); 649 + *p++ = 0xC0 | (mh->perfer_cid & 0x3F);
607 650
608 // chunk extended timestamp header, 0 or 4 bytes, big-endian 651 // chunk extended timestamp header, 0 or 4 bytes, big-endian
609 // 6.1.3. Extended Timestamp 652 // 6.1.3. Extended Timestamp
@@ -618,51 +661,20 @@ int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet) @@ -618,51 +661,20 @@ int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet)
618 // must send the extended-timestamp to flash-player. 661 // must send the extended-timestamp to flash-player.
619 // @see: ngx_rtmp_prepare_message 662 // @see: ngx_rtmp_prepare_message
620 // @see: http://blog.csdn.net/win_lin/article/details/13363699 663 // @see: http://blog.csdn.net/win_lin/article/details/13363699
621 - u_int32_t timestamp = (u_int32_t)msg->header.timestamp; 664 + // TODO: FIXME: extract to outer.
  665 + u_int32_t timestamp = (u_int32_t)mh->timestamp;
622 if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { 666 if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
623 pp = (char*)&timestamp; 667 pp = (char*)&timestamp;
624 - *pheader++ = pp[3];  
625 - *pheader++ = pp[2];  
626 - *pheader++ = pp[1];  
627 - *pheader++ = pp[0]; 668 + *p++ = pp[3];
  669 + *p++ = pp[2];
  670 + *p++ = pp[1];
  671 + *p++ = pp[0];
628 } 672 }
629 } 673 }
630 674
631 - // sendout header and payload by writev.  
632 - // decrease the sys invoke count to get higher performance.  
633 - int payload_size = msg->size - (p - msg->payload);  
634 - payload_size = srs_min(payload_size, out_chunk_size);  
635 -  
636 // always has header 675 // always has header
637 - int header_size = pheader - out_header_cache;  
638 - srs_assert(header_size > 0);  
639 -  
640 - // send by writev  
641 - iovec iov[2];  
642 - iov[0].iov_base = out_header_cache;  
643 - iov[0].iov_len = header_size;  
644 - iov[1].iov_base = p;  
645 - iov[1].iov_len = payload_size;  
646 -  
647 - ssize_t nwrite;  
648 - if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) {  
649 - srs_error("send with writev failed. ret=%d", ret);  
650 - return ret;  
651 - }  
652 -  
653 - // consume sendout bytes when not empty packet.  
654 - if (msg->payload && msg->size > 0) {  
655 - p += payload_size;  
656 - }  
657 - } while (p < msg->payload + msg->size);  
658 -  
659 - // only process the callback event when with packet  
660 - if (packet && (ret = on_send_packet(msg, packet)) != ERROR_SUCCESS) {  
661 - srs_error("hook the send message failed. ret=%d", ret);  
662 - return ret;  
663 - }  
664 -  
665 - return ret; 676 + *pnbh = p - cache;
  677 + *ph = cache;
666 } 678 }
667 679
668 int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket) 680 int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket)
@@ -834,14 +846,17 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, @@ -834,14 +846,17 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream,
834 846
835 int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id) 847 int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id)
836 { 848 {
837 - if (msg) { 849 + // always not NULL msg.
  850 + srs_assert(msg);
  851 +
  852 + // update the stream id in header.
838 msg->header.stream_id = stream_id; 853 msg->header.stream_id = stream_id;
839 - }  
840 854
841 // donot use the auto free to free the msg, 855 // donot use the auto free to free the msg,
842 // for performance issue. 856 // for performance issue.
843 - int ret = do_send_message(msg, NULL); 857 + int ret = do_send_message(msg);
844 srs_freep(msg); 858 srs_freep(msg);
  859 +
845 return ret; 860 return ret;
846 } 861 }
847 862
@@ -878,7 +893,10 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id) @@ -878,7 +893,10 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
878 893
879 // donot use the auto free to free the msg, 894 // donot use the auto free to free the msg,
880 // for performance issue. 895 // for performance issue.
881 - ret = do_send_message(msg, packet); 896 + ret = do_send_message(msg);
  897 + if (ret == ERROR_SUCCESS) {
  898 + ret = on_send_packet(msg, packet);
  899 + }
882 srs_freep(msg); 900 srs_freep(msg);
883 901
884 return ret; 902 return ret;
@@ -1535,8 +1553,10 @@ int SrsProtocol::on_send_packet(SrsMessage* msg, SrsPacket* packet) @@ -1535,8 +1553,10 @@ int SrsProtocol::on_send_packet(SrsMessage* msg, SrsPacket* packet)
1535 { 1553 {
1536 int ret = ERROR_SUCCESS; 1554 int ret = ERROR_SUCCESS;
1537 1555
1538 - // should never be raw bytes oriented RTMP message.  
1539 - srs_assert(packet); 1556 + // ignore raw bytes oriented RTMP message.
  1557 + if (packet == NULL) {
  1558 + return ret;
  1559 + }
1540 1560
1541 switch (msg->header.message_type) { 1561 switch (msg->header.message_type) {
1542 case RTMP_MSG_SetChunkSize: { 1562 case RTMP_MSG_SetChunkSize: {
@@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 32
33 #include <map> 33 #include <map>
34 #include <string> 34 #include <string>
  35 +#include <sys/uio.h>
35 36
36 #include <srs_kernel_log.hpp> 37 #include <srs_kernel_log.hpp>
37 #include <srs_kernel_error.hpp> 38 #include <srs_kernel_error.hpp>
@@ -214,7 +215,11 @@ private: @@ -214,7 +215,11 @@ private:
214 * used for type0, 11bytes(or 15bytes with extended timestamp) header. 215 * used for type0, 11bytes(or 15bytes with extended timestamp) header.
215 * or for type3, 1bytes(or 5bytes with extended timestamp) header. 216 * or for type3, 1bytes(or 5bytes with extended timestamp) header.
216 */ 217 */
217 - char out_header_cache[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE]; 218 + char out_c0_cache[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE];
  219 + /**
  220 + * output iovec cache.
  221 + */
  222 + iovec iov[2];
218 /** 223 /**
219 * output chunk size, default to 128, set by config. 224 * output chunk size, default to 128, set by config.
220 */ 225 */
@@ -339,10 +344,19 @@ public: @@ -339,10 +344,19 @@ public:
339 } 344 }
340 private: 345 private:
341 /** 346 /**
342 - * send out the message, donot free it, the caller must free the param msg.  
343 - * @param packet the packet of message, NULL for raw message. 347 + * send out the message, donot free it,
  348 + * the caller must free the param msg.
  349 + */
  350 + virtual int do_send_message(SrsMessage* msg);
  351 + /**
  352 + * generate the chunk header for msg.
  353 + * @param mh, the header of msg to send.
  354 + * @param c0, whether the first chunk, the c0 chunk.
  355 + * @param pnbh, output the size of header.
  356 + * @param ph, output the header cache.
  357 + * user should never free it, it's cached header.
344 */ 358 */
345 - virtual int do_send_message(SrsMessage* msg, SrsPacket* packet); 359 + virtual void generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph);
346 /** 360 /**
347 * imp for decode_message 361 * imp for decode_message
348 */ 362 */