winlin

refine code for consumer to refer the rtmp connection.

@@ -116,7 +116,7 @@ int SrsStreamCache::cycle() @@ -116,7 +116,7 @@ int SrsStreamCache::cycle()
116 // the stream cache will create consumer to cache stream, 116 // the stream cache will create consumer to cache stream,
117 // which will trigger to fetch stream from origin for edge. 117 // which will trigger to fetch stream from origin for edge.
118 SrsConsumer* consumer = NULL; 118 SrsConsumer* consumer = NULL;
119 - if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { 119 + if ((ret = source->create_consumer(NULL, consumer, false, false, true)) != ERROR_SUCCESS) {
120 srs_error("http: create consumer failed. ret=%d", ret); 120 srs_error("http: create consumer failed. ret=%d", ret);
121 return ret; 121 return ret;
122 } 122 }
@@ -483,7 +483,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -483,7 +483,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
483 483
484 // create consumer of souce, ignore gop cache, use the audio gop cache. 484 // create consumer of souce, ignore gop cache, use the audio gop cache.
485 SrsConsumer* consumer = NULL; 485 SrsConsumer* consumer = NULL;
486 - if ((ret = source->create_consumer(consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) { 486 + if ((ret = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) {
487 srs_error("http: create consumer failed. ret=%d", ret); 487 srs_error("http: create consumer failed. ret=%d", ret);
488 return ret; 488 return ret;
489 } 489 }
@@ -598,7 +598,7 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -598,7 +598,7 @@ int SrsRtmpConn::playing(SrsSource* source)
598 598
599 // create consumer of souce. 599 // create consumer of souce.
600 SrsConsumer* consumer = NULL; 600 SrsConsumer* consumer = NULL;
601 - if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) { 601 + if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
602 srs_error("create consumer failed. ret=%d", ret); 602 srs_error("create consumer failed. ret=%d", ret);
603 return ret; 603 return ret;
604 } 604 }
@@ -418,9 +418,10 @@ ISrsWakable::~ISrsWakable() @@ -418,9 +418,10 @@ ISrsWakable::~ISrsWakable()
418 { 418 {
419 } 419 }
420 420
421 -SrsConsumer::SrsConsumer(SrsSource* _source) 421 +SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
422 { 422 {
423 - source = _source; 423 + source = s;
  424 + conn = c;
424 paused = false; 425 paused = false;
425 jitter = new SrsRtmpJitter(); 426 jitter = new SrsRtmpJitter();
426 queue = new SrsMessageQueue(); 427 queue = new SrsMessageQueue();
@@ -2143,11 +2144,11 @@ void SrsSource::on_unpublish() @@ -2143,11 +2144,11 @@ void SrsSource::on_unpublish()
2143 handler->on_unpublish(this, _req); 2144 handler->on_unpublish(this, _req);
2144 } 2145 }
2145 2146
2146 -int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg) 2147 +int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
2147 { 2148 {
2148 int ret = ERROR_SUCCESS; 2149 int ret = ERROR_SUCCESS;
2149 2150
2150 - consumer = new SrsConsumer(this); 2151 + consumer = new SrsConsumer(this, conn);
2151 consumers.push_back(consumer); 2152 consumers.push_back(consumer);
2152 2153
2153 double queue_size = _srs_config->get_queue_length(_req->vhost); 2154 double queue_size = _srs_config->get_queue_length(_req->vhost);
@@ -51,6 +51,7 @@ class SrsStSocket; @@ -51,6 +51,7 @@ class SrsStSocket;
51 class SrsRtmpServer; 51 class SrsRtmpServer;
52 class SrsEdgeProxyContext; 52 class SrsEdgeProxyContext;
53 class SrsMessageArray; 53 class SrsMessageArray;
  54 +class SrsConnection;
54 #ifdef SRS_AUTO_HLS 55 #ifdef SRS_AUTO_HLS
55 class SrsHls; 56 class SrsHls;
56 #endif 57 #endif
@@ -224,6 +225,8 @@ private: @@ -224,6 +225,8 @@ private:
224 SrsRtmpJitter* jitter; 225 SrsRtmpJitter* jitter;
225 SrsSource* source; 226 SrsSource* source;
226 SrsMessageQueue* queue; 227 SrsMessageQueue* queue;
  228 + // the owner connection for debug, maybe NULL.
  229 + SrsConnection* conn;
227 bool paused; 230 bool paused;
228 // when source id changed, notice all consumers 231 // when source id changed, notice all consumers
229 bool should_update_source_id; 232 bool should_update_source_id;
@@ -236,7 +239,7 @@ private: @@ -236,7 +239,7 @@ private:
236 int mw_duration; 239 int mw_duration;
237 #endif 240 #endif
238 public: 241 public:
239 - SrsConsumer(SrsSource* _source); 242 + SrsConsumer(SrsSource* s, SrsConnection* c);
240 virtual ~SrsConsumer(); 243 virtual ~SrsConsumer();
241 public: 244 public:
242 /** 245 /**
@@ -571,7 +574,7 @@ public: @@ -571,7 +574,7 @@ public:
571 * @param dg, whether dumps the gop cache. 574 * @param dg, whether dumps the gop cache.
572 */ 575 */
573 virtual int create_consumer( 576 virtual int create_consumer(
574 - SrsConsumer*& consumer, 577 + SrsConnection* conn, SrsConsumer*& consumer,
575 bool ds = true, bool dm = true, bool dg = true 578 bool ds = true, bool dm = true, bool dg = true
576 ); 579 );
577 virtual void on_consumer_destroy(SrsConsumer* consumer); 580 virtual void on_consumer_destroy(SrsConsumer* consumer);
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 205 34 +#define VERSION_REVISION 206
35 35
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"