winlin

for #319, reorder the ingesters.

@@ -120,6 +120,15 @@ SrsIngester::~SrsIngester() @@ -120,6 +120,15 @@ SrsIngester::~SrsIngester()
120 clear_engines(); 120 clear_engines();
121 } 121 }
122 122
  123 +void SrsIngester::dispose()
  124 +{
  125 + // first, use fast stop to notice all FFMPEG to quit gracefully.
  126 + fast_stop();
  127 +
  128 + // then, use stop to wait FFMPEG quit one by one and send SIGKILL if needed.
  129 + stop();
  130 +}
  131 +
123 int SrsIngester::start() 132 int SrsIngester::start()
124 { 133 {
125 int ret = ERROR_SUCCESS; 134 int ret = ERROR_SUCCESS;
@@ -143,85 +152,10 @@ int SrsIngester::start() @@ -143,85 +152,10 @@ int SrsIngester::start()
143 return ret; 152 return ret;
144 } 153 }
145 154
146 -int SrsIngester::parse_ingesters(SrsConfDirective* vhost)  
147 -{  
148 - int ret = ERROR_SUCCESS;  
149 -  
150 - std::vector<SrsConfDirective*> ingesters = _srs_config->get_ingesters(vhost->arg0());  
151 -  
152 - // create engine  
153 - for (int i = 0; i < (int)ingesters.size(); i++) {  
154 - SrsConfDirective* ingest = ingesters[i];  
155 - if ((ret = parse_engines(vhost, ingest)) != ERROR_SUCCESS) {  
156 - return ret;  
157 - }  
158 - }  
159 -  
160 - return ret;  
161 -}  
162 -  
163 -int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest) 155 +void SrsIngester::stop()
164 { 156 {
165 - int ret = ERROR_SUCCESS;  
166 -  
167 - if (!_srs_config->get_ingest_enabled(ingest)) {  
168 - return ret;  
169 - }  
170 -  
171 - std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest);  
172 - if (ffmpeg_bin.empty()) {  
173 - ret = ERROR_ENCODER_PARSE;  
174 - srs_trace("empty ffmpeg ret=%d", ret);  
175 - return ret;  
176 - }  
177 -  
178 - // get all engines.  
179 - std::vector<SrsConfDirective*> engines = _srs_config->get_transcode_engines(ingest);  
180 -  
181 - // create ingesters without engines.  
182 - if (engines.empty()) {  
183 - SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);  
184 - if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) {  
185 - srs_freep(ffmpeg);  
186 - if (ret != ERROR_ENCODER_LOOP) {  
187 - srs_error("invalid ingest engine. ret=%d", ret);  
188 - }  
189 - return ret;  
190 - }  
191 -  
192 - SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();  
193 - if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {  
194 - srs_freep(ingester);  
195 - return ret;  
196 - }  
197 -  
198 - ingesters.push_back(ingester);  
199 - return ret;  
200 - }  
201 -  
202 - // create ingesters with engine  
203 - for (int i = 0; i < (int)engines.size(); i++) {  
204 - SrsConfDirective* engine = engines[i];  
205 - SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);  
206 - if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, engine)) != ERROR_SUCCESS) {  
207 - srs_freep(ffmpeg);  
208 - if (ret != ERROR_ENCODER_LOOP) {  
209 - srs_error("invalid ingest engine: %s %s, ret=%d",  
210 - ingest->arg0().c_str(), engine->arg0().c_str(), ret);  
211 - }  
212 - return ret;  
213 - }  
214 -  
215 - SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();  
216 - if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {  
217 - srs_freep(ingester);  
218 - return ret;  
219 - }  
220 -  
221 - ingesters.push_back(ingester);  
222 - }  
223 -  
224 - return ret; 157 + pthread->stop();
  158 + clear_engines();
225 } 159 }
226 160
227 void SrsIngester::fast_stop() 161 void SrsIngester::fast_stop()
@@ -237,21 +171,6 @@ void SrsIngester::fast_stop() @@ -237,21 +171,6 @@ void SrsIngester::fast_stop()
237 } 171 }
238 } 172 }
239 173
240 -void SrsIngester::dispose()  
241 -{  
242 - // first, use fast stop to notice all FFMPEG to quit gracefully.  
243 - fast_stop();  
244 -  
245 - // then, use stop to wait FFMPEG quit one by one and send SIGKILL if needed.  
246 - stop();  
247 -}  
248 -  
249 -void SrsIngester::stop()  
250 -{  
251 - pthread->stop();  
252 - clear_engines();  
253 -}  
254 -  
255 int SrsIngester::cycle() 174 int SrsIngester::cycle()
256 { 175 {
257 int ret = ERROR_SUCCESS; 176 int ret = ERROR_SUCCESS;
@@ -328,6 +247,87 @@ int SrsIngester::parse() @@ -328,6 +247,87 @@ int SrsIngester::parse()
328 return ret; 247 return ret;
329 } 248 }
330 249
  250 +int SrsIngester::parse_ingesters(SrsConfDirective* vhost)
  251 +{
  252 + int ret = ERROR_SUCCESS;
  253 +
  254 + std::vector<SrsConfDirective*> ingesters = _srs_config->get_ingesters(vhost->arg0());
  255 +
  256 + // create engine
  257 + for (int i = 0; i < (int)ingesters.size(); i++) {
  258 + SrsConfDirective* ingest = ingesters[i];
  259 + if ((ret = parse_engines(vhost, ingest)) != ERROR_SUCCESS) {
  260 + return ret;
  261 + }
  262 + }
  263 +
  264 + return ret;
  265 +}
  266 +
  267 +int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest)
  268 +{
  269 + int ret = ERROR_SUCCESS;
  270 +
  271 + if (!_srs_config->get_ingest_enabled(ingest)) {
  272 + return ret;
  273 + }
  274 +
  275 + std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest);
  276 + if (ffmpeg_bin.empty()) {
  277 + ret = ERROR_ENCODER_PARSE;
  278 + srs_trace("empty ffmpeg ret=%d", ret);
  279 + return ret;
  280 + }
  281 +
  282 + // get all engines.
  283 + std::vector<SrsConfDirective*> engines = _srs_config->get_transcode_engines(ingest);
  284 +
  285 + // create ingesters without engines.
  286 + if (engines.empty()) {
  287 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  288 + if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) {
  289 + srs_freep(ffmpeg);
  290 + if (ret != ERROR_ENCODER_LOOP) {
  291 + srs_error("invalid ingest engine. ret=%d", ret);
  292 + }
  293 + return ret;
  294 + }
  295 +
  296 + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();
  297 + if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {
  298 + srs_freep(ingester);
  299 + return ret;
  300 + }
  301 +
  302 + ingesters.push_back(ingester);
  303 + return ret;
  304 + }
  305 +
  306 + // create ingesters with engine
  307 + for (int i = 0; i < (int)engines.size(); i++) {
  308 + SrsConfDirective* engine = engines[i];
  309 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  310 + if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, engine)) != ERROR_SUCCESS) {
  311 + srs_freep(ffmpeg);
  312 + if (ret != ERROR_ENCODER_LOOP) {
  313 + srs_error("invalid ingest engine: %s %s, ret=%d",
  314 + ingest->arg0().c_str(), engine->arg0().c_str(), ret);
  315 + }
  316 + return ret;
  317 + }
  318 +
  319 + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();
  320 + if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {
  321 + srs_freep(ingester);
  322 + return ret;
  323 + }
  324 +
  325 + ingesters.push_back(ingester);
  326 + }
  327 +
  328 + return ret;
  329 +}
  330 +
331 int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, SrsConfDirective* ingest, SrsConfDirective* engine) 331 int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, SrsConfDirective* ingest, SrsConfDirective* engine)
332 { 332 {
333 int ret = ERROR_SUCCESS; 333 int ret = ERROR_SUCCESS;
@@ -468,20 +468,6 @@ void SrsIngester::show_ingest_log_message() @@ -468,20 +468,6 @@ void SrsIngester::show_ingest_log_message()
468 } 468 }
469 } 469 }
470 470
471 -int SrsIngester::on_reload_vhost_added(string vhost)  
472 -{  
473 - int ret = ERROR_SUCCESS;  
474 -  
475 - SrsConfDirective* _vhost = _srs_config->get_vhost(vhost);  
476 - if ((ret = parse_ingesters(_vhost)) != ERROR_SUCCESS) {  
477 - return ret;  
478 - }  
479 -  
480 - srs_trace("reload add vhost ingesters, vhost=%s", vhost.c_str());  
481 -  
482 - return ret;  
483 -}  
484 -  
485 int SrsIngester::on_reload_vhost_removed(string vhost) 471 int SrsIngester::on_reload_vhost_removed(string vhost)
486 { 472 {
487 int ret = ERROR_SUCCESS; 473 int ret = ERROR_SUCCESS;
@@ -510,6 +496,20 @@ int SrsIngester::on_reload_vhost_removed(string vhost) @@ -510,6 +496,20 @@ int SrsIngester::on_reload_vhost_removed(string vhost)
510 return ret; 496 return ret;
511 } 497 }
512 498
  499 +int SrsIngester::on_reload_vhost_added(string vhost)
  500 +{
  501 + int ret = ERROR_SUCCESS;
  502 +
  503 + SrsConfDirective* _vhost = _srs_config->get_vhost(vhost);
  504 + if ((ret = parse_ingesters(_vhost)) != ERROR_SUCCESS) {
  505 + return ret;
  506 + }
  507 +
  508 + srs_trace("reload add vhost ingesters, vhost=%s", vhost.c_str());
  509 +
  510 + return ret;
  511 +}
  512 +
513 int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id) 513 int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id)
514 { 514 {
515 int ret = ERROR_SUCCESS; 515 int ret = ERROR_SUCCESS;