winlin

for bug #251, add queue fast cache. 2.0.65

@@ -182,7 +182,7 @@ void SrsMessageQueue::set_queue_size(double queue_size) @@ -182,7 +182,7 @@ void SrsMessageQueue::set_queue_size(double queue_size)
182 queue_size_ms = (int)(queue_size * 1000); 182 queue_size_ms = (int)(queue_size * 1000);
183 } 183 }
184 184
185 -int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) 185 +int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
186 { 186 {
187 int ret = ERROR_SUCCESS; 187 int ret = ERROR_SUCCESS;
188 188
@@ -197,6 +197,11 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) @@ -197,6 +197,11 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
197 msgs.push_back(msg); 197 msgs.push_back(msg);
198 198
199 while (av_end_time - av_start_time > queue_size_ms) { 199 while (av_end_time - av_start_time > queue_size_ms) {
  200 + // notice the caller queue already overflow and shrinked.
  201 + if (is_overflow) {
  202 + *is_overflow = true;
  203 + }
  204 +
200 shrink(); 205 shrink();
201 } 206 }
202 207
@@ -310,10 +315,23 @@ SrsConsumer::SrsConsumer(SrsSource* _source) @@ -310,10 +315,23 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
310 mw_duration = 0; 315 mw_duration = 0;
311 mw_waiting = false; 316 mw_waiting = false;
312 #endif 317 #endif
  318 +
  319 +#ifdef SRS_PERF_QUEUE_FAST_CACHE
  320 + mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS);
  321 + mw_count = 0;
  322 + mw_first_pkt = mw_last_pkt = 0;
  323 +#endif
313 } 324 }
314 325
315 SrsConsumer::~SrsConsumer() 326 SrsConsumer::~SrsConsumer()
316 { 327 {
  328 +#ifdef SRS_PERF_QUEUE_FAST_CACHE
  329 + if (mw_cache) {
  330 + mw_cache->free(mw_count);
  331 + mw_count = 0;
  332 + }
  333 + srs_freep(mw_cache);
  334 +#endif
317 source->on_consumer_destroy(this); 335 source->on_consumer_destroy(this);
318 srs_freep(jitter); 336 srs_freep(jitter);
319 srs_freep(queue); 337 srs_freep(queue);
@@ -351,11 +369,37 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, @@ -351,11 +369,37 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv,
351 } 369 }
352 } 370 }
353 371
354 - if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) { 372 +#ifdef SRS_PERF_QUEUE_FAST_CACHE
  373 + // use fast cache if available
  374 + if (mw_count < mw_cache->max) {
  375 + // update fast cache timestamps
  376 + if (mw_count == 0) {
  377 + mw_first_pkt = msg->timestamp;
  378 + }
  379 + mw_last_pkt = msg->timestamp;
  380 +
  381 + mw_cache->msgs[mw_count++] = msg;
  382 + } else{
  383 + // fast cache is full, use queue.
  384 + bool is_overflow = false;
  385 + if ((ret = queue->enqueue(msg, &is_overflow)) != ERROR_SUCCESS) {
  386 + return ret;
  387 + }
  388 + // when overflow, clear cache and refresh the fast cache.
  389 + if (is_overflow) {
  390 + mw_cache->free(mw_count);
  391 + if ((ret = dumps_queue_to_fast_cache()) != ERROR_SUCCESS) {
  392 + return ret;
  393 + }
  394 + }
  395 + }
  396 +#else
  397 + if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) {
355 return ret; 398 return ret;
356 } 399 }
  400 +#endif
357 401
358 - #ifdef SRS_PERF_QUEUE_COND_WAIT 402 +#ifdef SRS_PERF_QUEUE_COND_WAIT
359 // fire the mw when msgs is enough. 403 // fire the mw when msgs is enough.
360 if (mw_waiting) { 404 if (mw_waiting) {
361 int duration_ms = queue->duration(); 405 int duration_ms = queue->duration();
@@ -367,7 +411,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, @@ -367,7 +411,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv,
367 mw_waiting = false; 411 mw_waiting = false;
368 } 412 }
369 } 413 }
370 - #endif 414 +#endif
371 415
372 return ret; 416 return ret;
373 } 417 }
@@ -388,12 +432,30 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) @@ -388,12 +432,30 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
388 return ret; 432 return ret;
389 } 433 }
390 434
  435 +#ifdef SRS_PERF_QUEUE_FAST_CACHE
  436 + // only dumps an whole array to msgs.
  437 + for (int i = 0; i < mw_count; i++) {
  438 + msgs->msgs[i] = mw_cache->msgs[i];
  439 + }
  440 + count = mw_count;
  441 +
  442 + // when fast cache is not filled,
  443 + // we donot check the queue, direclty zero fast cache.
  444 + if (mw_count < mw_cache->max) {
  445 + mw_count = 0;
  446 + mw_first_pkt = mw_last_pkt = 0;
  447 + return ret;
  448 + }
  449 +
  450 + return dumps_queue_to_fast_cache();
  451 +#else
391 // pump msgs from queue. 452 // pump msgs from queue.
392 if ((ret = queue->dump_packets(msgs->max, msgs->msgs, count)) != ERROR_SUCCESS) { 453 if ((ret = queue->dump_packets(msgs->max, msgs->msgs, count)) != ERROR_SUCCESS) {
393 return ret; 454 return ret;
394 } 455 }
395 456
396 return ret; 457 return ret;
  458 +#endif
397 } 459 }
398 460
399 #ifdef SRS_PERF_QUEUE_COND_WAIT 461 #ifdef SRS_PERF_QUEUE_COND_WAIT
@@ -402,6 +464,18 @@ void SrsConsumer::wait(int nb_msgs, int duration) @@ -402,6 +464,18 @@ void SrsConsumer::wait(int nb_msgs, int duration)
402 mw_min_msgs = nb_msgs; 464 mw_min_msgs = nb_msgs;
403 mw_duration = duration; 465 mw_duration = duration;
404 466
  467 +#ifdef SRS_PERF_QUEUE_FAST_CACHE
  468 + // when fast cache not overflow, always flush.
  469 + // so we donot care about the queue.
  470 + bool fast_cache_overflow = mw_count >= mw_cache->max;
  471 + int duration_ms = (int)(mw_last_pkt - mw_first_pkt);
  472 + bool match_min_msgs = mw_count > mw_min_msgs;
  473 +
  474 + // when fast cache overflow, or duration ok, signal to flush.
  475 + if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {
  476 + return;
  477 + }
  478 +#else
405 int duration_ms = queue->duration(); 479 int duration_ms = queue->duration();
406 bool match_min_msgs = queue->size() > mw_min_msgs; 480 bool match_min_msgs = queue->size() > mw_min_msgs;
407 481
@@ -409,6 +483,7 @@ void SrsConsumer::wait(int nb_msgs, int duration) @@ -409,6 +483,7 @@ void SrsConsumer::wait(int nb_msgs, int duration)
409 if (match_min_msgs && duration_ms > mw_duration) { 483 if (match_min_msgs && duration_ms > mw_duration) {
410 return; 484 return;
411 } 485 }
  486 +#endif
412 487
413 // the enqueue will notify this cond. 488 // the enqueue will notify this cond.
414 mw_waiting = true; 489 mw_waiting = true;
@@ -427,6 +502,28 @@ int SrsConsumer::on_play_client_pause(bool is_pause) @@ -427,6 +502,28 @@ int SrsConsumer::on_play_client_pause(bool is_pause)
427 return ret; 502 return ret;
428 } 503 }
429 504
  505 +#ifdef SRS_PERF_QUEUE_FAST_CACHE
  506 +int SrsConsumer::dumps_queue_to_fast_cache()
  507 +{
  508 + int ret =ERROR_SUCCESS;
  509 +
  510 + // fill fast cache with queue.
  511 + if ((ret = queue->dump_packets(mw_cache->max, mw_cache->msgs, mw_count)) != ERROR_SUCCESS) {
  512 + return ret;
  513 + }
  514 + // set the timestamp when got message.
  515 + if (mw_count > 0) {
  516 + SrsSharedPtrMessage* first_msg = mw_cache->msgs[0];
  517 + mw_first_pkt = first_msg->timestamp;
  518 +
  519 + SrsSharedPtrMessage* last_msg = mw_cache->msgs[mw_count - 1];
  520 + mw_last_pkt = last_msg->timestamp;
  521 + }
  522 +
  523 + return ret;
  524 +}
  525 +#endif
  526 +
430 SrsGopCache::SrsGopCache() 527 SrsGopCache::SrsGopCache()
431 { 528 {
432 cached_video_count = 0; 529 cached_video_count = 0;
@@ -134,8 +134,9 @@ public: @@ -134,8 +134,9 @@ public:
134 /** 134 /**
135 * enqueue the message, the timestamp always monotonically. 135 * enqueue the message, the timestamp always monotonically.
136 * @param msg, the msg to enqueue, user never free it whatever the return code. 136 * @param msg, the msg to enqueue, user never free it whatever the return code.
  137 + * @param is_overflow, whether overflow and shrinked. NULL to ignore.
137 */ 138 */
138 - virtual int enqueue(SrsSharedPtrMessage* msg); 139 + virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
139 /** 140 /**
140 * get packets in consumer queue. 141 * get packets in consumer queue.
141 * @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it. 142 * @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it.
@@ -172,6 +173,16 @@ private: @@ -172,6 +173,16 @@ private:
172 int mw_min_msgs; 173 int mw_min_msgs;
173 int mw_duration; 174 int mw_duration;
174 #endif 175 #endif
  176 +#ifdef SRS_PERF_QUEUE_FAST_CACHE
  177 + // use fast cache for msgs
  178 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  179 + SrsMessageArray* mw_cache;
  180 + // the count of msg in fast cache.
  181 + int mw_count;
  182 + // the packet time in fast cache.
  183 + int64_t mw_first_pkt;
  184 + int64_t mw_last_pkt;
  185 +#endif
175 public: 186 public:
176 SrsConsumer(SrsSource* _source); 187 SrsConsumer(SrsSource* _source);
177 virtual ~SrsConsumer(); 188 virtual ~SrsConsumer();
@@ -219,6 +230,14 @@ public: @@ -219,6 +230,14 @@ public:
219 * when client send the pause message. 230 * when client send the pause message.
220 */ 231 */
221 virtual int on_play_client_pause(bool is_pause); 232 virtual int on_play_client_pause(bool is_pause);
  233 +private:
  234 +#ifdef SRS_PERF_QUEUE_FAST_CACHE
  235 + /**
  236 + * dumps the queue to fast cache,
  237 + * when fast cache is clear or queue is overflow.
  238 + */
  239 + virtual int dumps_queue_to_fast_cache();
  240 +#endif
222 }; 241 };
223 242
224 /** 243 /**
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 64 34 +#define VERSION_REVISION 65
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -128,6 +128,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -128,6 +128,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
128 */ 128 */
129 #undef SRS_PERF_MW_SO_RCVBUF 129 #undef SRS_PERF_MW_SO_RCVBUF
130 /** 130 /**
  131 +* whether enable the fast cache.
  132 +* @remark this improve performance for large connectios.
  133 +* @remark this also introduce complex, default to disable it.
  134 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  135 +*/
  136 +#undef SRS_PERF_QUEUE_FAST_CACHE
  137 +/**
131 * whether use cond wait to send messages. 138 * whether use cond wait to send messages.
132 * @remark this improve performance for large connectios. 139 * @remark this improve performance for large connectios.
133 * @see https://github.com/winlinvip/simple-rtmp-server/issues/251 140 * @see https://github.com/winlinvip/simple-rtmp-server/issues/251