winlin

for bug #251, 9k+ clients, use fast cache for msgs queue. 2.0.57

@@ -485,6 +485,7 @@ Supported operating systems and hardware: @@ -485,6 +485,7 @@ Supported operating systems and hardware:
485 * 2013-10-17, Created.<br/> 485 * 2013-10-17, Created.<br/>
486 486
487 ## History 487 ## History
  488 +* v2.0, 2014-12-05, fix [#251](https://github.com/winlinvip/simple-rtmp-server/issues/251), 9k+ clients, use fast cache for msgs queue. 2.0.57
488 * v2.0, 2014-12-04, fix [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), add mw(merged-write) config. 2.0.53 489 * v2.0, 2014-12-04, fix [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), add mw(merged-write) config. 2.0.53
489 * v2.0, 2014-12-04, for [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), support mr(merged-read) config and reload. 2.0.52. 490 * v2.0, 2014-12-04, for [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), support mr(merged-read) config and reload. 2.0.52.
490 * v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50 491 * v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50
@@ -605,7 +605,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) @@ -605,7 +605,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
605 // get messages from consumer. 605 // get messages from consumer.
606 // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. 606 // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
607 int count = 0; 607 int count = 0;
608 - if ((ret = consumer->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) { 608 + if ((ret = consumer->dump_packets(&msgs, &count)) != ERROR_SUCCESS) {
609 srs_error("get messages from consumer failed. ret=%d", ret); 609 srs_error("get messages from consumer failed. ret=%d", ret);
610 return ret; 610 return ret;
611 } 611 }
@@ -41,6 +41,7 @@ using namespace std; @@ -41,6 +41,7 @@ using namespace std;
41 #include <srs_app_edge.hpp> 41 #include <srs_app_edge.hpp>
42 #include <srs_kernel_utility.hpp> 42 #include <srs_kernel_utility.hpp>
43 #include <srs_app_avc_aac.hpp> 43 #include <srs_app_avc_aac.hpp>
  44 +#include <srs_protocol_msg_array.hpp>
44 45
45 #define CONST_MAX_JITTER_MS 500 46 #define CONST_MAX_JITTER_MS 500
46 #define DEFAULT_FRAME_TIME_MS 40 47 #define DEFAULT_FRAME_TIME_MS 40
@@ -166,22 +167,12 @@ SrsMessageQueue::~SrsMessageQueue() @@ -166,22 +167,12 @@ SrsMessageQueue::~SrsMessageQueue()
166 clear(); 167 clear();
167 } 168 }
168 169
169 -int SrsMessageQueue::count()  
170 -{  
171 - return (int)msgs.size();  
172 -}  
173 -  
174 -int SrsMessageQueue::duration()  
175 -{  
176 - return (int)(av_end_time - av_start_time);  
177 -}  
178 -  
179 void SrsMessageQueue::set_queue_size(double queue_size) 170 void SrsMessageQueue::set_queue_size(double queue_size)
180 { 171 {
181 queue_size_ms = (int)(queue_size * 1000); 172 queue_size_ms = (int)(queue_size * 1000);
182 } 173 }
183 174
184 -int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) 175 +int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
185 { 176 {
186 int ret = ERROR_SUCCESS; 177 int ret = ERROR_SUCCESS;
187 178
@@ -196,6 +187,11 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) @@ -196,6 +187,11 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
196 msgs.push_back(msg); 187 msgs.push_back(msg);
197 188
198 while (av_end_time - av_start_time > queue_size_ms) { 189 while (av_end_time - av_start_time > queue_size_ms) {
  190 + // notice the caller queue already overflow and shrinked.
  191 + if (is_overflow) {
  192 + *is_overflow = true;
  193 + }
  194 +
199 shrink(); 195 shrink();
200 } 196 }
201 197
@@ -305,10 +301,20 @@ SrsConsumer::SrsConsumer(SrsSource* _source) @@ -305,10 +301,20 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
305 mw_min_msgs = 0; 301 mw_min_msgs = 0;
306 mw_duration = 0; 302 mw_duration = 0;
307 mw_waiting = false; 303 mw_waiting = false;
  304 +
  305 + mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS);
  306 + mw_count = 0;
  307 + mw_first_pkt = mw_last_pkt = 0;
308 } 308 }
309 309
310 SrsConsumer::~SrsConsumer() 310 SrsConsumer::~SrsConsumer()
311 { 311 {
  312 + if (mw_cache) {
  313 + mw_cache->free(mw_count);
  314 + mw_count = 0;
  315 + }
  316 + srs_freep(mw_cache);
  317 +
312 source->on_consumer_destroy(this); 318 source->on_consumer_destroy(this);
313 srs_freep(jitter); 319 srs_freep(jitter);
314 srs_freep(queue); 320 srs_freep(queue);
@@ -341,22 +347,53 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S @@ -341,22 +347,53 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
341 } 347 }
342 } 348 }
343 349
344 - if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {  
345 - return ret; 350 + // use fast cache if available
  351 + if (mw_count < mw_cache->max) {
  352 + // update fast cache timestamps
  353 + if (mw_count == 0) {
  354 + mw_first_pkt = msg->header.timestamp;
  355 + }
  356 + mw_last_pkt = msg->header.timestamp;
  357 +
  358 + mw_cache->msgs[mw_count++] = msg;
  359 + } else{
  360 + // fast cache is full, use queue.
  361 + bool is_overflow = false;
  362 + if ((ret = queue->enqueue(msg, &is_overflow)) != ERROR_SUCCESS) {
  363 + return ret;
  364 + }
  365 + // when overflow, clear cache and refresh the fast cache.
  366 + if (is_overflow) {
  367 + mw_cache->free(mw_count);
  368 + if ((ret = dumps_queue_to_fast_cache()) != ERROR_SUCCESS) {
  369 + return ret;
  370 + }
  371 + }
346 } 372 }
347 373
348 // fire the mw when msgs is enough. 374 // fire the mw when msgs is enough.
349 - if (mw_waiting && queue->count() > mw_min_msgs && queue->duration() > mw_duration) {  
350 - st_cond_signal(mw_wait);  
351 - mw_waiting = false; 375 + if (mw_waiting) {
  376 + // when fast cache not overflow, always flush.
  377 + // so we donot care about the queue.
  378 + bool fast_cache_overflow = mw_count >= mw_cache->max;
  379 + int duration_ms = (int)(mw_last_pkt - mw_first_pkt);
  380 + bool match_min_msgs = mw_count > mw_min_msgs;
  381 +
  382 + // when fast cache overflow, or duration ok, signal to flush.
  383 + if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {
  384 + st_cond_signal(mw_wait);
  385 + mw_waiting = false;
  386 + }
352 } 387 }
353 388
354 return ret; 389 return ret;
355 } 390 }
356 391
357 -int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count) 392 +int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count)
358 { 393 {
359 - srs_assert(max_count > 0); 394 + int ret =ERROR_SUCCESS;
  395 +
  396 + srs_assert(msgs->max > 0);
360 397
361 if (should_update_source_id) { 398 if (should_update_source_id) {
362 srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id()); 399 srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());
@@ -365,10 +402,24 @@ int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count) @@ -365,10 +402,24 @@ int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
365 402
366 // paused, return nothing. 403 // paused, return nothing.
367 if (paused) { 404 if (paused) {
368 - return ERROR_SUCCESS; 405 + return ret;
  406 + }
  407 +
  408 + // only dumps an whole array to msgs.
  409 + for (int i = 0; i < mw_count; i++) {
  410 + msgs->msgs[i] = mw_cache->msgs[i];
369 } 411 }
  412 + *count = mw_count;
370 413
371 - return queue->dump_packets(max_count, pmsgs, count); 414 + // when fast cache is not filled,
  415 + // we donot check the queue, direclty zero fast cache.
  416 + if (mw_count < mw_cache->max) {
  417 + mw_count = 0;
  418 + mw_first_pkt = mw_last_pkt = 0;
  419 + return ret;
  420 + }
  421 +
  422 + return dumps_queue_to_fast_cache();
372 } 423 }
373 424
374 void SrsConsumer::wait(int nb_msgs, int duration) 425 void SrsConsumer::wait(int nb_msgs, int duration)
@@ -376,14 +427,20 @@ void SrsConsumer::wait(int nb_msgs, int duration) @@ -376,14 +427,20 @@ void SrsConsumer::wait(int nb_msgs, int duration)
376 mw_min_msgs = nb_msgs; 427 mw_min_msgs = nb_msgs;
377 mw_duration = duration; 428 mw_duration = duration;
378 429
379 - // already ok, donot wait.  
380 - if (queue->count() > mw_min_msgs && queue->duration() > mw_duration) { 430 + // when fast cache not overflow, always flush.
  431 + // so we donot care about the queue.
  432 + bool fast_cache_overflow = mw_count >= mw_cache->max;
  433 + int duration_ms = (int)(mw_last_pkt - mw_first_pkt);
  434 + bool match_min_msgs = mw_count > mw_min_msgs;
  435 +
  436 + // when fast cache overflow, or duration ok, signal to flush.
  437 + if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {
381 return; 438 return;
382 } 439 }
383 440
384 // the enqueue will notify this cond. 441 // the enqueue will notify this cond.
385 mw_waiting = true; 442 mw_waiting = true;
386 - 443 + // wait for msgs to incoming.
387 st_cond_wait(mw_wait); 444 st_cond_wait(mw_wait);
388 } 445 }
389 446
@@ -397,6 +454,26 @@ int SrsConsumer::on_play_client_pause(bool is_pause) @@ -397,6 +454,26 @@ int SrsConsumer::on_play_client_pause(bool is_pause)
397 return ret; 454 return ret;
398 } 455 }
399 456
  457 +int SrsConsumer::dumps_queue_to_fast_cache()
  458 +{
  459 + int ret =ERROR_SUCCESS;
  460 +
  461 + // fill fast cache with queue.
  462 + if ((ret = queue->dump_packets(mw_cache->max, mw_cache->msgs, mw_count)) != ERROR_SUCCESS) {
  463 + return ret;
  464 + }
  465 + // set the timestamp when got message.
  466 + if (mw_count > 0) {
  467 + SrsMessage* first_msg = mw_cache->msgs[0];
  468 + mw_first_pkt = first_msg->header.timestamp;
  469 +
  470 + SrsMessage* last_msg = mw_cache->msgs[mw_count - 1];
  471 + mw_last_pkt = last_msg->header.timestamp;
  472 + }
  473 +
  474 + return ret;
  475 +}
  476 +
400 SrsGopCache::SrsGopCache() 477 SrsGopCache::SrsGopCache()
401 { 478 {
402 cached_video_count = 0; 479 cached_video_count = 0;
@@ -48,6 +48,7 @@ class SrsRequest; @@ -48,6 +48,7 @@ class SrsRequest;
48 class SrsStSocket; 48 class SrsStSocket;
49 class SrsRtmpServer; 49 class SrsRtmpServer;
50 class SrsEdgeProxyContext; 50 class SrsEdgeProxyContext;
  51 +class SrsMessageArray;
51 #ifdef SRS_AUTO_HLS 52 #ifdef SRS_AUTO_HLS
52 class SrsHls; 53 class SrsHls;
53 #endif 54 #endif
@@ -116,14 +117,6 @@ public: @@ -116,14 +117,6 @@ public:
116 virtual ~SrsMessageQueue(); 117 virtual ~SrsMessageQueue();
117 public: 118 public:
118 /** 119 /**
119 - * get the count of queue.  
120 - */  
121 - virtual int count();  
122 - /**  
123 - * get duration of queue.  
124 - */  
125 - virtual int duration();  
126 - /**  
127 * set the queue size 120 * set the queue size
128 * @param queue_size the queue size in seconds. 121 * @param queue_size the queue size in seconds.
129 */ 122 */
@@ -132,8 +125,9 @@ public: @@ -132,8 +125,9 @@ public:
132 /** 125 /**
133 * enqueue the message, the timestamp always monotonically. 126 * enqueue the message, the timestamp always monotonically.
134 * @param msg, the msg to enqueue, user never free it whatever the return code. 127 * @param msg, the msg to enqueue, user never free it whatever the return code.
  128 + * @param is_overflow, whether overflow and shrinked. NULL to ignore.
135 */ 129 */
136 - virtual int enqueue(SrsSharedPtrMessage* msg); 130 + virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
137 /** 131 /**
138 * get packets in consumer queue. 132 * get packets in consumer queue.
139 * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it. 133 * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
@@ -168,6 +162,14 @@ private: @@ -168,6 +162,14 @@ private:
168 bool mw_waiting; 162 bool mw_waiting;
169 int mw_min_msgs; 163 int mw_min_msgs;
170 int mw_duration; 164 int mw_duration;
  165 + // use fast cache for msgs
  166 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  167 + SrsMessageArray* mw_cache;
  168 + // the count of msg in fast cache.
  169 + int mw_count;
  170 + // the packet time in fast cache.
  171 + int64_t mw_first_pkt;
  172 + int64_t mw_last_pkt;
171 public: 173 public:
172 SrsConsumer(SrsSource* _source); 174 SrsConsumer(SrsSource* _source);
173 virtual ~SrsConsumer(); 175 virtual ~SrsConsumer();
@@ -197,11 +199,11 @@ public: @@ -197,11 +199,11 @@ public:
197 virtual int enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag); 199 virtual int enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag);
198 /** 200 /**
199 * get packets in consumer queue. 201 * get packets in consumer queue.
200 - * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.  
201 - * @count the count in array, output param. 202 + * @param msgs the msgs array to dump packets to send.
  203 + * @param count the count in array, output param.
202 * @max_count the max count to dequeue, must be positive. 204 * @max_count the max count to dequeue, must be positive.
203 */ 205 */
204 - virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count); 206 + virtual int dump_packets(SrsMessageArray* msgs, int* count);
205 /** 207 /**
206 * wait for messages incomming, atleast nb_msgs and in duration. 208 * wait for messages incomming, atleast nb_msgs and in duration.
207 * @param nb_msgs the messages count to wait. 209 * @param nb_msgs the messages count to wait.
@@ -212,6 +214,12 @@ public: @@ -212,6 +214,12 @@ public:
212 * when client send the pause message. 214 * when client send the pause message.
213 */ 215 */
214 virtual int on_play_client_pause(bool is_pause); 216 virtual int on_play_client_pause(bool is_pause);
  217 +private:
  218 + /**
  219 + * dumps the queue to fast cache,
  220 + * when fast cache is clear or queue is overflow.
  221 + */
  222 + virtual int dumps_queue_to_fast_cache();
215 }; 223 };
216 224
217 /** 225 /**
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 56 34 +#define VERSION_REVISION 57
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -75,14 +75,24 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -75,14 +75,24 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
75 * @see SrsConfig::get_mw_sleep_ms() 75 * @see SrsConfig::get_mw_sleep_ms()
76 * @remark the mw sleep and msgs to send, maybe: 76 * @remark the mw sleep and msgs to send, maybe:
77 * mw_sleep msgs iovs 77 * mw_sleep msgs iovs
78 -* 350 24/48 48/84  
79 -* 500 24/48 48/84  
80 -* 800 42/64 84/128  
81 -* 1000 64/85 128/170  
82 -* 1200 65/86 130/172  
83 -* 1500 87/110 174/220  
84 -* 1800 106/128 212/256  
85 -* 2000 134/142 268/284 78 +* 350 43 86
  79 +* 400 44 88
  80 +* 500 46 92
  81 +* 600 46 92
  82 +* 700 82 164
  83 +* 800 81 162
  84 +* 900 80 160
  85 +* 1000 88 176
  86 +* 1100 91 182
  87 +* 1200 89 178
  88 +* 1300 119 238
  89 +* 1400 120 240
  90 +* 1500 119 238
  91 +* 1600 131 262
  92 +* 1700
  93 +* 1800
  94 +* 1900
  95 +* 2000
86 */ 96 */
87 // the default config of mw. 97 // the default config of mw.
88 #define SRS_PERF_MW_SLEEP 350 98 #define SRS_PERF_MW_SLEEP 350
@@ -32,10 +32,7 @@ SrsMessageArray::SrsMessageArray(int max_msgs) @@ -32,10 +32,7 @@ SrsMessageArray::SrsMessageArray(int max_msgs)
32 msgs = new SrsMessage*[max_msgs]; 32 msgs = new SrsMessage*[max_msgs];
33 max = max_msgs; 33 max = max_msgs;
34 34
35 - // initialize  
36 - for (int i = 0; i < max_msgs; i++) {  
37 - msgs[i] = NULL;  
38 - } 35 + zero(max_msgs);
39 } 36 }
40 37
41 SrsMessageArray::~SrsMessageArray() 38 SrsMessageArray::~SrsMessageArray()
@@ -46,4 +43,23 @@ SrsMessageArray::~SrsMessageArray() @@ -46,4 +43,23 @@ SrsMessageArray::~SrsMessageArray()
46 srs_freep(msgs); 43 srs_freep(msgs);
47 } 44 }
48 45
  46 +void SrsMessageArray::free(int count)
  47 +{
  48 + // initialize
  49 + for (int i = 0; i < count; i++) {
  50 + SrsMessage* msg = msgs[i];
  51 + srs_freep(msg);
  52 +
  53 + msgs[i] = NULL;
  54 + }
  55 +}
  56 +
  57 +void SrsMessageArray::zero(int count)
  58 +{
  59 + // initialize
  60 + for (int i = 0; i < count; i++) {
  61 + msgs[i] = NULL;
  62 + }
  63 +}
  64 +
49 65
@@ -60,6 +60,16 @@ public: @@ -60,6 +60,16 @@ public:
60 * free the msgs not sent out(not NULL). 60 * free the msgs not sent out(not NULL).
61 */ 61 */
62 virtual ~SrsMessageArray(); 62 virtual ~SrsMessageArray();
  63 +public:
  64 + /**
  65 + * free specified count of messages.
  66 + */
  67 + virtual void free(int count);
  68 +private:
  69 + /**
  70 + * zero initialize the message array.
  71 + */
  72 + virtual void zero(int count);
63 }; 73 };
64 74
65 #endif 75 #endif