winlin

fix #251, revert changes, for the cond wait and fast cache queue is no use. 2.0.59

@@ -598,29 +598,19 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) @@ -598,29 +598,19 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
598 // collect elapse for pithy print. 598 // collect elapse for pithy print.
599 pithy_print.elapse(); 599 pithy_print.elapse();
600 600
601 -#ifdef SRS_PERF_QUEUE_COND_WAIT  
602 - // wait for message to incoming.  
603 - // @see https://github.com/winlinvip/simple-rtmp-server/issues/251  
604 - consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);  
605 -#endif  
606 -  
607 // get messages from consumer. 601 // get messages from consumer.
608 // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. 602 // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
609 int count = 0; 603 int count = 0;
610 - if ((ret = consumer->dump_packets(&msgs, &count)) != ERROR_SUCCESS) { 604 + if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
611 srs_error("get messages from consumer failed. ret=%d", ret); 605 srs_error("get messages from consumer failed. ret=%d", ret);
612 return ret; 606 return ret;
613 } 607 }
614 -  
615 -#ifdef SRS_PERF_QUEUE_COND_WAIT  
616 - // we use wait to get messages, so the count must be positive.  
617 - srs_assert(count > 0);  
618 -#else 608 +
  609 + // no messages, sleep for a while.
619 if (count <= 0) { 610 if (count <= 0) {
620 st_usleep(mw_sleep * 1000); 611 st_usleep(mw_sleep * 1000);
621 } 612 }
622 -#endif  
623 - srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep); 613 + srs_info("got %d msgs, mw=%d", count, mw_sleep);
624 614
625 // reportable 615 // reportable
626 if (pithy_print.can_print()) { 616 if (pithy_print.can_print()) {
@@ -306,38 +306,13 @@ SrsConsumer::SrsConsumer(SrsSource* _source) @@ -306,38 +306,13 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
306 jitter = new SrsRtmpJitter(); 306 jitter = new SrsRtmpJitter();
307 queue = new SrsMessageQueue(); 307 queue = new SrsMessageQueue();
308 should_update_source_id = false; 308 should_update_source_id = false;
309 -  
310 -#ifdef SRS_PERF_QUEUE_COND_WAIT  
311 - mw_wait = st_cond_new();  
312 - mw_min_msgs = 0;  
313 - mw_duration = 0;  
314 - mw_waiting = false;  
315 -#endif  
316 -  
317 -#ifdef SRS_PERF_QUEUE_FAST_CACHE  
318 - mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS);  
319 - mw_count = 0;  
320 - mw_first_pkt = mw_last_pkt = 0;  
321 -#endif  
322 } 309 }
323 310
324 SrsConsumer::~SrsConsumer() 311 SrsConsumer::~SrsConsumer()
325 { 312 {
326 -#ifdef SRS_PERF_QUEUE_FAST_CACHE  
327 - if (mw_cache) {  
328 - mw_cache->free(mw_count);  
329 - mw_count = 0;  
330 - }  
331 - srs_freep(mw_cache);  
332 -#endif  
333 -  
334 source->on_consumer_destroy(this); 313 source->on_consumer_destroy(this);
335 srs_freep(jitter); 314 srs_freep(jitter);
336 srs_freep(queue); 315 srs_freep(queue);
337 -  
338 -#ifdef SRS_PERF_QUEUE_COND_WAIT  
339 - st_cond_destroy(mw_wait);  
340 -#endif  
341 } 316 }
342 317
343 void SrsConsumer::set_queue_size(double queue_size) 318 void SrsConsumer::set_queue_size(double queue_size)
@@ -365,72 +340,15 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S @@ -365,72 +340,15 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
365 return ret; 340 return ret;
366 } 341 }
367 } 342 }
368 -  
369 -#ifdef SRS_PERF_QUEUE_FAST_CACHE  
370 - // use fast cache if available  
371 - if (mw_count < mw_cache->max) {  
372 - // update fast cache timestamps  
373 - if (mw_count == 0) {  
374 - mw_first_pkt = msg->header.timestamp;  
375 - }  
376 - mw_last_pkt = msg->header.timestamp;  
377 -  
378 - mw_cache->msgs[mw_count++] = msg;  
379 - } else{  
380 - // fast cache is full, use queue.  
381 - bool is_overflow = false;  
382 - if ((ret = queue->enqueue(msg, &is_overflow)) != ERROR_SUCCESS) {  
383 - return ret;  
384 - }  
385 - // when overflow, clear cache and refresh the fast cache.  
386 - if (is_overflow) {  
387 - mw_cache->free(mw_count);  
388 - if ((ret = dumps_queue_to_fast_cache()) != ERROR_SUCCESS) {  
389 - return ret;  
390 - }  
391 - }  
392 - }  
393 -  
394 - #ifdef SRS_PERF_QUEUE_COND_WAIT  
395 - // fire the mw when msgs is enough.  
396 - if (mw_waiting) {  
397 - // when fast cache not overflow, always flush.  
398 - // so we donot care about the queue.  
399 - bool fast_cache_overflow = mw_count >= mw_cache->max;  
400 - int duration_ms = (int)(mw_last_pkt - mw_first_pkt);  
401 - bool match_min_msgs = mw_count > mw_min_msgs;  
402 -  
403 - // when fast cache overflow, or duration ok, signal to flush.  
404 - if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {  
405 - st_cond_signal(mw_wait);  
406 - mw_waiting = false;  
407 - }  
408 - }  
409 - #endif  
410 -#else 343 +
411 if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) { 344 if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) {
412 return ret; 345 return ret;
413 } 346 }
414 347
415 - #ifdef SRS_PERF_QUEUE_COND_WAIT  
416 - // fire the mw when msgs is enough.  
417 - if (mw_waiting) {  
418 - int duration_ms = queue->duration();  
419 - bool match_min_msgs = queue->size() > mw_min_msgs;  
420 -  
421 - // when duration ok, signal to flush.  
422 - if (match_min_msgs && duration_ms > mw_duration) {  
423 - st_cond_signal(mw_wait);  
424 - mw_waiting = false;  
425 - }  
426 - }  
427 - #endif  
428 -#endif  
429 -  
430 return ret; 348 return ret;
431 } 349 }
432 350
433 -int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count) 351 +int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
434 { 352 {
435 int ret =ERROR_SUCCESS; 353 int ret =ERROR_SUCCESS;
436 354
@@ -446,68 +364,13 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count) @@ -446,68 +364,13 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count)
446 return ret; 364 return ret;
447 } 365 }
448 366
449 -#ifdef SRS_PERF_QUEUE_FAST_CACHE  
450 - // only dumps an whole array to msgs.  
451 - for (int i = 0; i < mw_count; i++) {  
452 - msgs->msgs[i] = mw_cache->msgs[i];  
453 - }  
454 - *count = mw_count;  
455 -  
456 - // when fast cache is not filled,  
457 - // we donot check the queue, direclty zero fast cache.  
458 - if (mw_count < mw_cache->max) {  
459 - mw_count = 0;  
460 - mw_first_pkt = mw_last_pkt = 0;  
461 - return ret;  
462 - }  
463 -  
464 - return dumps_queue_to_fast_cache();  
465 -#else  
466 -  
467 // pump msgs from queue. 367 // pump msgs from queue.
468 - int nb_msgs = 0;  
469 - if ((ret = queue->dump_packets(msgs->max, msgs->msgs, nb_msgs)) != ERROR_SUCCESS) { 368 + if ((ret = queue->dump_packets(msgs->max, msgs->msgs, count)) != ERROR_SUCCESS) {
470 return ret; 369 return ret;
471 } 370 }
472 - *count = nb_msgs;  
473 371
474 return ret; 372 return ret;
475 -#endif  
476 -}  
477 -  
478 -#ifdef SRS_PERF_QUEUE_COND_WAIT  
479 -void SrsConsumer::wait(int nb_msgs, int duration)  
480 -{  
481 - mw_min_msgs = nb_msgs;  
482 - mw_duration = duration;  
483 -  
484 -#ifdef SRS_PERF_QUEUE_FAST_CACHE  
485 - // when fast cache not overflow, always flush.  
486 - // so we donot care about the queue.  
487 - bool fast_cache_overflow = mw_count >= mw_cache->max;  
488 - int duration_ms = (int)(mw_last_pkt - mw_first_pkt);  
489 - bool match_min_msgs = mw_count > mw_min_msgs;  
490 -  
491 - // when fast cache overflow, or duration ok, signal to flush.  
492 - if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {  
493 - return;  
494 - }  
495 -#else  
496 - int duration_ms = queue->duration();  
497 - bool match_min_msgs = queue->size() > mw_min_msgs;  
498 -  
499 - // when duration ok, signal to flush.  
500 - if (match_min_msgs && duration_ms > mw_duration) {  
501 - return;  
502 - }  
503 -#endif  
504 -  
505 - // the enqueue will notify this cond.  
506 - mw_waiting = true;  
507 - // wait for msgs to incoming.  
508 - st_cond_wait(mw_wait);  
509 } 373 }
510 -#endif  
511 374
512 int SrsConsumer::on_play_client_pause(bool is_pause) 375 int SrsConsumer::on_play_client_pause(bool is_pause)
513 { 376 {
@@ -519,28 +382,6 @@ int SrsConsumer::on_play_client_pause(bool is_pause) @@ -519,28 +382,6 @@ int SrsConsumer::on_play_client_pause(bool is_pause)
519 return ret; 382 return ret;
520 } 383 }
521 384
522 -#ifdef SRS_PERF_QUEUE_FAST_CACHE  
523 -int SrsConsumer::dumps_queue_to_fast_cache()  
524 -{  
525 - int ret =ERROR_SUCCESS;  
526 -  
527 - // fill fast cache with queue.  
528 - if ((ret = queue->dump_packets(mw_cache->max, mw_cache->msgs, mw_count)) != ERROR_SUCCESS) {  
529 - return ret;  
530 - }  
531 - // set the timestamp when got message.  
532 - if (mw_count > 0) {  
533 - SrsMessage* first_msg = mw_cache->msgs[0];  
534 - mw_first_pkt = first_msg->header.timestamp;  
535 -  
536 - SrsMessage* last_msg = mw_cache->msgs[mw_count - 1];  
537 - mw_last_pkt = last_msg->header.timestamp;  
538 - }  
539 -  
540 - return ret;  
541 -}  
542 -#endif  
543 -  
544 SrsGopCache::SrsGopCache() 385 SrsGopCache::SrsGopCache()
545 { 386 {
546 cached_video_count = 0; 387 cached_video_count = 0;
@@ -165,24 +165,6 @@ private: @@ -165,24 +165,6 @@ private:
165 bool paused; 165 bool paused;
166 // when source id changed, notice all consumers 166 // when source id changed, notice all consumers
167 bool should_update_source_id; 167 bool should_update_source_id;
168 -#ifdef SRS_PERF_QUEUE_COND_WAIT  
169 - // the cond wait for mw.  
170 - // @see https://github.com/winlinvip/simple-rtmp-server/issues/251  
171 - st_cond_t mw_wait;  
172 - bool mw_waiting;  
173 - int mw_min_msgs;  
174 - int mw_duration;  
175 -#endif  
176 -#ifdef SRS_PERF_QUEUE_FAST_CACHE  
177 - // use fast cache for msgs  
178 - // @see https://github.com/winlinvip/simple-rtmp-server/issues/251  
179 - SrsMessageArray* mw_cache;  
180 - // the count of msg in fast cache.  
181 - int mw_count;  
182 - // the packet time in fast cache.  
183 - int64_t mw_first_pkt;  
184 - int64_t mw_last_pkt;  
185 -#endif  
186 public: 168 public:
187 SrsConsumer(SrsSource* _source); 169 SrsConsumer(SrsSource* _source);
188 virtual ~SrsConsumer(); 170 virtual ~SrsConsumer();
@@ -216,27 +198,11 @@ public: @@ -216,27 +198,11 @@ public:
216 * @param count the count in array, output param. 198 * @param count the count in array, output param.
217 * @max_count the max count to dequeue, must be positive. 199 * @max_count the max count to dequeue, must be positive.
218 */ 200 */
219 - virtual int dump_packets(SrsMessageArray* msgs, int* count);  
220 -#ifdef SRS_PERF_QUEUE_COND_WAIT  
221 - /**  
222 - * wait for messages incomming, atleast nb_msgs and in duration.  
223 - * @param nb_msgs the messages count to wait.  
224 - * @param duration the messgae duration to wait.  
225 - */  
226 - virtual void wait(int nb_msgs, int duration);  
227 -#endif 201 + virtual int dump_packets(SrsMessageArray* msgs, int& count);
228 /** 202 /**
229 * when client send the pause message. 203 * when client send the pause message.
230 */ 204 */
231 virtual int on_play_client_pause(bool is_pause); 205 virtual int on_play_client_pause(bool is_pause);
232 -private:  
233 -#ifdef SRS_PERF_QUEUE_FAST_CACHE  
234 - /**  
235 - * dumps the queue to fast cache,  
236 - * when fast cache is clear or queue is overflow.  
237 - */  
238 - virtual int dumps_queue_to_fast_cache();  
239 -#endif  
240 }; 206 };
241 207
242 /** 208 /**
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 58 34 +#define VERSION_REVISION 59
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -100,28 +100,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -100,28 +100,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
100 * how many msgs can be send entirely. 100 * how many msgs can be send entirely.
101 * for play clients to get msgs then totally send out. 101 * for play clients to get msgs then totally send out.
102 * for the mw sleep set to 1800, the msgs is about 133. 102 * for the mw sleep set to 1800, the msgs is about 133.
103 -* @remark, recomment to 256. 103 +* @remark, recomment to 128.
104 */ 104 */
105 -#define SRS_PERF_MW_MSGS 256  
106 -/**  
107 -* how many msgs atleast to send.  
108 -* @remark, recomment to 32.  
109 -*/  
110 -#define SRS_PERF_MW_MIN_MSGS 32  
111 -  
112 -/**  
113 -* whether enable the fast cache.  
114 -* @remark this improve performance for large connectios.  
115 -* @remark this also introduce complex, default to disable it.  
116 -* @see https://github.com/winlinvip/simple-rtmp-server/issues/251  
117 -*/  
118 -#undef SRS_PERF_QUEUE_FAST_CACHE  
119 -/**  
120 -* whether use cond wait to send messages.  
121 -* @remark this improve performance for large connectios.  
122 -* @see https://github.com/winlinvip/simple-rtmp-server/issues/251  
123 -*/  
124 -#undef SRS_PERF_QUEUE_COND_WAIT 105 +#define SRS_PERF_MW_MSGS 128
125 106
126 /** 107 /**
127 * how many chunk stream to cache, [0, N]. 108 * how many chunk stream to cache, [0, N].
@@ -736,8 +736,20 @@ int SrsProtocol::do_send_messages(SrsMessage** msgs, int nb_msgs) @@ -736,8 +736,20 @@ int SrsProtocol::do_send_messages(SrsMessage** msgs, int nb_msgs)
736 if (iov_index <= 0) { 736 if (iov_index <= 0) {
737 return ret; 737 return ret;
738 } 738 }
739 - srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d", 739 +
  740 + // calc the bytes of iovs, for debug.
  741 +#if 0
  742 + int nb_bytes = 0;
  743 + for (int i = 0; i < iov_index; i++) {
  744 + iovec* iov = out_iovs + i;
  745 + nb_bytes += iov->iov_len;
  746 + }
  747 + srs_warn("mw %d msgs %dB in %d iovs, max_msgs=%d, nb_out_iovs=%d",
  748 + nb_msgs, nb_bytes, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
  749 +#else
  750 + srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d",
740 nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs); 751 nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
  752 +#endif
741 753
742 // send by writev 754 // send by writev
743 // sendout header and payload by writev. 755 // sendout header and payload by writev.