winlin

for bug #251, merge the performance refines.

@@ -598,6 +598,12 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) @@ -598,6 +598,12 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
598 // collect elapse for pithy print. 598 // collect elapse for pithy print.
599 pithy_print.elapse(); 599 pithy_print.elapse();
600 600
  601 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  602 + // wait for message to incoming.
  603 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  604 + consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
  605 +#endif
  606 +
601 // get messages from consumer. 607 // get messages from consumer.
602 // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. 608 // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
603 int count = 0; 609 int count = 0;
@@ -605,12 +611,16 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) @@ -605,12 +611,16 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
605 srs_error("get messages from consumer failed. ret=%d", ret); 611 srs_error("get messages from consumer failed. ret=%d", ret);
606 return ret; 612 return ret;
607 } 613 }
608 -  
609 - // no messages, sleep for a while. 614 +
  615 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  616 + // we use wait to get messages, so the count must be positive.
  617 + srs_assert(count > 0);
  618 +#else
610 if (count <= 0) { 619 if (count <= 0) {
611 st_usleep(mw_sleep * 1000); 620 st_usleep(mw_sleep * 1000);
612 } 621 }
613 - srs_info("got %d msgs, mw=%d", count, mw_sleep); 622 +#endif
  623 + srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep);
614 624
615 // reportable 625 // reportable
616 if (pithy_print.can_print()) { 626 if (pithy_print.can_print()) {
@@ -995,6 +1005,13 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) @@ -995,6 +1005,13 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
995 return; 1005 return;
996 } 1006 }
997 1007
  1008 + // get the sock buffer size.
  1009 + int fd = st_netfd_fileno(stfd);
  1010 + int onb_sbuf = 0;
  1011 + socklen_t sock_buf_size = sizeof(int);
  1012 + getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size);
  1013 +
  1014 +#ifdef SRS_PERF_MW_SO_SNDBUF
998 // the bytes: 1015 // the bytes:
999 // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536, 1016 // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
1000 // 128KB=131072, 256KB=262144, 512KB=524288 1017 // 128KB=131072, 256KB=262144, 512KB=524288
@@ -1007,11 +1024,6 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) @@ -1007,11 +1024,6 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
1007 // 2000*5000/8=1250000B(about 1220KB). 1024 // 2000*5000/8=1250000B(about 1220KB).
1008 int kbps = 5000; 1025 int kbps = 5000;
1009 int socket_buffer_size = sleep_ms * kbps / 8; 1026 int socket_buffer_size = sleep_ms * kbps / 8;
1010 -  
1011 - int fd = st_netfd_fileno(stfd);  
1012 - int onb_sbuf = 0;  
1013 - socklen_t sock_buf_size = sizeof(int);  
1014 - getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size);  
1015 1027
1016 // socket send buffer, system will double it. 1028 // socket send buffer, system will double it.
1017 int nb_sbuf = socket_buffer_size / 2; 1029 int nb_sbuf = socket_buffer_size / 2;
@@ -1022,9 +1034,13 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) @@ -1022,9 +1034,13 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
1022 } 1034 }
1023 getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, &sock_buf_size); 1035 getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, &sock_buf_size);
1024 1036
1025 - srs_trace("mw change sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d", 1037 + srs_trace("mw changed sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d",
1026 mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, socket_buffer_size, 1038 mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, socket_buffer_size,
1027 onb_sbuf, nb_sbuf); 1039 onb_sbuf, nb_sbuf);
  1040 +#else
  1041 + srs_trace("mw changed sleep %d=>%d, max_msgs=%d, sbuf %d",
  1042 + mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, onb_sbuf);
  1043 +#endif
1028 1044
1029 mw_sleep = sleep_ms; 1045 mw_sleep = sleep_ms;
1030 } 1046 }
@@ -182,7 +182,7 @@ void SrsMessageQueue::set_queue_size(double queue_size) @@ -182,7 +182,7 @@ void SrsMessageQueue::set_queue_size(double queue_size)
182 queue_size_ms = (int)(queue_size * 1000); 182 queue_size_ms = (int)(queue_size * 1000);
183 } 183 }
184 184
185 -int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) 185 +int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
186 { 186 {
187 int ret = ERROR_SUCCESS; 187 int ret = ERROR_SUCCESS;
188 188
@@ -197,11 +197,6 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) @@ -197,11 +197,6 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
197 msgs.push_back(msg); 197 msgs.push_back(msg);
198 198
199 while (av_end_time - av_start_time > queue_size_ms) { 199 while (av_end_time - av_start_time > queue_size_ms) {
200 - // notice the caller queue already overflow and shrinked.  
201 - if (is_overflow) {  
202 - *is_overflow = true;  
203 - }  
204 -  
205 shrink(); 200 shrink();
206 } 201 }
207 202
@@ -211,7 +206,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) @@ -211,7 +206,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
211 int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) 206 int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
212 { 207 {
213 int ret = ERROR_SUCCESS; 208 int ret = ERROR_SUCCESS;
214 - 209 +
215 int nb_msgs = (int)msgs.size(); 210 int nb_msgs = (int)msgs.size();
216 if (nb_msgs <= 0) { 211 if (nb_msgs <= 0) {
217 return ret; 212 return ret;
@@ -308,6 +303,13 @@ SrsConsumer::SrsConsumer(SrsSource* _source) @@ -308,6 +303,13 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
308 jitter = new SrsRtmpJitter(); 303 jitter = new SrsRtmpJitter();
309 queue = new SrsMessageQueue(); 304 queue = new SrsMessageQueue();
310 should_update_source_id = false; 305 should_update_source_id = false;
  306 +
  307 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  308 + mw_wait = st_cond_new();
  309 + mw_min_msgs = 0;
  310 + mw_duration = 0;
  311 + mw_waiting = false;
  312 +#endif
311 } 313 }
312 314
313 SrsConsumer::~SrsConsumer() 315 SrsConsumer::~SrsConsumer()
@@ -315,6 +317,10 @@ SrsConsumer::~SrsConsumer() @@ -315,6 +317,10 @@ SrsConsumer::~SrsConsumer()
315 source->on_consumer_destroy(this); 317 source->on_consumer_destroy(this);
316 srs_freep(jitter); 318 srs_freep(jitter);
317 srs_freep(queue); 319 srs_freep(queue);
  320 +
  321 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  322 + st_cond_destroy(mw_wait);
  323 +#endif
318 } 324 }
319 325
320 void SrsConsumer::set_queue_size(double queue_size) 326 void SrsConsumer::set_queue_size(double queue_size)
@@ -344,11 +350,25 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, @@ -344,11 +350,25 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv,
344 return ret; 350 return ret;
345 } 351 }
346 } 352 }
347 -  
348 - if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) { 353 +
  354 + if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
349 return ret; 355 return ret;
350 } 356 }
351 357
  358 + #ifdef SRS_PERF_QUEUE_COND_WAIT
  359 + // fire the mw when msgs is enough.
  360 + if (mw_waiting) {
  361 + int duration_ms = queue->duration();
  362 + bool match_min_msgs = queue->size() > mw_min_msgs;
  363 +
  364 + // when duration ok, signal to flush.
  365 + if (match_min_msgs && duration_ms > mw_duration) {
  366 + st_cond_signal(mw_wait);
  367 + mw_waiting = false;
  368 + }
  369 + }
  370 + #endif
  371 +
352 return ret; 372 return ret;
353 } 373 }
354 374
@@ -376,6 +396,27 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) @@ -376,6 +396,27 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
376 return ret; 396 return ret;
377 } 397 }
378 398
  399 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  400 +void SrsConsumer::wait(int nb_msgs, int duration)
  401 +{
  402 + mw_min_msgs = nb_msgs;
  403 + mw_duration = duration;
  404 +
  405 + int duration_ms = queue->duration();
  406 + bool match_min_msgs = queue->size() > mw_min_msgs;
  407 +
  408 + // when duration ok, signal to flush.
  409 + if (match_min_msgs && duration_ms > mw_duration) {
  410 + return;
  411 + }
  412 +
  413 + // the enqueue will notify this cond.
  414 + mw_waiting = true;
  415 + // wait for msgs to incoming.
  416 + st_cond_wait(mw_wait);
  417 +}
  418 +#endif
  419 +
379 int SrsConsumer::on_play_client_pause(bool is_pause) 420 int SrsConsumer::on_play_client_pause(bool is_pause)
380 { 421 {
381 int ret = ERROR_SUCCESS; 422 int ret = ERROR_SUCCESS;
@@ -134,12 +134,11 @@ public: @@ -134,12 +134,11 @@ public:
134 /** 134 /**
135 * enqueue the message, the timestamp always monotonically. 135 * enqueue the message, the timestamp always monotonically.
136 * @param msg, the msg to enqueue, user never free it whatever the return code. 136 * @param msg, the msg to enqueue, user never free it whatever the return code.
137 - * @param is_overflow, whether overflow and shrinked. NULL to ignore.  
138 */ 137 */
139 - virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL); 138 + virtual int enqueue(SrsSharedPtrMessage* msg);
140 /** 139 /**
141 * get packets in consumer queue. 140 * get packets in consumer queue.
142 - * @pmsgs SrsCommonMessages*[], used to store the msgs, user must alloc it. 141 + * @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it.
143 * @count the count in array, output param. 142 * @count the count in array, output param.
144 * @max_count the max count to dequeue, must be positive. 143 * @max_count the max count to dequeue, must be positive.
145 */ 144 */
@@ -165,6 +164,14 @@ private: @@ -165,6 +164,14 @@ private:
165 bool paused; 164 bool paused;
166 // when source id changed, notice all consumers 165 // when source id changed, notice all consumers
167 bool should_update_source_id; 166 bool should_update_source_id;
  167 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  168 + // the cond wait for mw.
  169 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  170 + st_cond_t mw_wait;
  171 + bool mw_waiting;
  172 + int mw_min_msgs;
  173 + int mw_duration;
  174 +#endif
168 public: 175 public:
169 SrsConsumer(SrsSource* _source); 176 SrsConsumer(SrsSource* _source);
170 virtual ~SrsConsumer(); 177 virtual ~SrsConsumer();
@@ -200,6 +207,14 @@ public: @@ -200,6 +207,14 @@ public:
200 * @max_count the max count to dequeue, must be positive. 207 * @max_count the max count to dequeue, must be positive.
201 */ 208 */
202 virtual int dump_packets(SrsMessageArray* msgs, int& count); 209 virtual int dump_packets(SrsMessageArray* msgs, int& count);
  210 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  211 + /**
  212 + * wait for messages incomming, atleast nb_msgs and in duration.
  213 + * @param nb_msgs the messages count to wait.
  214 + * @param duration the messgae duration to wait.
  215 + */
  216 + virtual void wait(int nb_msgs, int duration);
  217 +#endif
203 /** 218 /**
204 * when client send the pause message. 219 * when client send the pause message.
205 */ 220 */
@@ -103,6 +103,22 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -103,6 +103,22 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
103 * @remark, recomment to 128. 103 * @remark, recomment to 128.
104 */ 104 */
105 #define SRS_PERF_MW_MSGS 128 105 #define SRS_PERF_MW_MSGS 128
  106 +/**
  107 +* whether set the socket send buffer size.
  108 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  109 +*/
  110 +#undef SRS_PERF_MW_SO_SNDBUF
  111 +/**
  112 +* whether set the socket recv buffer size.
  113 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  114 +*/
  115 +#undef SRS_PERF_MW_SO_RCVBUF
  116 +/**
  117 +* whether use cond wait to send messages.
  118 +* @remark this improve performance for large connectios.
  119 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  120 +*/
  121 +#undef SRS_PERF_QUEUE_COND_WAIT
106 122
107 /** 123 /**
108 * how many chunk stream to cache, [0, N]. 124 * how many chunk stream to cache, [0, N].
@@ -37,7 +37,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -37,7 +37,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
37 37
38 class SrsProtocol; 38 class SrsProtocol;
39 class ISrsProtocolReaderWriter; 39 class ISrsProtocolReaderWriter;
40 -class ISrsCommonMessage;  
41 class SrsCommonMessage; 40 class SrsCommonMessage;
42 class SrsCreateStreamPacket; 41 class SrsCreateStreamPacket;
43 class SrsFMLEStartPacket; 42 class SrsFMLEStartPacket;
@@ -755,10 +755,17 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -755,10 +755,17 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
755 755
756 // always write the header event payload is empty. 756 // always write the header event payload is empty.
757 while (p < pend) { 757 while (p < pend) {
758 - // header use iov[0].  
759 - generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, iov); 758 + // always has header
  759 + int nbh = 0;
  760 + char* header = NULL;
  761 + generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
  762 + srs_assert(nbh > 0);
760 763
761 - // payload use iov[1]. 764 + // header iov
  765 + iov[0].iov_base = header;
  766 + iov[0].iov_len = nbh;
  767 +
  768 + // payload iov
762 int payload_size = pend - p; 769 int payload_size = pend - p;
763 if (payload_size > out_chunk_size) { 770 if (payload_size > out_chunk_size) {
764 payload_size = out_chunk_size; 771 payload_size = out_chunk_size;
@@ -781,14 +788,14 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -781,14 +788,14 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
781 int realloc_size = sizeof(iovec) * nb_out_iovs; 788 int realloc_size = sizeof(iovec) * nb_out_iovs;
782 out_iovs = (iovec*)realloc(out_iovs, realloc_size); 789 out_iovs = (iovec*)realloc(out_iovs, realloc_size);
783 } 790 }
784 -  
785 - // to next c0c3 header cache  
786 - c0c3_cache_index += iov[0].iov_len;  
787 - c0c3_cache = out_c0c3_caches + c0c3_cache_index;  
788 791
789 // to next pair of iovs 792 // to next pair of iovs
790 iov_index += 2; 793 iov_index += 2;
791 iov = out_iovs + iov_index; 794 iov = out_iovs + iov_index;
  795 +
  796 + // to next c0c3 header cache
  797 + c0c3_cache_index += nbh;
  798 + c0c3_cache = out_c0c3_caches + c0c3_cache_index;
792 799
793 // the cache header should never be realloc again, 800 // the cache header should never be realloc again,
794 // for the ptr is set to iovs, so we just warn user to set larger 801 // for the ptr is set to iovs, so we just warn user to set larger
@@ -898,7 +905,7 @@ int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id) @@ -898,7 +905,7 @@ int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id)
898 return ret; 905 return ret;
899 } 906 }
900 907
901 -void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, iovec* iov) 908 +void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
902 { 909 {
903 // to directly set the field. 910 // to directly set the field.
904 char* pp = NULL; 911 char* pp = NULL;
@@ -975,8 +982,8 @@ void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool @@ -975,8 +982,8 @@ void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool
975 } 982 }
976 983
977 // always has header 984 // always has header
978 - iov->iov_base = cache;  
979 - iov->iov_len = p - cache; 985 + *pnbh = p - cache;
  986 + *ph = cache;
980 } 987 }
981 988
982 int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket) 989 int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket)
@@ -496,9 +496,11 @@ private: @@ -496,9 +496,11 @@ private:
496 * generate the chunk header for msg. 496 * generate the chunk header for msg.
497 * @param mh, the header of msg to send. 497 * @param mh, the header of msg to send.
498 * @param c0, whether the first chunk, the c0 chunk. 498 * @param c0, whether the first chunk, the c0 chunk.
499 - * @param iov, output the header and size to iovec. 499 + * @param pnbh, output the size of header.
  500 + * @param ph, output the header cache.
  501 + * user should never free it, it's cached header.
500 */ 502 */
501 - virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, iovec* iov); 503 + virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph);
502 /** 504 /**
503 * imp for decode_message 505 * imp for decode_message
504 */ 506 */