winlin

for bug #251, somhc(session-oriented message-header cache). 2.0.61

@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 60 34 +#define VERSION_REVISION 61
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -729,6 +729,13 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -729,6 +729,13 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
729 int c0c3_cache_index = 0; 729 int c0c3_cache_index = 0;
730 char* c0c3_cache = out_c0c3_caches + c0c3_cache_index; 730 char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;
731 731
  732 + // the somc(session-oriented message-header cache),
  733 + // many message header are same, use cache.
  734 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  735 + SrsMessageHeader* somhc = NULL;
  736 + char* somhc_bytes = NULL;
  737 + int nb_somhc_bytes = 0;
  738 +
732 // try to send use the c0c3 header cache, 739 // try to send use the c0c3 header cache,
733 // if cache is consumed, try another loop. 740 // if cache is consumed, try another loop.
734 for (int i = 0; i < nb_msgs; i++) { 741 for (int i = 0; i < nb_msgs; i++) {
@@ -755,8 +762,20 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -755,8 +762,20 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
755 762
756 // always write the header event payload is empty. 763 // always write the header event payload is empty.
757 while (p < pend) { 764 while (p < pend) {
  765 + // the first chunk is c0, others is c3.
  766 + bool is_c0 = p == msg->payload;
  767 +
758 // header use iov[0]. 768 // header use iov[0].
759 - generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, iov); 769 + generate_chunk_header(somhc, somhc_bytes, nb_somhc_bytes,
  770 + c0c3_cache, SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index,
  771 + &msg->header, is_c0, iov);
  772 +
  773 + // set somhc to the first header.
  774 + if (!somhc) {
  775 + somhc = &msg->header;
  776 + somhc_bytes = (char*)iov[0].iov_base;
  777 + nb_somhc_bytes = iov[0].iov_len;
  778 + }
760 779
761 // payload use iov[1]. 780 // payload use iov[1].
762 int payload_size = pend - p; 781 int payload_size = pend - p;
@@ -898,8 +917,10 @@ int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id) @@ -898,8 +917,10 @@ int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id)
898 return ret; 917 return ret;
899 } 918 }
900 919
901 -void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, iovec* iov)  
902 -{ 920 +void SrsProtocol::generate_chunk_header(
  921 + SrsMessageHeader* somhc, char* somhc_bytes, int nb_somhc_bytes,
  922 + char* cache, int nb_cache, SrsMessageHeader* mh, bool c0, iovec* iov
  923 +) {
903 // to directly set the field. 924 // to directly set the field.
904 char* pp = NULL; 925 char* pp = NULL;
905 926
@@ -910,37 +931,55 @@ void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool @@ -910,37 +931,55 @@ void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool
910 u_int32_t timestamp = (u_int32_t)mh->timestamp; 931 u_int32_t timestamp = (u_int32_t)mh->timestamp;
911 932
912 if (c0) { 933 if (c0) {
  934 + // if cached header, copy it.
  935 + if (somhc) {
  936 + srs_assert(nb_cache >= nb_somhc_bytes);
  937 + memcpy(cache, somhc_bytes, nb_somhc_bytes);
  938 + }
  939 +
913 // write new chunk stream header, fmt is 0 940 // write new chunk stream header, fmt is 0
914 *p++ = 0x00 | (mh->perfer_cid & 0x3F); 941 *p++ = 0x00 | (mh->perfer_cid & 0x3F);
915 942
916 // chunk message header, 11 bytes 943 // chunk message header, 11 bytes
917 // timestamp, 3bytes, big-endian 944 // timestamp, 3bytes, big-endian
918 - if (timestamp < RTMP_EXTENDED_TIMESTAMP) {  
919 - pp = (char*)&timestamp;  
920 - *p++ = pp[2];  
921 - *p++ = pp[1];  
922 - *p++ = pp[0]; 945 + if (somhc && somhc->timestamp == mh->timestamp) {
  946 + p += 3;
923 } else { 947 } else {
924 - *p++ = 0xFF;  
925 - *p++ = 0xFF;  
926 - *p++ = 0xFF; 948 + if (timestamp < RTMP_EXTENDED_TIMESTAMP) {
  949 + pp = (char*)&timestamp;
  950 + *p++ = pp[2];
  951 + *p++ = pp[1];
  952 + *p++ = pp[0];
  953 + } else {
  954 + *p++ = 0xFF;
  955 + *p++ = 0xFF;
  956 + *p++ = 0xFF;
  957 + }
927 } 958 }
928 959
929 // message_length, 3bytes, big-endian 960 // message_length, 3bytes, big-endian
930 - pp = (char*)&mh->payload_length;  
931 - *p++ = pp[2];  
932 - *p++ = pp[1];  
933 - *p++ = pp[0]; 961 + if (somhc && somhc->payload_length == mh->payload_length) {
  962 + p += 3;
  963 + } else {
  964 + pp = (char*)&mh->payload_length;
  965 + *p++ = pp[2];
  966 + *p++ = pp[1];
  967 + *p++ = pp[0];
  968 + }
934 969
935 // message_type, 1bytes 970 // message_type, 1bytes
936 *p++ = mh->message_type; 971 *p++ = mh->message_type;
937 972
938 - // message_length, 3bytes, little-endian  
939 - pp = (char*)&mh->stream_id;  
940 - *p++ = pp[0];  
941 - *p++ = pp[1];  
942 - *p++ = pp[2];  
943 - *p++ = pp[3]; 973 + // stream_id, 4bytes, little-endian.
  974 + if (somhc && somhc->stream_id == mh->stream_id) {
  975 + p += 4;
  976 + } else {
  977 + pp = (char*)&mh->stream_id;
  978 + *p++ = pp[0];
  979 + *p++ = pp[1];
  980 + *p++ = pp[2];
  981 + *p++ = pp[3];
  982 + }
944 } else { 983 } else {
945 // write no message header chunk stream, fmt is 3 984 // write no message header chunk stream, fmt is 3
946 // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, 985 // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header,
@@ -967,11 +1006,15 @@ void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool @@ -967,11 +1006,15 @@ void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool
967 // @see: http://blog.csdn.net/win_lin/article/details/13363699 1006 // @see: http://blog.csdn.net/win_lin/article/details/13363699
968 // TODO: FIXME: extract to outer. 1007 // TODO: FIXME: extract to outer.
969 if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { 1008 if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
970 - pp = (char*)&timestamp;  
971 - *p++ = pp[3];  
972 - *p++ = pp[2];  
973 - *p++ = pp[1];  
974 - *p++ = pp[0]; 1009 + if (somhc && somhc->payload_length == mh->payload_length) {
  1010 + p += 4;
  1011 + } else {
  1012 + pp = (char*)&timestamp;
  1013 + *p++ = pp[3];
  1014 + *p++ = pp[2];
  1015 + *p++ = pp[1];
  1016 + *p++ = pp[0];
  1017 + }
975 } 1018 }
976 1019
977 // always has header 1020 // always has header
@@ -84,7 +84,7 @@ public: @@ -84,7 +84,7 @@ public:
84 /** 84 /**
85 * 4bytes. 85 * 4bytes.
86 * Four-byte field that identifies the stream of the message. These 86 * Four-byte field that identifies the stream of the message. These
87 - * bytes are set in big-endian format. 87 + * bytes are set in little-endian format.
88 */ 88 */
89 int32_t stream_id; 89 int32_t stream_id;
90 90
@@ -494,11 +494,16 @@ private: @@ -494,11 +494,16 @@ private:
494 virtual int do_send_and_free_packet(SrsPacket* packet, int stream_id); 494 virtual int do_send_and_free_packet(SrsPacket* packet, int stream_id);
495 /** 495 /**
496 * generate the chunk header for msg. 496 * generate the chunk header for msg.
  497 + * @param somhc, session-oriented message-header cache.
  498 + * @param somhc_bytes, the serialized bytes.
  499 + * @param nb_somhc_bytes, the size of somhc_bytes.
497 * @param mh, the header of msg to send. 500 * @param mh, the header of msg to send.
498 * @param c0, whether the first chunk, the c0 chunk. 501 * @param c0, whether the first chunk, the c0 chunk.
499 * @param iov, output the header and size to iovec. 502 * @param iov, output the header and size to iovec.
500 */ 503 */
501 - virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, iovec* iov); 504 + virtual void generate_chunk_header(
  505 + SrsMessageHeader* somhc, char* somhc_bytes, int nb_somhc_bytes,
  506 + char* cache, int nb_cache, SrsMessageHeader* mh, bool c0, iovec* iov);
502 /** 507 /**
503 * imp for decode_message 508 * imp for decode_message
504 */ 509 */