winlin

support reload ingesters(added/removed/updated). change to 0.9.57.

@@ -221,6 +221,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw @@ -221,6 +221,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw
221 * nginx v1.5.0: 139524 lines <br/> 221 * nginx v1.5.0: 139524 lines <br/>
222 222
223 ## History 223 ## History
  224 +* v1.0, 2014-04-10, support reload ingesters(added/removed/updated). change to 0.9.57.
224 * v1.0, 2014-04-07, [1.0 mainline(0.9.55)](https://github.com/winlinvip/simple-rtmp-server/releases/tag/1.0.mainline) released. 30000 lines. 225 * v1.0, 2014-04-07, [1.0 mainline(0.9.55)](https://github.com/winlinvip/simple-rtmp-server/releases/tag/1.0.mainline) released. 30000 lines.
225 * v1.0, 2014-04-07, support [ingest](https://github.com/winlinvip/simple-rtmp-server/wiki/SampleIngest) file/stream/device. 226 * v1.0, 2014-04-07, support [ingest](https://github.com/winlinvip/simple-rtmp-server/wiki/SampleIngest) file/stream/device.
226 * v1.0, 2014-04-05, support [http api](https://github.com/winlinvip/simple-rtmp-server/wiki/HTTPApi) and [http server](https://github.com/winlinvip/simple-rtmp-server/wiki/HTTPServer). 227 * v1.0, 2014-04-05, support [http api](https://github.com/winlinvip/simple-rtmp-server/wiki/HTTPApi) and [http server](https://github.com/winlinvip/simple-rtmp-server/wiki/HTTPServer).
@@ -568,7 +568,7 @@ int SrsConfig::reload() @@ -568,7 +568,7 @@ int SrsConfig::reload()
568 // merge config: vhost modified. 568 // merge config: vhost modified.
569 srs_trace("vhost %s modified, reload its detail.", vhost.c_str()); 569 srs_trace("vhost %s modified, reload its detail.", vhost.c_str());
570 if (get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) { 570 if (get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) {
571 - // gop_cache 571 + // gop_cache, only one per vhost
572 if (!srs_directive_equals(new_vhost->get("gop_cache"), old_vhost->get("gop_cache"))) { 572 if (!srs_directive_equals(new_vhost->get("gop_cache"), old_vhost->get("gop_cache"))) {
573 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 573 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
574 ISrsReloadHandler* subscribe = *it; 574 ISrsReloadHandler* subscribe = *it;
@@ -579,7 +579,7 @@ int SrsConfig::reload() @@ -579,7 +579,7 @@ int SrsConfig::reload()
579 } 579 }
580 srs_trace("vhost %s reload gop_cache success.", vhost.c_str()); 580 srs_trace("vhost %s reload gop_cache success.", vhost.c_str());
581 } 581 }
582 - // queue_length 582 + // queue_length, only one per vhost
583 if (!srs_directive_equals(new_vhost->get("queue_length"), old_vhost->get("queue_length"))) { 583 if (!srs_directive_equals(new_vhost->get("queue_length"), old_vhost->get("queue_length"))) {
584 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 584 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
585 ISrsReloadHandler* subscribe = *it; 585 ISrsReloadHandler* subscribe = *it;
@@ -590,7 +590,7 @@ int SrsConfig::reload() @@ -590,7 +590,7 @@ int SrsConfig::reload()
590 } 590 }
591 srs_trace("vhost %s reload queue_length success.", vhost.c_str()); 591 srs_trace("vhost %s reload queue_length success.", vhost.c_str());
592 } 592 }
593 - // forward 593 + // forward, only one per vhost
594 if (!srs_directive_equals(new_vhost->get("forward"), old_vhost->get("forward"))) { 594 if (!srs_directive_equals(new_vhost->get("forward"), old_vhost->get("forward"))) {
595 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 595 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
596 ISrsReloadHandler* subscribe = *it; 596 ISrsReloadHandler* subscribe = *it;
@@ -601,7 +601,7 @@ int SrsConfig::reload() @@ -601,7 +601,7 @@ int SrsConfig::reload()
601 } 601 }
602 srs_trace("vhost %s reload forward success.", vhost.c_str()); 602 srs_trace("vhost %s reload forward success.", vhost.c_str());
603 } 603 }
604 - // hls 604 + // hls, only one per vhost
605 if (!srs_directive_equals(new_vhost->get("hls"), old_vhost->get("hls"))) { 605 if (!srs_directive_equals(new_vhost->get("hls"), old_vhost->get("hls"))) {
606 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 606 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
607 ISrsReloadHandler* subscribe = *it; 607 ISrsReloadHandler* subscribe = *it;
@@ -612,7 +612,8 @@ int SrsConfig::reload() @@ -612,7 +612,8 @@ int SrsConfig::reload()
612 } 612 }
613 srs_trace("vhost %s reload hls success.", vhost.c_str()); 613 srs_trace("vhost %s reload hls success.", vhost.c_str());
614 } 614 }
615 - // transcode 615 + // TODO: FIXME: there might be many transcoders per vhost.
  616 + // transcode, only one per vhost
616 if (!srs_directive_equals(new_vhost->get("transcode"), old_vhost->get("transcode"))) { 617 if (!srs_directive_equals(new_vhost->get("transcode"), old_vhost->get("transcode"))) {
617 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 618 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
618 ISrsReloadHandler* subscribe = *it; 619 ISrsReloadHandler* subscribe = *it;
@@ -623,6 +624,10 @@ int SrsConfig::reload() @@ -623,6 +624,10 @@ int SrsConfig::reload()
623 } 624 }
624 srs_trace("vhost %s reload transcode success.", vhost.c_str()); 625 srs_trace("vhost %s reload transcode success.", vhost.c_str());
625 } 626 }
  627 + // ingest, many per vhost.
  628 + if ((ret = reload_ingest(new_vhost, old_vhost)) != ERROR_SUCCESS) {
  629 + return ret;
  630 + }
626 // TODO: suppor reload hls/forward/ffmpeg/http 631 // TODO: suppor reload hls/forward/ffmpeg/http
627 continue; 632 continue;
628 } 633 }
@@ -689,6 +694,102 @@ int SrsConfig::parse_options(int argc, char** argv) @@ -689,6 +694,102 @@ int SrsConfig::parse_options(int argc, char** argv)
689 return parse_file(config_file.c_str()); 694 return parse_file(config_file.c_str());
690 } 695 }
691 696
  697 +int SrsConfig::reload_ingest(SrsConfDirective* new_vhost, SrsConfDirective* old_vhost)
  698 +{
  699 + int ret = ERROR_SUCCESS;
  700 +
  701 + std::vector<SrsConfDirective*> old_ingesters;
  702 + for (int i = 0; i < (int)old_vhost->directives.size(); i++) {
  703 + SrsConfDirective* conf = old_vhost->at(i);
  704 + if (conf->name == "ingest") {
  705 + old_ingesters.push_back(conf);
  706 + }
  707 + }
  708 +
  709 + std::vector<SrsConfDirective*> new_ingesters;
  710 + for (int i = 0; i < (int)new_vhost->directives.size(); i++) {
  711 + SrsConfDirective* conf = new_vhost->at(i);
  712 + if (conf->name == "ingest") {
  713 + new_ingesters.push_back(conf);
  714 + }
  715 + }
  716 +
  717 + std::vector<ISrsReloadHandler*>::iterator it;
  718 +
  719 + std::string vhost = new_vhost->arg0();
  720 +
  721 + // for removed ingesters, stop them.
  722 + for (int i = 0; i < (int)old_ingesters.size(); i++) {
  723 + SrsConfDirective* old_ingester = old_ingesters.at(i);
  724 + std::string ingest_id = old_ingester->arg0();
  725 +
  726 + // if ingester exists in new vhost, not removed, ignore.
  727 + if (new_vhost->get("ingest", ingest_id)) {
  728 + continue;
  729 + }
  730 +
  731 + // notice handler ingester removed.
  732 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  733 + ISrsReloadHandler* subscribe = *it;
  734 + if ((ret = subscribe->on_reload_ingest_removed(vhost, ingest_id)) != ERROR_SUCCESS) {
  735 + srs_error("vhost %s notify subscribes ingest=%s removed failed. ret=%d",
  736 + vhost.c_str(), ingest_id.c_str(), ret);
  737 + return ret;
  738 + }
  739 + }
  740 + srs_trace("vhost %s reload ingest=%s removed success.", vhost.c_str(), ingest_id.c_str());
  741 + }
  742 +
  743 + // for added ingesters, start them.
  744 + for (int i = 0; i < (int)new_ingesters.size(); i++) {
  745 + SrsConfDirective* new_ingester = new_ingesters.at(i);
  746 + std::string ingest_id = new_ingester->arg0();
  747 +
  748 + // if ingester exists in old vhost, not added, ignore.
  749 + if (old_vhost->get("ingest", ingest_id)) {
  750 + continue;
  751 + }
  752 +
  753 + // notice handler ingester removed.
  754 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  755 + ISrsReloadHandler* subscribe = *it;
  756 + if ((ret = subscribe->on_reload_ingest_added(vhost, ingest_id)) != ERROR_SUCCESS) {
  757 + srs_error("vhost %s notify subscribes ingest=%s added failed. ret=%d",
  758 + vhost.c_str(), ingest_id.c_str(), ret);
  759 + return ret;
  760 + }
  761 + }
  762 + srs_trace("vhost %s reload ingest=%s added success.", vhost.c_str(), ingest_id.c_str());
  763 + }
  764 +
  765 + // for updated ingesters, restart them.
  766 + for (int i = 0; i < (int)new_ingesters.size(); i++) {
  767 + SrsConfDirective* new_ingester = new_ingesters.at(i);
  768 + std::string ingest_id = new_ingester->arg0();
  769 + SrsConfDirective* old_ingester = old_vhost->get("ingest", ingest_id);
  770 + srs_assert(old_ingester);
  771 +
  772 + if (srs_directive_equals(new_ingester, old_ingester)) {
  773 + continue;
  774 + }
  775 +
  776 + // notice handler ingester removed.
  777 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  778 + ISrsReloadHandler* subscribe = *it;
  779 + if ((ret = subscribe->on_reload_ingest_updated(vhost, ingest_id)) != ERROR_SUCCESS) {
  780 + srs_error("vhost %s notify subscribes ingest=%s updated failed. ret=%d",
  781 + vhost.c_str(), ingest_id.c_str(), ret);
  782 + return ret;
  783 + }
  784 + }
  785 + srs_trace("vhost %s reload ingest=%s updated success.", vhost.c_str(), ingest_id.c_str());
  786 + }
  787 +
  788 + srs_warn("invalid reload ingest vhost=%s", vhost.c_str());
  789 +
  790 + return ret;
  791 +}
  792 +
692 int SrsConfig::parse_file(const char* filename) 793 int SrsConfig::parse_file(const char* filename)
693 { 794 {
694 int ret = ERROR_SUCCESS; 795 int ret = ERROR_SUCCESS;
@@ -1667,6 +1768,17 @@ void SrsConfig::get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& @@ -1667,6 +1768,17 @@ void SrsConfig::get_ingesters(std::string vhost, std::vector<SrsConfDirective*>&
1667 return; 1768 return;
1668 } 1769 }
1669 1770
  1771 +SrsConfDirective* SrsConfig::get_ingest(std::string vhost, std::string ingest_id)
  1772 +{
  1773 + SrsConfDirective* conf = get_vhost(vhost);
  1774 + if (!conf) {
  1775 + return NULL;
  1776 + }
  1777 +
  1778 + conf = conf->get("ingest", ingest_id);
  1779 + return conf;
  1780 +}
  1781 +
1670 bool SrsConfig::get_ingest_enabled(SrsConfDirective* ingest) 1782 bool SrsConfig::get_ingest_enabled(SrsConfDirective* ingest)
1671 { 1783 {
1672 SrsConfDirective* conf = ingest->get("enable"); 1784 SrsConfDirective* conf = ingest->get("enable");
@@ -125,6 +125,7 @@ public: @@ -125,6 +125,7 @@ public:
125 public: 125 public:
126 virtual int parse_options(int argc, char** argv); 126 virtual int parse_options(int argc, char** argv);
127 private: 127 private:
  128 + virtual int reload_ingest(SrsConfDirective* new_vhost, SrsConfDirective* old_vhost);
128 virtual int parse_file(const char* filename); 129 virtual int parse_file(const char* filename);
129 virtual int parse_argv(int& i, char** argv); 130 virtual int parse_argv(int& i, char** argv);
130 virtual void print_help(char** argv); 131 virtual void print_help(char** argv);
@@ -193,6 +194,7 @@ public: @@ -193,6 +194,7 @@ public:
193 // ingest section 194 // ingest section
194 public: 195 public:
195 virtual void get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& ingeters); 196 virtual void get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& ingeters);
  197 + virtual SrsConfDirective* get_ingest(std::string vhost, std::string ingest_id);
196 virtual bool get_ingest_enabled(SrsConfDirective* ingest); 198 virtual bool get_ingest_enabled(SrsConfDirective* ingest);
197 virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest); 199 virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest);
198 virtual std::string get_ingest_input_type(SrsConfDirective* ingest); 200 virtual std::string get_ingest_input_type(SrsConfDirective* ingest);
@@ -97,10 +97,6 @@ int SrsIngester::parse_ingesters(SrsConfDirective* vhost) @@ -97,10 +97,6 @@ int SrsIngester::parse_ingesters(SrsConfDirective* vhost)
97 // create engine 97 // create engine
98 for (int i = 0; i < (int)ingesters.size(); i++) { 98 for (int i = 0; i < (int)ingesters.size(); i++) {
99 SrsConfDirective* ingest = ingesters[i]; 99 SrsConfDirective* ingest = ingesters[i];
100 - if (!_srs_config->get_ingest_enabled(ingest)) {  
101 - continue;  
102 - }  
103 -  
104 if ((ret = parse_engines(vhost, ingest)) != ERROR_SUCCESS) { 100 if ((ret = parse_engines(vhost, ingest)) != ERROR_SUCCESS) {
105 return ret; 101 return ret;
106 } 102 }
@@ -112,7 +108,11 @@ int SrsIngester::parse_ingesters(SrsConfDirective* vhost) @@ -112,7 +108,11 @@ int SrsIngester::parse_ingesters(SrsConfDirective* vhost)
112 int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest) 108 int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest)
113 { 109 {
114 int ret = ERROR_SUCCESS; 110 int ret = ERROR_SUCCESS;
115 - 111 +
  112 + if (!_srs_config->get_ingest_enabled(ingest)) {
  113 + return ret;
  114 + }
  115 +
116 std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest); 116 std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest);
117 if (ffmpeg_bin.empty()) { 117 if (ffmpeg_bin.empty()) {
118 ret = ERROR_ENCODER_PARSE; 118 ret = ERROR_ENCODER_PARSE;
@@ -360,6 +360,8 @@ int SrsIngester::on_reload_vhost_added(string vhost) @@ -360,6 +360,8 @@ int SrsIngester::on_reload_vhost_added(string vhost)
360 if ((ret = parse_ingesters(_vhost)) != ERROR_SUCCESS) { 360 if ((ret = parse_ingesters(_vhost)) != ERROR_SUCCESS) {
361 return ret; 361 return ret;
362 } 362 }
  363 +
  364 + srs_trace("reload add vhost ingesters, vhost=%s", vhost.c_str());
363 365
364 return ret; 366 return ret;
365 } 367 }
@@ -393,4 +395,68 @@ int SrsIngester::on_reload_vhost_removed(string vhost) @@ -393,4 +395,68 @@ int SrsIngester::on_reload_vhost_removed(string vhost)
393 return ret; 395 return ret;
394 } 396 }
395 397
  398 +int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id)
  399 +{
  400 + int ret = ERROR_SUCCESS;
  401 +
  402 + std::vector<SrsIngesterFFMPEG*>::iterator it;
  403 +
  404 + for (it = ingesters.begin(); it != ingesters.end();) {
  405 + SrsIngesterFFMPEG* ingester = *it;
  406 +
  407 + if (ingester->vhost != vhost || ingester->id != ingest_id) {
  408 + ++it;
  409 + continue;
  410 + }
  411 +
  412 + // stop the ffmpeg and free it.
  413 + ingester->ffmpeg->stop();
  414 +
  415 + srs_trace("reload stop ingester, "
  416 + "vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str());
  417 +
  418 + srs_freep(ingester);
  419 +
  420 + // remove the item from ingesters.
  421 + it = ingesters.erase(it);
  422 + }
  423 +
  424 + return ret;
  425 +}
  426 +
  427 +int SrsIngester::on_reload_ingest_added(string vhost, string ingest_id)
  428 +{
  429 + int ret = ERROR_SUCCESS;
  430 +
  431 + SrsConfDirective* _vhost = _srs_config->get_vhost(vhost);
  432 + SrsConfDirective* _ingester = _srs_config->get_ingest(vhost, ingest_id);
  433 +
  434 + if ((ret = parse_engines(_vhost, _ingester)) != ERROR_SUCCESS) {
  435 + return ret;
  436 + }
  437 +
  438 + srs_trace("reload add ingester, "
  439 + "vhost=%s, id=%s", vhost.c_str(), ingest_id.c_str());
  440 +
  441 + return ret;
  442 +}
  443 +
  444 +int SrsIngester::on_reload_ingest_updated(string vhost, string ingest_id)
  445 +{
  446 + int ret = ERROR_SUCCESS;
  447 +
  448 + if ((ret = on_reload_ingest_removed(vhost, ingest_id)) != ERROR_SUCCESS) {
  449 + return ret;
  450 + }
  451 +
  452 + if ((ret = on_reload_ingest_added(vhost, ingest_id)) != ERROR_SUCCESS) {
  453 + return ret;
  454 + }
  455 +
  456 + srs_trace("reload updated ingester, "
  457 + "vhost=%s, id=%s", vhost.c_str(), ingest_id.c_str());
  458 +
  459 + return ret;
  460 +}
  461 +
396 #endif 462 #endif
@@ -86,6 +86,9 @@ private: @@ -86,6 +86,9 @@ private:
86 public: 86 public:
87 virtual int on_reload_vhost_removed(std::string vhost); 87 virtual int on_reload_vhost_removed(std::string vhost);
88 virtual int on_reload_vhost_added(std::string vhost); 88 virtual int on_reload_vhost_added(std::string vhost);
  89 + virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
  90 + virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id);
  91 + virtual int on_reload_ingest_updated(std::string vhost, std::string ingest_id);
89 }; 92 };
90 93
91 #endif 94 #endif
@@ -80,3 +80,18 @@ int ISrsReloadHandler::on_reload_transcode(string /*vhost*/) @@ -80,3 +80,18 @@ int ISrsReloadHandler::on_reload_transcode(string /*vhost*/)
80 return ERROR_SUCCESS; 80 return ERROR_SUCCESS;
81 } 81 }
82 82
  83 +int ISrsReloadHandler::on_reload_ingest_removed(string /*vhost*/, string /*ingest_id*/)
  84 +{
  85 + return ERROR_SUCCESS;
  86 +}
  87 +
  88 +int ISrsReloadHandler::on_reload_ingest_added(string /*vhost*/, string /*ingest_id*/)
  89 +{
  90 + return ERROR_SUCCESS;
  91 +}
  92 +
  93 +int ISrsReloadHandler::on_reload_ingest_updated(string /*vhost*/, string /*ingest_id*/)
  94 +{
  95 + return ERROR_SUCCESS;
  96 +}
  97 +
@@ -50,6 +50,9 @@ public: @@ -50,6 +50,9 @@ public:
50 virtual int on_reload_forward(std::string vhost); 50 virtual int on_reload_forward(std::string vhost);
51 virtual int on_reload_hls(std::string vhost); 51 virtual int on_reload_hls(std::string vhost);
52 virtual int on_reload_transcode(std::string vhost); 52 virtual int on_reload_transcode(std::string vhost);
  53 + virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
  54 + virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id);
  55 + virtual int on_reload_ingest_updated(std::string vhost, std::string ingest_id);
53 }; 56 };
54 57
55 #endif 58 #endif