winlin

for bug #251, refine the send use cond wait.

@@ -598,6 +598,10 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) @@ -598,6 +598,10 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
598 // collect elapse for pithy print. 598 // collect elapse for pithy print.
599 pithy_print.elapse(); 599 pithy_print.elapse();
600 600
  601 + // wait for message to incoming.
  602 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  603 + consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
  604 +
601 // get messages from consumer. 605 // get messages from consumer.
602 // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. 606 // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
603 int count = 0; 607 int count = 0;
@@ -606,12 +610,9 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) @@ -606,12 +610,9 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
606 return ret; 610 return ret;
607 } 611 }
608 612
609 - // no message to send, sleep a while.  
610 - if (count <= 0) {  
611 - srs_verbose("sleep for no messages to send");  
612 - st_usleep(mw_sleep * 1000);  
613 - }  
614 - srs_info("got %d msgs, mw=%d", count, mw_sleep); 613 + // we use wait to get messages, so the count must be positive.
  614 + srs_assert(count > 0);
  615 + srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep);
615 616
616 // reportable 617 // reportable
617 if (pithy_print.can_print()) { 618 if (pithy_print.can_print()) {
@@ -166,6 +166,16 @@ SrsMessageQueue::~SrsMessageQueue() @@ -166,6 +166,16 @@ SrsMessageQueue::~SrsMessageQueue()
166 clear(); 166 clear();
167 } 167 }
168 168
  169 +int SrsMessageQueue::count()
  170 +{
  171 + return (int)msgs.size();
  172 +}
  173 +
  174 +int SrsMessageQueue::duration()
  175 +{
  176 + return (int)(av_end_time - av_start_time);
  177 +}
  178 +
169 void SrsMessageQueue::set_queue_size(double queue_size) 179 void SrsMessageQueue::set_queue_size(double queue_size)
170 { 180 {
171 queue_size_ms = (int)(queue_size * 1000); 181 queue_size_ms = (int)(queue_size * 1000);
@@ -290,6 +300,11 @@ SrsConsumer::SrsConsumer(SrsSource* _source) @@ -290,6 +300,11 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
290 jitter = new SrsRtmpJitter(); 300 jitter = new SrsRtmpJitter();
291 queue = new SrsMessageQueue(); 301 queue = new SrsMessageQueue();
292 should_update_source_id = false; 302 should_update_source_id = false;
  303 +
  304 + mw_wait = st_cond_new();
  305 + mw_min_msgs = 0;
  306 + mw_duration = 0;
  307 + mw_waiting = false;
293 } 308 }
294 309
295 SrsConsumer::~SrsConsumer() 310 SrsConsumer::~SrsConsumer()
@@ -297,6 +312,7 @@ SrsConsumer::~SrsConsumer() @@ -297,6 +312,7 @@ SrsConsumer::~SrsConsumer()
297 source->on_consumer_destroy(this); 312 source->on_consumer_destroy(this);
298 srs_freep(jitter); 313 srs_freep(jitter);
299 srs_freep(queue); 314 srs_freep(queue);
  315 + st_cond_destroy(mw_wait);
300 } 316 }
301 317
302 void SrsConsumer::set_queue_size(double queue_size) 318 void SrsConsumer::set_queue_size(double queue_size)
@@ -329,6 +345,12 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S @@ -329,6 +345,12 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
329 return ret; 345 return ret;
330 } 346 }
331 347
  348 + // fire the mw when msgs is enough.
  349 + if (mw_waiting && queue->count() > mw_min_msgs && queue->duration() > mw_duration) {
  350 + st_cond_signal(mw_wait);
  351 + mw_waiting = false;
  352 + }
  353 +
332 return ret; 354 return ret;
333 } 355 }
334 356
@@ -349,6 +371,22 @@ int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count) @@ -349,6 +371,22 @@ int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
349 return queue->dump_packets(max_count, pmsgs, count); 371 return queue->dump_packets(max_count, pmsgs, count);
350 } 372 }
351 373
  374 +void SrsConsumer::wait(int nb_msgs, int duration)
  375 +{
  376 + mw_min_msgs = nb_msgs;
  377 + mw_duration = duration;
  378 +
  379 + // already ok, donot wait.
  380 + if (queue->count() > mw_min_msgs && queue->duration() > mw_duration) {
  381 + return;
  382 + }
  383 +
  384 + // the enqueue will notify this cond.
  385 + mw_waiting = true;
  386 +
  387 + st_cond_wait(mw_wait);
  388 +}
  389 +
352 int SrsConsumer::on_play_client_pause(bool is_pause) 390 int SrsConsumer::on_play_client_pause(bool is_pause)
353 { 391 {
354 int ret = ERROR_SUCCESS; 392 int ret = ERROR_SUCCESS;
@@ -116,6 +116,14 @@ public: @@ -116,6 +116,14 @@ public:
116 virtual ~SrsMessageQueue(); 116 virtual ~SrsMessageQueue();
117 public: 117 public:
118 /** 118 /**
  119 + * get the count of queue.
  120 + */
  121 + virtual int count();
  122 + /**
  123 + * get duration of queue.
  124 + */
  125 + virtual int duration();
  126 + /**
119 * set the queue size 127 * set the queue size
120 * @param queue_size the queue size in seconds. 128 * @param queue_size the queue size in seconds.
121 */ 129 */
@@ -154,6 +162,12 @@ private: @@ -154,6 +162,12 @@ private:
154 bool paused; 162 bool paused;
155 // when source id changed, notice all consumers 163 // when source id changed, notice all consumers
156 bool should_update_source_id; 164 bool should_update_source_id;
  165 + // the cond wait for mw.
  166 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  167 + st_cond_t mw_wait;
  168 + bool mw_waiting;
  169 + int mw_min_msgs;
  170 + int mw_duration;
157 public: 171 public:
158 SrsConsumer(SrsSource* _source); 172 SrsConsumer(SrsSource* _source);
159 virtual ~SrsConsumer(); 173 virtual ~SrsConsumer();
@@ -189,6 +203,12 @@ public: @@ -189,6 +203,12 @@ public:
189 */ 203 */
190 virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count); 204 virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count);
191 /** 205 /**
  206 + * wait for messages incomming, atleast nb_msgs and in duration.
  207 + * @param nb_msgs the messages count to wait.
  208 + * @param duration the messgae duration to wait.
  209 + */
  210 + virtual void wait(int nb_msgs, int duration);
  211 + /**
192 * when client send the pause message. 212 * when client send the pause message.
193 */ 213 */
194 virtual int on_play_client_pause(bool is_pause); 214 virtual int on_play_client_pause(bool is_pause);
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 55 34 +#define VERSION_REVISION 56
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -93,6 +93,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -93,6 +93,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
93 * @remark, recomment to 156. 93 * @remark, recomment to 156.
94 */ 94 */
95 #define SRS_PERF_MW_MSGS 156 95 #define SRS_PERF_MW_MSGS 156
  96 +/**
  97 +* how many msgs atleast to send.
  98 +* @remark, recomment to 8.
  99 +*/
  100 +#define SRS_PERF_MW_MIN_MSGS 8
96 101
97 /** 102 /**
98 * how many chunk stream to cache, [0, N]. 103 * how many chunk stream to cache, [0, N].