winlin

merge from srs2.

@@ -117,7 +117,7 @@ int SrsBufferCache::cycle() @@ -117,7 +117,7 @@ int SrsBufferCache::cycle()
117 // the stream cache will create consumer to cache stream, 117 // the stream cache will create consumer to cache stream,
118 // which will trigger to fetch stream from origin for edge. 118 // which will trigger to fetch stream from origin for edge.
119 SrsConsumer* consumer = NULL; 119 SrsConsumer* consumer = NULL;
120 - if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { 120 + if ((ret = source->create_consumer(NULL, consumer, false, false, true)) != ERROR_SUCCESS) {
121 srs_error("http: create consumer failed. ret=%d", ret); 121 srs_error("http: create consumer failed. ret=%d", ret);
122 return ret; 122 return ret;
123 } 123 }
@@ -484,7 +484,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -484,7 +484,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
484 484
485 // create consumer of souce, ignore gop cache, use the audio gop cache. 485 // create consumer of souce, ignore gop cache, use the audio gop cache.
486 SrsConsumer* consumer = NULL; 486 SrsConsumer* consumer = NULL;
487 - if ((ret = source->create_consumer(consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) { 487 + if ((ret = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) {
488 srs_error("http: create consumer failed. ret=%d", ret); 488 srs_error("http: create consumer failed. ret=%d", ret);
489 return ret; 489 return ret;
490 } 490 }
@@ -832,7 +832,7 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -832,7 +832,7 @@ int SrsRtmpConn::playing(SrsSource* source)
832 832
833 // create consumer of souce. 833 // create consumer of souce.
834 SrsConsumer* consumer = NULL; 834 SrsConsumer* consumer = NULL;
835 - if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) { 835 + if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
836 srs_error("create consumer failed. ret=%d", ret); 836 srs_error("create consumer failed. ret=%d", ret);
837 return ret; 837 return ret;
838 } 838 }
@@ -419,9 +419,10 @@ ISrsWakable::~ISrsWakable() @@ -419,9 +419,10 @@ ISrsWakable::~ISrsWakable()
419 { 419 {
420 } 420 }
421 421
422 -SrsConsumer::SrsConsumer(SrsSource* _source) 422 +SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
423 { 423 {
424 - source = _source; 424 + source = s;
  425 + conn = c;
425 paused = false; 426 paused = false;
426 jitter = new SrsRtmpJitter(); 427 jitter = new SrsRtmpJitter();
427 queue = new SrsMessageQueue(); 428 queue = new SrsMessageQueue();
@@ -2157,11 +2158,11 @@ void SrsSource::on_unpublish() @@ -2157,11 +2158,11 @@ void SrsSource::on_unpublish()
2157 handler->on_unpublish(this, req); 2158 handler->on_unpublish(this, req);
2158 } 2159 }
2159 2160
2160 -int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg) 2161 +int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
2161 { 2162 {
2162 int ret = ERROR_SUCCESS; 2163 int ret = ERROR_SUCCESS;
2163 2164
2164 - consumer = new SrsConsumer(this); 2165 + consumer = new SrsConsumer(this, conn);
2165 consumers.push_back(consumer); 2166 consumers.push_back(consumer);
2166 2167
2167 double queue_size = _srs_config->get_queue_length(req->vhost); 2168 double queue_size = _srs_config->get_queue_length(req->vhost);
@@ -52,6 +52,7 @@ class SrsRtmpServer; @@ -52,6 +52,7 @@ class SrsRtmpServer;
52 class SrsEdgeProxyContext; 52 class SrsEdgeProxyContext;
53 class SrsMessageArray; 53 class SrsMessageArray;
54 class SrsNgExec; 54 class SrsNgExec;
  55 +class SrsConnection;
55 #ifdef SRS_AUTO_HLS 56 #ifdef SRS_AUTO_HLS
56 class SrsHls; 57 class SrsHls;
57 #endif 58 #endif
@@ -225,6 +226,8 @@ private: @@ -225,6 +226,8 @@ private:
225 SrsRtmpJitter* jitter; 226 SrsRtmpJitter* jitter;
226 SrsSource* source; 227 SrsSource* source;
227 SrsMessageQueue* queue; 228 SrsMessageQueue* queue;
  229 + // the owner connection for debug, maybe NULL.
  230 + SrsConnection* conn;
228 bool paused; 231 bool paused;
229 // when source id changed, notice all consumers 232 // when source id changed, notice all consumers
230 bool should_update_source_id; 233 bool should_update_source_id;
@@ -237,7 +240,7 @@ private: @@ -237,7 +240,7 @@ private:
237 int mw_duration; 240 int mw_duration;
238 #endif 241 #endif
239 public: 242 public:
240 - SrsConsumer(SrsSource* _source); 243 + SrsConsumer(SrsSource* s, SrsConnection* c);
241 virtual ~SrsConsumer(); 244 virtual ~SrsConsumer();
242 public: 245 public:
243 /** 246 /**
@@ -575,7 +578,7 @@ public: @@ -575,7 +578,7 @@ public:
575 * @param dg, whether dumps the gop cache. 578 * @param dg, whether dumps the gop cache.
576 */ 579 */
577 virtual int create_consumer( 580 virtual int create_consumer(
578 - SrsConsumer*& consumer, 581 + SrsConnection* conn, SrsConsumer*& consumer,
579 bool ds = true, bool dm = true, bool dg = true 582 bool ds = true, bool dm = true, bool dg = true
580 ); 583 );
581 virtual void on_consumer_destroy(SrsConsumer* consumer); 584 virtual void on_consumer_destroy(SrsConsumer* consumer);