winlin

refine code, the consumer always alive longer than queue recv thread.

@@ -134,9 +134,10 @@ void SrsRecvThread::on_thread_stop() @@ -134,9 +134,10 @@ void SrsRecvThread::on_thread_stop()
134 handler->on_thread_stop(); 134 handler->on_thread_stop();
135 } 135 }
136 136
137 -SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms) 137 +SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms)
138 : trd(this, rtmp_sdk, timeout_ms) 138 : trd(this, rtmp_sdk, timeout_ms)
139 { 139 {
  140 + _consumer = consumer;
140 rtmp = rtmp_sdk; 141 rtmp = rtmp_sdk;
141 recv_error_code = ERROR_SUCCESS; 142 recv_error_code = ERROR_SUCCESS;
142 _consumer = NULL; 143 _consumer = NULL;
@@ -237,11 +238,6 @@ void SrsQueueRecvThread::on_thread_stop() @@ -237,11 +238,6 @@ void SrsQueueRecvThread::on_thread_stop()
237 rtmp->set_auto_response(true); 238 rtmp->set_auto_response(true);
238 } 239 }
239 240
240 -void SrsQueueRecvThread::set_consumer(SrsConsumer *consumer)  
241 -{  
242 - _consumer = consumer;  
243 -}  
244 -  
245 SrsPublishRecvThread::SrsPublishRecvThread( 241 SrsPublishRecvThread::SrsPublishRecvThread(
246 SrsRtmpServer* rtmp_sdk, 242 SrsRtmpServer* rtmp_sdk,
247 SrsRequest* _req, int mr_sock_fd, int timeout_ms, 243 SrsRequest* _req, int mr_sock_fd, int timeout_ms,
@@ -113,9 +113,9 @@ private: @@ -113,9 +113,9 @@ private:
113 SrsRtmpServer* rtmp; 113 SrsRtmpServer* rtmp;
114 // the recv thread error code. 114 // the recv thread error code.
115 int recv_error_code; 115 int recv_error_code;
116 - SrsConsumer *_consumer; 116 + SrsConsumer* _consumer;
117 public: 117 public:
118 - SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms); 118 + SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms);
119 virtual ~SrsQueueRecvThread(); 119 virtual ~SrsQueueRecvThread();
120 public: 120 public:
121 virtual int start(); 121 virtual int start();
@@ -132,8 +132,6 @@ public: @@ -132,8 +132,6 @@ public:
132 public: 132 public:
133 virtual void on_thread_start(); 133 virtual void on_thread_start();
134 virtual void on_thread_stop(); 134 virtual void on_thread_stop();
135 -public:  
136 - virtual void set_consumer(SrsConsumer *consumer);  
137 }; 135 };
138 136
139 /** 137 /**
@@ -560,9 +560,18 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -560,9 +560,18 @@ int SrsRtmpConn::playing(SrsSource* source)
560 { 560 {
561 int ret = ERROR_SUCCESS; 561 int ret = ERROR_SUCCESS;
562 562
  563 + // create consumer of souce.
  564 + SrsConsumer* consumer = NULL;
  565 + if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) {
  566 + srs_error("create consumer failed. ret=%d", ret);
  567 + return ret;
  568 + }
  569 + SrsAutoFree(SrsConsumer, consumer);
  570 + srs_verbose("consumer created success.");
  571 +
563 // use isolate thread to recv, 572 // use isolate thread to recv,
564 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 573 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
565 - SrsQueueRecvThread trd(rtmp, SRS_PERF_MW_SLEEP); 574 + SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);
566 575
567 // start isolate recv thread. 576 // start isolate recv thread.
568 if ((ret = trd.start()) != ERROR_SUCCESS) { 577 if ((ret = trd.start()) != ERROR_SUCCESS) {
@@ -571,7 +580,7 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -571,7 +580,7 @@ int SrsRtmpConn::playing(SrsSource* source)
571 } 580 }
572 581
573 // delivery messages for clients playing stream. 582 // delivery messages for clients playing stream.
574 - ret = do_playing(source, &trd); 583 + ret = do_playing(source, consumer, &trd);
575 584
576 // stop isolate recv thread 585 // stop isolate recv thread
577 trd.stop(); 586 trd.stop();
@@ -584,27 +593,18 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -584,27 +593,18 @@ int SrsRtmpConn::playing(SrsSource* source)
584 return ret; 593 return ret;
585 } 594 }
586 595
587 -int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) 596 +int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd)
588 { 597 {
589 int ret = ERROR_SUCCESS; 598 int ret = ERROR_SUCCESS;
590 599
  600 + srs_assert(consumer != NULL);
  601 +
591 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) { 602 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) {
592 srs_error("check play_refer failed. ret=%d", ret); 603 srs_error("check play_refer failed. ret=%d", ret);
593 return ret; 604 return ret;
594 } 605 }
595 srs_verbose("check play_refer success."); 606 srs_verbose("check play_refer success.");
596 607
597 - SrsConsumer* consumer = NULL;  
598 - if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) {  
599 - srs_error("create consumer failed. ret=%d", ret);  
600 - return ret;  
601 - }  
602 -  
603 - srs_assert(consumer != NULL);  
604 - SrsAutoFree(SrsConsumer, consumer);  
605 - trd->set_consumer(consumer);  
606 - srs_verbose("consumer created success.");  
607 -  
608 // initialize other components 608 // initialize other components
609 SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); 609 SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
610 SrsMessageArray msgs(SRS_PERF_MW_MSGS); 610 SrsMessageArray msgs(SRS_PERF_MW_MSGS);
@@ -103,7 +103,7 @@ private: @@ -103,7 +103,7 @@ private:
103 virtual int stream_service_cycle(); 103 virtual int stream_service_cycle();
104 virtual int check_vhost(); 104 virtual int check_vhost();
105 virtual int playing(SrsSource* source); 105 virtual int playing(SrsSource* source);
106 - virtual int do_playing(SrsSource* source, SrsQueueRecvThread* trd); 106 + virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd);
107 virtual int fmle_publishing(SrsSource* source); 107 virtual int fmle_publishing(SrsSource* source);
108 virtual int flash_publishing(SrsSource* source); 108 virtual int flash_publishing(SrsSource* source);
109 virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd); 109 virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd);