winlin

for #319, when reload the listen, restart all ingesters.

@@ -106,6 +106,8 @@ SrsIngester::SrsIngester() @@ -106,6 +106,8 @@ SrsIngester::SrsIngester()
106 { 106 {
107 _srs_config->subscribe(this); 107 _srs_config->subscribe(this);
108 108
  109 + expired = false;
  110 +
109 pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US); 111 pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US);
110 pprint = SrsPithyPrint::create_ingester(); 112 pprint = SrsPithyPrint::create_ingester();
111 } 113 }
@@ -222,9 +224,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest @@ -222,9 +224,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
222 return ret; 224 return ret;
223 } 225 }
224 226
225 -void SrsIngester::dispose() 227 +void SrsIngester::fast_stop()
226 { 228 {
227 - // first, use fast stop to notice all FFMPEG to quit gracefully.  
228 std::vector<SrsIngesterFFMPEG*>::iterator it; 229 std::vector<SrsIngesterFFMPEG*>::iterator it;
229 for (it = ingesters.begin(); it != ingesters.end(); ++it) { 230 for (it = ingesters.begin(); it != ingesters.end(); ++it) {
230 SrsIngesterFFMPEG* ingester = *it; 231 SrsIngesterFFMPEG* ingester = *it;
@@ -234,6 +235,12 @@ void SrsIngester::dispose() @@ -234,6 +235,12 @@ void SrsIngester::dispose()
234 if (!ingesters.empty()) { 235 if (!ingesters.empty()) {
235 srs_trace("fast stop all ingesters ok."); 236 srs_trace("fast stop all ingesters ok.");
236 } 237 }
  238 +}
  239 +
  240 +void SrsIngester::dispose()
  241 +{
  242 + // first, use fast stop to notice all FFMPEG to quit gracefully.
  243 + fast_stop();
237 244
238 // then, use stop to wait FFMPEG quit one by one and send SIGKILL if needed. 245 // then, use stop to wait FFMPEG quit one by one and send SIGKILL if needed.
239 stop(); 246 stop();
@@ -249,6 +256,21 @@ int SrsIngester::cycle() @@ -249,6 +256,21 @@ int SrsIngester::cycle()
249 { 256 {
250 int ret = ERROR_SUCCESS; 257 int ret = ERROR_SUCCESS;
251 258
  259 + // when expired, restart all ingesters.
  260 + if (expired) {
  261 + expired = false;
  262 +
  263 + // stop current ingesters.
  264 + fast_stop();
  265 + clear_engines();
  266 +
  267 + // re-prase the ingesters.
  268 + if ((ret = parse()) != ERROR_SUCCESS) {
  269 + return ret;
  270 + }
  271 + }
  272 +
  273 + // cycle exists ingesters.
252 std::vector<SrsIngesterFFMPEG*>::iterator it; 274 std::vector<SrsIngesterFFMPEG*>::iterator it;
253 for (it = ingesters.begin(); it != ingesters.end(); ++it) { 275 for (it = ingesters.begin(); it != ingesters.end(); ++it) {
254 SrsIngesterFFMPEG* ingester = *it; 276 SrsIngesterFFMPEG* ingester = *it;
@@ -551,5 +573,11 @@ int SrsIngester::on_reload_ingest_updated(string vhost, string ingest_id) @@ -551,5 +573,11 @@ int SrsIngester::on_reload_ingest_updated(string vhost, string ingest_id)
551 return ret; 573 return ret;
552 } 574 }
553 575
  576 +int SrsIngester::on_reload_listen()
  577 +{
  578 + expired = true;
  579 + return ERROR_SUCCESS;
  580 +}
  581 +
554 #endif 582 #endif
555 583
@@ -81,6 +81,10 @@ private: @@ -81,6 +81,10 @@ private:
81 private: 81 private:
82 SrsReusableThread* pthread; 82 SrsReusableThread* pthread;
83 SrsPithyPrint* pprint; 83 SrsPithyPrint* pprint;
  84 + // whether the ingesters are expired,
  85 + // for example, the listen port changed,
  86 + // all ingesters must be restart.
  87 + bool expired;
84 public: 88 public:
85 SrsIngester(); 89 SrsIngester();
86 virtual ~SrsIngester(); 90 virtual ~SrsIngester();
@@ -89,6 +93,8 @@ public: @@ -89,6 +93,8 @@ public:
89 public: 93 public:
90 virtual int start(); 94 virtual int start();
91 virtual void stop(); 95 virtual void stop();
  96 +private:
  97 + virtual void fast_stop();
92 // interface ISrsReusableThreadHandler. 98 // interface ISrsReusableThreadHandler.
93 public: 99 public:
94 virtual int cycle(); 100 virtual int cycle();
@@ -107,6 +113,7 @@ public: @@ -107,6 +113,7 @@ public:
107 virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); 113 virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
108 virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id); 114 virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id);
109 virtual int on_reload_ingest_updated(std::string vhost, std::string ingest_id); 115 virtual int on_reload_ingest_updated(std::string vhost, std::string ingest_id);
  116 + virtual int on_reload_listen();
110 }; 117 };
111 118
112 #endif 119 #endif