winlin

support ingest reload: add new vhost with ingester

@@ -493,8 +493,36 @@ int SrsConfig::reload() @@ -493,8 +493,36 @@ int SrsConfig::reload()
493 srs_trace("reload pithy_print success."); 493 srs_trace("reload pithy_print success.");
494 } 494 }
495 495
496 - // merge config: vhost added, directly supported. 496 + // merge config: vhost added
  497 + for (int i = 0; i < (int)root->directives.size(); i++) {
  498 + // ingest need to start if specified.
  499 + // other features, directly supported.
  500 + SrsConfDirective* new_vhost = root->at(i);
  501 +
  502 + // only process vhost directives.
  503 + if (new_vhost->name != "vhost") {
  504 + continue;
  505 + }
  506 +
  507 + std::string vhost = new_vhost->arg0();
  508 +
  509 + // not new added vhost, ignore.
  510 + if (old_root->get("vhost", vhost)) {
  511 + continue;
  512 + }
497 513
  514 + srs_trace("vhost %s added, reload it.", vhost.c_str());
  515 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  516 + ISrsReloadHandler* subscribe = *it;
  517 + if ((ret = subscribe->on_reload_vhost_added(vhost)) != ERROR_SUCCESS) {
  518 + srs_error("notify subscribes pithy_print remove "
  519 + "vhost %s failed. ret=%d", vhost.c_str(), ret);
  520 + return ret;
  521 + }
  522 + }
  523 + srs_trace("reload new vhost %s success.", vhost.c_str());
  524 + }
  525 +
498 // merge config: vhost removed/disabled/modified. 526 // merge config: vhost removed/disabled/modified.
499 for (int i = 0; i < (int)old_root->directives.size(); i++) { 527 for (int i = 0; i < (int)old_root->directives.size(); i++) {
500 SrsConfDirective* old_vhost = old_root->at(i); 528 SrsConfDirective* old_vhost = old_root->at(i);
@@ -51,13 +51,16 @@ SrsIngesterFFMPEG::~SrsIngesterFFMPEG() @@ -51,13 +51,16 @@ SrsIngesterFFMPEG::~SrsIngesterFFMPEG()
51 51
52 SrsIngester::SrsIngester() 52 SrsIngester::SrsIngester()
53 { 53 {
54 - // TODO: FIXME: support reload. 54 + _srs_config->subscribe(this);
  55 +
55 pthread = new SrsThread(this, SRS_INGESTER_SLEEP_US); 56 pthread = new SrsThread(this, SRS_INGESTER_SLEEP_US);
56 pithy_print = new SrsPithyPrint(SRS_STAGE_INGESTER); 57 pithy_print = new SrsPithyPrint(SRS_STAGE_INGESTER);
57 } 58 }
58 59
59 SrsIngester::~SrsIngester() 60 SrsIngester::~SrsIngester()
60 { 61 {
  62 + _srs_config->unsubscribe(this);
  63 +
61 srs_freep(pthread); 64 srs_freep(pthread);
62 clear_engines(); 65 clear_engines();
63 } 66 }
@@ -72,10 +75,8 @@ int SrsIngester::start() @@ -72,10 +75,8 @@ int SrsIngester::start()
72 return ret; 75 return ret;
73 } 76 }
74 77
75 - // return for error or no engine.  
76 - if (ingesters.empty()) {  
77 - return ret;  
78 - } 78 + // even no ingesters, we must also start it,
  79 + // for the reload may add more ingesters.
79 80
80 // start thread to run all encoding engines. 81 // start thread to run all encoding engines.
81 if ((ret = pthread->start()) != ERROR_SUCCESS) { 82 if ((ret = pthread->start()) != ERROR_SUCCESS) {
@@ -353,4 +354,16 @@ void SrsIngester::ingester() @@ -353,4 +354,16 @@ void SrsIngester::ingester()
353 } 354 }
354 } 355 }
355 356
  357 +int SrsIngester::on_reload_vhost_added(string vhost)
  358 +{
  359 + int ret = ERROR_SUCCESS;
  360 +
  361 + SrsConfDirective* _vhost = _srs_config->get_vhost(vhost);
  362 + if ((ret = parse_ingesters(_vhost)) != ERROR_SUCCESS) {
  363 + return ret;
  364 + }
  365 +
  366 + return ret;
  367 +}
  368 +
356 #endif 369 #endif
@@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 #include <vector> 34 #include <vector>
35 35
36 #include <srs_app_thread.hpp> 36 #include <srs_app_thread.hpp>
  37 +#include <srs_app_reload.hpp>
37 38
38 class SrsFFMPEG; 39 class SrsFFMPEG;
39 class SrsConfDirective; 40 class SrsConfDirective;
@@ -57,7 +58,7 @@ struct SrsIngesterFFMPEG @@ -57,7 +58,7 @@ struct SrsIngesterFFMPEG
57 * encode with FFMPEG(optional), 58 * encode with FFMPEG(optional),
58 * push to SRS(or any RTMP server) over RTMP. 59 * push to SRS(or any RTMP server) over RTMP.
59 */ 60 */
60 -class SrsIngester : public ISrsThreadHandler 61 +class SrsIngester : public ISrsThreadHandler, public ISrsReloadHandler
61 { 62 {
62 private: 63 private:
63 std::string input_stream_name; 64 std::string input_stream_name;
@@ -82,6 +83,9 @@ private: @@ -82,6 +83,9 @@ private:
82 virtual int parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest); 83 virtual int parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest);
83 virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, SrsConfDirective* ingest, SrsConfDirective* engine); 84 virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, SrsConfDirective* ingest, SrsConfDirective* engine);
84 virtual void ingester(); 85 virtual void ingester();
  86 +// interface ISrsReloadHandler.
  87 +public:
  88 + virtual int on_reload_vhost_added(std::string vhost);
85 }; 89 };
86 90
87 #endif 91 #endif
@@ -45,6 +45,11 @@ int ISrsReloadHandler::on_reload_pithy_print() @@ -45,6 +45,11 @@ int ISrsReloadHandler::on_reload_pithy_print()
45 return ERROR_SUCCESS; 45 return ERROR_SUCCESS;
46 } 46 }
47 47
  48 +int ISrsReloadHandler::on_reload_vhost_added(string /*vhost*/)
  49 +{
  50 + return ERROR_SUCCESS;
  51 +}
  52 +
48 int ISrsReloadHandler::on_reload_vhost_removed(string /*vhost*/) 53 int ISrsReloadHandler::on_reload_vhost_removed(string /*vhost*/)
49 { 54 {
50 return ERROR_SUCCESS; 55 return ERROR_SUCCESS;
@@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 33
34 /** 34 /**
35 * the handler for config reload. 35 * the handler for config reload.
  36 +* when reload callback, the config is updated yet.
36 */ 37 */
37 class ISrsReloadHandler 38 class ISrsReloadHandler
38 { 39 {
@@ -42,6 +43,7 @@ public: @@ -42,6 +43,7 @@ public:
42 public: 43 public:
43 virtual int on_reload_listen(); 44 virtual int on_reload_listen();
44 virtual int on_reload_pithy_print(); 45 virtual int on_reload_pithy_print();
  46 + virtual int on_reload_vhost_added(std::string vhost);
45 virtual int on_reload_vhost_removed(std::string vhost); 47 virtual int on_reload_vhost_removed(std::string vhost);
46 virtual int on_reload_gop_cache(std::string vhost); 48 virtual int on_reload_gop_cache(std::string vhost);
47 virtual int on_reload_queue_length(std::string vhost); 49 virtual int on_reload_queue_length(std::string vhost);