winlin

fix #84: unpublish source when edge stop, clear gop cache

@@ -105,6 +105,9 @@ void SrsEdgeIngester::stop() @@ -105,6 +105,9 @@ void SrsEdgeIngester::stop()
105 srs_freep(client); 105 srs_freep(client);
106 srs_freep(io); 106 srs_freep(io);
107 kbps->set_io(NULL, NULL); 107 kbps->set_io(NULL, NULL);
  108 +
  109 + // notice to unpublish.
  110 + _source->on_unpublish();
108 } 111 }
109 112
110 int SrsEdgeIngester::cycle() 113 int SrsEdgeIngester::cycle()
@@ -285,8 +288,8 @@ int SrsEdgeIngester::connect_server() @@ -285,8 +288,8 @@ int SrsEdgeIngester::connect_server()
285 } 288 }
286 289
287 // open socket. 290 // open socket.
288 - srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d",  
289 - _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port); 291 + srs_trace("edge connected, can_publish=%d, url=%s/%s, server=%s:%d",
  292 + _source->can_publish(), _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port);
290 293
291 // TODO: FIXME: extract utility method 294 // TODO: FIXME: extract utility method
292 int sock = socket(AF_INET, SOCK_STREAM, 0); 295 int sock = socket(AF_INET, SOCK_STREAM, 0);
@@ -271,7 +271,7 @@ int SrsRtmpConn::stream_service_cycle() @@ -271,7 +271,7 @@ int SrsRtmpConn::stream_service_cycle()
271 srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret); 271 srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret);
272 return ret; 272 return ret;
273 } 273 }
274 - srs_trace("set chunk_size=%d success", chunk_size); 274 + srs_info("set chunk_size=%d success", chunk_size);
275 275
276 // find a source to serve. 276 // find a source to serve.
277 SrsSource* source = NULL; 277 SrsSource* source = NULL;
@@ -296,16 +296,16 @@ int SrsRtmpConn::stream_service_cycle() @@ -296,16 +296,16 @@ int SrsRtmpConn::stream_service_cycle()
296 } 296 }
297 297
298 bool enabled_cache = _srs_config->get_gop_cache(req->vhost); 298 bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
299 - srs_trace("source found, ip=%s, url=%s, enabled_cache=%d, edge=%d",  
300 - ip.c_str(), req->get_stream_url().c_str(), enabled_cache, vhost_is_edge); 299 + srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, id=%d",
  300 + req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, source->source_id());
301 source->set_cache(enabled_cache); 301 source->set_cache(enabled_cache);
302 302
303 switch (type) { 303 switch (type) {
304 case SrsRtmpConnPlay: { 304 case SrsRtmpConnPlay: {
305 srs_verbose("start to play stream %s.", req->stream.c_str()); 305 srs_verbose("start to play stream %s.", req->stream.c_str());
306 306
307 - // notice edge to start for the first client.  
308 if (vhost_is_edge) { 307 if (vhost_is_edge) {
  308 + // notice edge to start for the first client.
309 if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) { 309 if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) {
310 srs_error("notice edge start play stream failed. ret=%d", ret); 310 srs_error("notice edge start play stream failed. ret=%d", ret);
311 return ret; 311 return ret;
@@ -568,15 +568,18 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) @@ -568,15 +568,18 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
568 568
569 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); 569 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
570 570
571 - // notify the hls to prepare when publish start.  
572 - if ((ret = source->on_publish()) != ERROR_SUCCESS) {  
573 - srs_error("fmle hls on_publish failed. ret=%d", ret);  
574 - return ret;  
575 - }  
576 - srs_verbose("fmle hls on_publish success.");  
577 -  
578 bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); 571 bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
579 572
  573 + // when edge, ignore the publish event, directly proxy it.
  574 + if (vhost_is_edge) {
  575 + // notify the hls to prepare when publish start.
  576 + if ((ret = source->on_publish()) != ERROR_SUCCESS) {
  577 + srs_error("fmle hls on_publish failed. ret=%d", ret);
  578 + return ret;
  579 + }
  580 + srs_verbose("fmle hls on_publish success.");
  581 + }
  582 +
580 while (true) { 583 while (true) {
581 // switch to other st-threads. 584 // switch to other st-threads.
582 st_usleep(0); 585 st_usleep(0);
@@ -644,15 +647,18 @@ int SrsRtmpConn::flash_publish(SrsSource* source) @@ -644,15 +647,18 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
644 647
645 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); 648 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
646 649
647 - // notify the hls to prepare when publish start.  
648 - if ((ret = source->on_publish()) != ERROR_SUCCESS) {  
649 - srs_error("flash hls on_publish failed. ret=%d", ret);  
650 - return ret;  
651 - }  
652 - srs_verbose("flash hls on_publish success.");  
653 -  
654 bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); 650 bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
655 651
  652 + // when edge, ignore the publish event, directly proxy it.
  653 + if (vhost_is_edge) {
  654 + // notify the hls to prepare when publish start.
  655 + if ((ret = source->on_publish()) != ERROR_SUCCESS) {
  656 + srs_error("flash hls on_publish failed. ret=%d", ret);
  657 + return ret;
  658 + }
  659 + srs_verbose("flash hls on_publish success.");
  660 + }
  661 +
656 while (true) { 662 while (true) {
657 // switch to other st-threads. 663 // switch to other st-threads.
658 st_usleep(0); 664 st_usleep(0);
@@ -465,6 +465,7 @@ SrsSource::SrsSource(SrsRequest* req) @@ -465,6 +465,7 @@ SrsSource::SrsSource(SrsRequest* req)
465 465
466 frame_rate = sample_rate = 0; 466 frame_rate = sample_rate = 0;
467 _can_publish = true; 467 _can_publish = true;
  468 + _source_id = -1;
468 469
469 play_edge = new SrsPlayEdge(); 470 play_edge = new SrsPlayEdge();
470 publish_edge = new SrsPublishEdge(); 471 publish_edge = new SrsPublishEdge();
@@ -791,6 +792,24 @@ int SrsSource::on_dvr_request_sh() @@ -791,6 +792,24 @@ int SrsSource::on_dvr_request_sh()
791 return ret; 792 return ret;
792 } 793 }
793 794
  795 +int SrsSource::on_source_id_changed(int id)
  796 +{
  797 + int ret = ERROR_SUCCESS;
  798 +
  799 + if (_source_id == id) {
  800 + return ret;
  801 + }
  802 +
  803 + _source_id = id;
  804 +
  805 + return ret;
  806 +}
  807 +
  808 +int SrsSource::source_id()
  809 +{
  810 + return _source_id;
  811 +}
  812 +
794 bool SrsSource::can_publish() 813 bool SrsSource::can_publish()
795 { 814 {
796 return _can_publish; 815 return _can_publish;
@@ -1181,6 +1200,10 @@ int SrsSource::on_publish() @@ -1181,6 +1200,10 @@ int SrsSource::on_publish()
1181 1200
1182 _can_publish = false; 1201 _can_publish = false;
1183 1202
  1203 + // whatever, the publish thread is the source or edge source,
  1204 + // save its id to srouce id.
  1205 + on_source_id_changed(_srs_context->get_id());
  1206 +
1184 // create forwarders 1207 // create forwarders
1185 if ((ret = create_forwarders()) != ERROR_SUCCESS) { 1208 if ((ret = create_forwarders()) != ERROR_SUCCESS) {
1186 srs_error("create forwarders failed. ret=%d", ret); 1209 srs_error("create forwarders failed. ret=%d", ret);
@@ -1239,6 +1262,7 @@ void SrsSource::on_unpublish() @@ -1239,6 +1262,7 @@ void SrsSource::on_unpublish()
1239 srs_trace("clear cache/metadata/sequence-headers when unpublish."); 1262 srs_trace("clear cache/metadata/sequence-headers when unpublish.");
1240 1263
1241 _can_publish = true; 1264 _can_publish = true;
  1265 + _source_id = -1;
1242 } 1266 }
1243 1267
1244 int SrsSource::create_consumer(SrsConsumer*& consumer) 1268 int SrsSource::create_consumer(SrsConsumer*& consumer)
@@ -229,6 +229,12 @@ public: @@ -229,6 +229,12 @@ public:
229 */ 229 */
230 static void destroy(); 230 static void destroy();
231 private: 231 private:
  232 + // source id,
  233 + // for publish, it's the publish client id.
  234 + // for edge, it's the edge ingest id.
  235 + // when source id changed, for example, the edge reconnect,
  236 + // invoke the on_source_id_changed() to let all clients know.
  237 + int _source_id;
232 // deep copy of client request. 238 // deep copy of client request.
233 SrsRequest* _req; 239 SrsRequest* _req;
234 // to delivery stream to clients. 240 // to delivery stream to clients.
@@ -298,6 +304,7 @@ public: @@ -298,6 +304,7 @@ public:
298 virtual int on_reload_vhost_hls(std::string vhost); 304 virtual int on_reload_vhost_hls(std::string vhost);
299 virtual int on_reload_vhost_dvr(std::string vhost); 305 virtual int on_reload_vhost_dvr(std::string vhost);
300 virtual int on_reload_vhost_transcode(std::string vhost); 306 virtual int on_reload_vhost_transcode(std::string vhost);
  307 +// for the tools callback
301 public: 308 public:
302 // for the SrsForwarder to callback to request the sequence headers. 309 // for the SrsForwarder to callback to request the sequence headers.
303 virtual int on_forwarder_start(SrsForwarder* forwarder); 310 virtual int on_forwarder_start(SrsForwarder* forwarder);
@@ -305,6 +312,11 @@ public: @@ -305,6 +312,11 @@ public:
305 virtual int on_hls_start(); 312 virtual int on_hls_start();
306 // for the SrsDvr to callback to request the sequence headers. 313 // for the SrsDvr to callback to request the sequence headers.
307 virtual int on_dvr_request_sh(); 314 virtual int on_dvr_request_sh();
  315 + // source id changed.
  316 + virtual int on_source_id_changed(int id);
  317 + // get current source id.
  318 + virtual int source_id();
  319 +// logic data methods
308 public: 320 public:
309 virtual bool can_publish(); 321 virtual bool can_publish();
310 virtual int on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata); 322 virtual int on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata);
@@ -318,6 +330,7 @@ public: @@ -318,6 +330,7 @@ public:
318 */ 330 */
319 virtual int on_publish(); 331 virtual int on_publish();
320 virtual void on_unpublish(); 332 virtual void on_unpublish();
  333 +// consumer methods
321 public: 334 public:
322 virtual int create_consumer(SrsConsumer*& consumer); 335 virtual int create_consumer(SrsConsumer*& consumer);
323 virtual void on_consumer_destroy(SrsConsumer* consumer); 336 virtual void on_consumer_destroy(SrsConsumer* consumer);
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR "0" 32 #define VERSION_MAJOR "0"
33 #define VERSION_MINOR "9" 33 #define VERSION_MINOR "9"
34 -#define VERSION_REVISION "118" 34 +#define VERSION_REVISION "119"
35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION 35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"