winlin

fix the ffmpeg kill bug, wait until ffmpeg quit.

1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_encoder.hpp>  
25 -  
26 -#include <stdlib.h>  
27 -#include <unistd.h>  
28 -#include <sys/wait.h>  
29 -#include <fcntl.h>  
30 -#include <signal.h>  
31 -#include <sys/types.h>  
32 -  
33 -#include <algorithm>  
34 -  
35 -#include <srs_core_error.hpp>  
36 -#include <srs_core_log.hpp>  
37 -#include <srs_core_config.hpp>  
38 -#include <srs_core_rtmp.hpp>  
39 -#include <srs_core_pithy_print.hpp>  
40 -  
41 -#ifdef SRS_FFMPEG  
42 -  
43 -#define SRS_ENCODER_SLEEP_MS 2000  
44 -  
45 -#define SRS_ENCODER_VCODEC "libx264"  
46 -#define SRS_ENCODER_ACODEC "libaacplus"  
47 -  
48 -// for encoder to detect the dead loop  
49 -static std::vector<std::string> _transcoded_url;  
50 -  
51 -SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)  
52 -{  
53 - started = false;  
54 - pid = -1;  
55 - ffmpeg = ffmpeg_bin;  
56 -  
57 - vbitrate = 0;  
58 - vfps = 0;  
59 - vwidth = 0;  
60 - vheight = 0;  
61 - vthreads = 0;  
62 - abitrate = 0;  
63 - asample_rate = 0;  
64 - achannels = 0;  
65 -  
66 - log_fd = -1;  
67 -}  
68 -  
69 -SrsFFMPEG::~SrsFFMPEG()  
70 -{  
71 - stop();  
72 -}  
73 -  
74 -int SrsFFMPEG::initialize(SrsRequest* req, SrsConfDirective* engine)  
75 -{  
76 - int ret = ERROR_SUCCESS;  
77 -  
78 - config->get_engine_vfilter(engine, vfilter);  
79 - vcodec = config->get_engine_vcodec(engine);  
80 - vbitrate = config->get_engine_vbitrate(engine);  
81 - vfps = config->get_engine_vfps(engine);  
82 - vwidth = config->get_engine_vwidth(engine);  
83 - vheight = config->get_engine_vheight(engine);  
84 - vthreads = config->get_engine_vthreads(engine);  
85 - vprofile = config->get_engine_vprofile(engine);  
86 - vpreset = config->get_engine_vpreset(engine);  
87 - config->get_engine_vparams(engine, vparams);  
88 - acodec = config->get_engine_acodec(engine);  
89 - abitrate = config->get_engine_abitrate(engine);  
90 - asample_rate = config->get_engine_asample_rate(engine);  
91 - achannels = config->get_engine_achannels(engine);  
92 - config->get_engine_aparams(engine, aparams);  
93 - output = config->get_engine_output(engine);  
94 -  
95 - // ensure the size is even.  
96 - vwidth -= vwidth % 2;  
97 - vheight -= vheight % 2;  
98 -  
99 - // input stream, from local.  
100 - // ie. rtmp://127.0.0.1:1935/live/livestream  
101 - input = "rtmp://127.0.0.1:";  
102 - input += req->port;  
103 - input += "/";  
104 - input += req->app;  
105 - input += "?vhost=";  
106 - input += req->vhost;  
107 - input += "/";  
108 - input += req->stream;  
109 -  
110 - // output stream, to other/self server  
111 - // ie. rtmp://127.0.0.1:1935/live/livestream_sd  
112 - output = srs_replace(output, "[vhost]", req->vhost);  
113 - output = srs_replace(output, "[port]", req->port);  
114 - output = srs_replace(output, "[app]", req->app);  
115 - output = srs_replace(output, "[stream]", req->stream);  
116 - output = srs_replace(output, "[engine]", engine->arg0());  
117 -  
118 - // write ffmpeg info to log file.  
119 - log_file = config->get_log_dir();  
120 - log_file += "/";  
121 - log_file += "encoder";  
122 - log_file += "-";  
123 - log_file += req->vhost;  
124 - log_file += "-";  
125 - log_file += req->app;  
126 - log_file += "-";  
127 - log_file += req->stream;  
128 - log_file += ".log";  
129 -  
130 - // important: loop check, donot transcode again.  
131 - std::vector<std::string>::iterator it;  
132 - it = std::find(_transcoded_url.begin(), _transcoded_url.end(), input);  
133 - if (it != _transcoded_url.end()) {  
134 - ret = ERROR_ENCODER_LOOP;  
135 - srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",  
136 - input.c_str(), output.c_str(), ret);  
137 - return ret;  
138 - }  
139 - _transcoded_url.push_back(output);  
140 -  
141 - if (vcodec != SRS_ENCODER_VCODEC) {  
142 - ret = ERROR_ENCODER_VCODEC;  
143 - srs_error("invalid vcodec, must be %s, actual %s, ret=%d",  
144 - SRS_ENCODER_VCODEC, vcodec.c_str(), ret);  
145 - return ret;  
146 - }  
147 - if (vbitrate <= 0) {  
148 - ret = ERROR_ENCODER_VBITRATE;  
149 - srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);  
150 - return ret;  
151 - }  
152 - if (vfps <= 0) {  
153 - ret = ERROR_ENCODER_VFPS;  
154 - srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);  
155 - return ret;  
156 - }  
157 - if (vwidth <= 0) {  
158 - ret = ERROR_ENCODER_VWIDTH;  
159 - srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);  
160 - return ret;  
161 - }  
162 - if (vheight <= 0) {  
163 - ret = ERROR_ENCODER_VHEIGHT;  
164 - srs_error("invalid vheight: %d, ret=%d", vheight, ret);  
165 - return ret;  
166 - }  
167 - if (vthreads < 0) {  
168 - ret = ERROR_ENCODER_VTHREADS;  
169 - srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);  
170 - return ret;  
171 - }  
172 - if (vprofile.empty()) {  
173 - ret = ERROR_ENCODER_VPROFILE;  
174 - srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);  
175 - return ret;  
176 - }  
177 - if (vpreset.empty()) {  
178 - ret = ERROR_ENCODER_VPRESET;  
179 - srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);  
180 - return ret;  
181 - }  
182 - if (acodec != SRS_ENCODER_ACODEC) {  
183 - ret = ERROR_ENCODER_ACODEC;  
184 - srs_error("invalid acodec, must be %s, actual %s, ret=%d",  
185 - SRS_ENCODER_ACODEC, acodec.c_str(), ret);  
186 - return ret;  
187 - }  
188 - if (abitrate <= 0) {  
189 - ret = ERROR_ENCODER_ABITRATE;  
190 - srs_error("invalid abitrate: %d, ret=%d",  
191 - abitrate, ret);  
192 - return ret;  
193 - }  
194 - if (asample_rate <= 0) {  
195 - ret = ERROR_ENCODER_ASAMPLE_RATE;  
196 - srs_error("invalid sample rate: %d, ret=%d",  
197 - asample_rate, ret);  
198 - return ret;  
199 - }  
200 - if (achannels != 1 && achannels != 2) {  
201 - ret = ERROR_ENCODER_ACHANNELS;  
202 - srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",  
203 - achannels, ret);  
204 - return ret;  
205 - }  
206 - if (output.empty()) {  
207 - ret = ERROR_ENCODER_OUTPUT;  
208 - srs_error("invalid empty output, ret=%d", ret);  
209 - return ret;  
210 - }  
211 -  
212 - return ret;  
213 -}  
214 -  
215 -int SrsFFMPEG::start()  
216 -{  
217 - int ret = ERROR_SUCCESS;  
218 -  
219 - if (started) {  
220 - return ret;  
221 - }  
222 -  
223 - // prepare exec params  
224 - char tmp[256];  
225 - std::vector<std::string> params;  
226 -  
227 - // argv[0], set to ffmpeg bin.  
228 - // The execv() and execvp() functions ....  
229 - // The first argument, by convention, should point to  
230 - // the filename associated with the file being executed.  
231 - params.push_back(ffmpeg);  
232 -  
233 - // input.  
234 - params.push_back("-f");  
235 - params.push_back("flv");  
236 -  
237 - params.push_back("-i");  
238 - params.push_back(input);  
239 -  
240 - // build the filter  
241 - if (!vfilter.empty()) {  
242 - std::vector<std::string>::iterator it;  
243 - for (it = vfilter.begin(); it != vfilter.end(); ++it) {  
244 - std::string p = *it;  
245 - if (!p.empty()) {  
246 - params.push_back(p);  
247 - }  
248 - }  
249 - }  
250 -  
251 - // video specified.  
252 - params.push_back("-vcodec");  
253 - params.push_back(vcodec);  
254 -  
255 - params.push_back("-b:v");  
256 - snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);  
257 - params.push_back(tmp);  
258 -  
259 - params.push_back("-r");  
260 - snprintf(tmp, sizeof(tmp), "%.2f", vfps);  
261 - params.push_back(tmp);  
262 -  
263 - params.push_back("-s");  
264 - snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);  
265 - params.push_back(tmp);  
266 -  
267 - // TODO: add aspect if needed.  
268 - params.push_back("-aspect");  
269 - snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);  
270 - params.push_back(tmp);  
271 -  
272 - params.push_back("-threads");  
273 - snprintf(tmp, sizeof(tmp), "%d", vthreads);  
274 - params.push_back(tmp);  
275 -  
276 - params.push_back("-profile:v");  
277 - params.push_back(vprofile);  
278 -  
279 - params.push_back("-preset");  
280 - params.push_back(vpreset);  
281 -  
282 - // vparams  
283 - if (!vparams.empty()) {  
284 - std::vector<std::string>::iterator it;  
285 - for (it = vparams.begin(); it != vparams.end(); ++it) {  
286 - std::string p = *it;  
287 - if (!p.empty()) {  
288 - params.push_back(p);  
289 - }  
290 - }  
291 - }  
292 -  
293 - // audio specified.  
294 - params.push_back("-acodec");  
295 - params.push_back(acodec);  
296 -  
297 - params.push_back("-b:a");  
298 - snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);  
299 - params.push_back(tmp);  
300 -  
301 - params.push_back("-ar");  
302 - snprintf(tmp, sizeof(tmp), "%d", asample_rate);  
303 - params.push_back(tmp);  
304 -  
305 - params.push_back("-ac");  
306 - snprintf(tmp, sizeof(tmp), "%d", achannels);  
307 - params.push_back(tmp);  
308 -  
309 - // aparams  
310 - if (!aparams.empty()) {  
311 - std::vector<std::string>::iterator it;  
312 - for (it = aparams.begin(); it != aparams.end(); ++it) {  
313 - std::string p = *it;  
314 - if (!p.empty()) {  
315 - params.push_back(p);  
316 - }  
317 - }  
318 - }  
319 -  
320 - // output  
321 - params.push_back("-f");  
322 - params.push_back("flv");  
323 -  
324 - params.push_back("-y");  
325 - params.push_back(output);  
326 -  
327 - if (true) {  
328 - int pparam_size = 8 * 1024;  
329 - char* pparam = new char[pparam_size];  
330 - char* p = pparam;  
331 - char* last = pparam + pparam_size;  
332 - for (int i = 0; i < (int)params.size(); i++) {  
333 - std::string ffp = params[i];  
334 - snprintf(p, last - p, "%s ", ffp.c_str());  
335 - p += ffp.length() + 1;  
336 - }  
337 - srs_trace("start transcoder, log: %s, params: %s",  
338 - log_file.c_str(), pparam);  
339 - srs_freepa(pparam);  
340 - }  
341 -  
342 - // TODO: fork or vfork?  
343 - if ((pid = fork()) < 0) {  
344 - ret = ERROR_ENCODER_FORK;  
345 - srs_error("vfork process failed. ret=%d", ret);  
346 - return ret;  
347 - }  
348 -  
349 - // child process: ffmpeg encoder engine.  
350 - if (pid == 0) {  
351 - // redirect logs to file.  
352 - int flags = O_CREAT|O_WRONLY|O_APPEND;  
353 - mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;  
354 - if ((log_fd = ::open(log_file.c_str(), flags, mode)) < 0) {  
355 - ret = ERROR_ENCODER_OPEN;  
356 - srs_error("open encoder file %s failed. ret=%d", log_file.c_str(), ret);  
357 - return ret;  
358 - }  
359 - if (dup2(log_fd, STDOUT_FILENO) < 0) {  
360 - ret = ERROR_ENCODER_DUP2;  
361 - srs_error("dup2 encoder file failed. ret=%d", ret);  
362 - return ret;  
363 - }  
364 - if (dup2(log_fd, STDERR_FILENO) < 0) {  
365 - ret = ERROR_ENCODER_DUP2;  
366 - srs_error("dup2 encoder file failed. ret=%d", ret);  
367 - return ret;  
368 - }  
369 - // close other fds  
370 - // TODO: do in right way.  
371 - for (int i = 3; i < 1024; i++) {  
372 - ::close(i);  
373 - }  
374 -  
375 - // memory leak in child process, it's ok.  
376 - char** charpv_params = new char*[params.size() + 1];  
377 - for (int i = 0; i < (int)params.size(); i++) {  
378 - std::string p = params[i];  
379 - charpv_params[i] = (char*)p.c_str();  
380 - }  
381 - // EOF: NULL  
382 - charpv_params[params.size()] = NULL;  
383 -  
384 - // TODO: execv or execvp  
385 - ret = execv(ffmpeg.c_str(), charpv_params);  
386 - if (ret < 0) {  
387 - fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",  
388 - errno, strerror(errno));  
389 - }  
390 - exit(ret);  
391 - }  
392 -  
393 - // parent.  
394 - if (pid > 0) {  
395 - started = true;  
396 - srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);  
397 - return ret;  
398 - }  
399 -  
400 - return ret;  
401 -}  
402 -  
403 -int SrsFFMPEG::cycle()  
404 -{  
405 - int ret = ERROR_SUCCESS;  
406 -  
407 - if (!started) {  
408 - return ret;  
409 - }  
410 -  
411 - int status = 0;  
412 - pid_t p = waitpid(pid, &status, WNOHANG);  
413 -  
414 - if (p < 0) {  
415 - ret = ERROR_SYSTEM_WAITPID;  
416 - srs_error("transcode waitpid failed, pid=%d, ret=%d", pid, ret);  
417 - return ret;  
418 - }  
419 -  
420 - if (p == 0) {  
421 - srs_info("transcode process pid=%d is running.", pid);  
422 - return ret;  
423 - }  
424 -  
425 - srs_trace("transcode process pid=%d terminate, restart it.", pid);  
426 - started = false;  
427 -  
428 - return ret;  
429 -}  
430 -  
431 -void SrsFFMPEG::stop()  
432 -{  
433 - if (log_fd > 0) {  
434 - ::close(log_fd);  
435 - log_fd = -1;  
436 - }  
437 -  
438 - if (!started) {  
439 - return;  
440 - }  
441 -  
442 - // kill the ffmpeg,  
443 - // when rewind, upstream will stop publish(unpublish),  
444 - // unpublish event will stop all ffmpeg encoders,  
445 - // then publish will start all ffmpeg encoders.  
446 - if (pid > 0) {  
447 - if (kill(pid, SIGKILL) < 0) {  
448 - srs_warn("kill the encoder failed, ignored. pid=%d", pid);  
449 - }  
450 -  
451 - int status = 0;  
452 - if (waitpid(pid, &status, WNOHANG) < 0) {  
453 - srs_warn("wait the encoder quit failed, ignored. pid=%d", pid);  
454 - }  
455 -  
456 - srs_trace("stop the encoder success. pid=%d", pid);  
457 - pid = -1;  
458 - }  
459 -  
460 - std::vector<std::string>::iterator it;  
461 - it = std::find(_transcoded_url.begin(), _transcoded_url.end(), output);  
462 - if (it != _transcoded_url.end()) {  
463 - _transcoded_url.erase(it);  
464 - }  
465 -}  
466 -  
467 -SrsEncoder::SrsEncoder()  
468 -{  
469 - tid = NULL;  
470 - loop = false;  
471 -}  
472 -  
473 -SrsEncoder::~SrsEncoder()  
474 -{  
475 - on_unpublish();  
476 -}  
477 -  
478 -int SrsEncoder::parse_scope_engines(SrsRequest* req)  
479 -{  
480 - int ret = ERROR_SUCCESS;  
481 -  
482 - // parse all transcode engines.  
483 - SrsConfDirective* conf = NULL;  
484 -  
485 - // parse vhost scope engines  
486 - std::string scope = "";  
487 - if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {  
488 - if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {  
489 - srs_error("parse vhost scope=%s transcode engines failed. "  
490 - "ret=%d", scope.c_str(), ret);  
491 - return ret;  
492 - }  
493 - }  
494 - // parse app scope engines  
495 - scope = req->app;  
496 - if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {  
497 - if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {  
498 - srs_error("parse app scope=%s transcode engines failed. "  
499 - "ret=%d", scope.c_str(), ret);  
500 - return ret;  
501 - }  
502 - }  
503 - // parse stream scope engines  
504 - scope += "/";  
505 - scope += req->stream;  
506 - if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {  
507 - if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {  
508 - srs_error("parse stream scope=%s transcode engines failed. "  
509 - "ret=%d", scope.c_str(), ret);  
510 - return ret;  
511 - }  
512 - }  
513 -  
514 - return ret;  
515 -}  
516 -  
517 -int SrsEncoder::on_publish(SrsRequest* req)  
518 -{  
519 - int ret = ERROR_SUCCESS;  
520 -  
521 - ret = parse_scope_engines(req);  
522 -  
523 - // ignore the loop encoder  
524 - if (ret == ERROR_ENCODER_LOOP) {  
525 - ret = ERROR_SUCCESS;  
526 - }  
527 -  
528 - // return for error or no engine.  
529 - if (ret != ERROR_SUCCESS || ffmpegs.empty()) {  
530 - return ret;  
531 - }  
532 -  
533 - // start thread to run all encoding engines.  
534 - srs_assert(!tid);  
535 - if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {  
536 - ret = ERROR_ST_CREATE_FORWARD_THREAD;  
537 - srs_error("st_thread_create failed. ret=%d", ret);  
538 - return ret;  
539 - }  
540 -  
541 - return ret;  
542 -}  
543 -  
544 -void SrsEncoder::on_unpublish()  
545 -{  
546 - if (tid) {  
547 - loop = false;  
548 - st_thread_interrupt(tid);  
549 - st_thread_join(tid, NULL);  
550 - tid = NULL;  
551 - }  
552 -  
553 - clear_engines();  
554 -}  
555 -  
556 -void SrsEncoder::clear_engines()  
557 -{  
558 - std::vector<SrsFFMPEG*>::iterator it;  
559 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
560 - SrsFFMPEG* ffmpeg = *it;  
561 - srs_freep(ffmpeg);  
562 - }  
563 - ffmpegs.clear();  
564 -}  
565 -  
566 -SrsFFMPEG* SrsEncoder::at(int index)  
567 -{  
568 - return ffmpegs[index];  
569 -}  
570 -  
571 -int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf)  
572 -{  
573 - int ret = ERROR_SUCCESS;  
574 -  
575 - srs_assert(conf);  
576 -  
577 - // enabled  
578 - if (!config->get_transcode_enabled(conf)) {  
579 - srs_trace("ignore the disabled transcode: %s",  
580 - conf->arg0().c_str());  
581 - return ret;  
582 - }  
583 -  
584 - // ffmpeg  
585 - std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);  
586 - if (ffmpeg_bin.empty()) {  
587 - srs_trace("ignore the empty ffmpeg transcode: %s",  
588 - conf->arg0().c_str());  
589 - return ret;  
590 - }  
591 -  
592 - // get all engines.  
593 - std::vector<SrsConfDirective*> engines;  
594 - config->get_transcode_engines(conf, engines);  
595 - if (engines.empty()) {  
596 - srs_trace("ignore the empty transcode engine: %s",  
597 - conf->arg0().c_str());  
598 - return ret;  
599 - }  
600 -  
601 - // create engine  
602 - for (int i = 0; i < (int)engines.size(); i++) {  
603 - SrsConfDirective* engine = engines[i];  
604 - if (!config->get_engine_enabled(engine)) {  
605 - srs_trace("ignore the diabled transcode engine: %s %s",  
606 - conf->arg0().c_str(), engine->arg0().c_str());  
607 - continue;  
608 - }  
609 -  
610 - SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);  
611 -  
612 - if ((ret = ffmpeg->initialize(req, engine)) != ERROR_SUCCESS) {  
613 - srs_freep(ffmpeg);  
614 -  
615 - // if got a loop, donot transcode the whole stream.  
616 - if (ret == ERROR_ENCODER_LOOP) {  
617 - clear_engines();  
618 - break;  
619 - }  
620 -  
621 - srs_error("invalid transcode engine: %s %s",  
622 - conf->arg0().c_str(), engine->arg0().c_str());  
623 - return ret;  
624 - }  
625 -  
626 - ffmpegs.push_back(ffmpeg);  
627 - }  
628 -  
629 - return ret;  
630 -}  
631 -  
632 -int SrsEncoder::cycle()  
633 -{  
634 - int ret = ERROR_SUCCESS;  
635 -  
636 - std::vector<SrsFFMPEG*>::iterator it;  
637 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
638 - SrsFFMPEG* ffmpeg = *it;  
639 -  
640 - // start all ffmpegs.  
641 - if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {  
642 - srs_error("ffmpeg start failed. ret=%d", ret);  
643 - return ret;  
644 - }  
645 -  
646 - // check ffmpeg status.  
647 - if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) {  
648 - srs_error("ffmpeg cycle failed. ret=%d", ret);  
649 - return ret;  
650 - }  
651 - }  
652 -  
653 - return ret;  
654 -}  
655 -  
656 -void SrsEncoder::encoder_cycle()  
657 -{  
658 - int ret = ERROR_SUCCESS;  
659 -  
660 - log_context->generate_id();  
661 - srs_trace("encoder cycle start");  
662 -  
663 - SrsPithyPrint pithy_print(SRS_STAGE_ENCODER);  
664 -  
665 - while (loop) {  
666 - if ((ret = cycle()) != ERROR_SUCCESS) {  
667 - srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);  
668 - } else {  
669 - srs_info("encoder cycle success, retry");  
670 - }  
671 -  
672 - if (!loop) {  
673 - break;  
674 - }  
675 -  
676 - encoder(&pithy_print);  
677 - pithy_print.elapse(SRS_ENCODER_SLEEP_MS);  
678 -  
679 - st_usleep(SRS_ENCODER_SLEEP_MS * 1000);  
680 - }  
681 -  
682 - // kill ffmpeg when finished and it alive  
683 - std::vector<SrsFFMPEG*>::iterator it;  
684 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
685 - SrsFFMPEG* ffmpeg = *it;  
686 - ffmpeg->stop();  
687 - }  
688 -  
689 - srs_trace("encoder cycle finished");  
690 -}  
691 -  
692 -void SrsEncoder::encoder(SrsPithyPrint* pithy_print)  
693 -{  
694 - // reportable  
695 - if (pithy_print->can_print()) {  
696 - srs_trace("-> time=%"PRId64", encoders=%d",  
697 - pithy_print->get_age(), (int)ffmpegs.size());  
698 - }  
699 -}  
700 -  
701 -void* SrsEncoder::encoder_thread(void* arg)  
702 -{  
703 - SrsEncoder* obj = (SrsEncoder*)arg;  
704 - srs_assert(obj != NULL);  
705 -  
706 - obj->loop = true;  
707 - obj->encoder_cycle();  
708 -  
709 - return NULL;  
710 -}  
711 -  
712 -#endif  
713 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_encoder.hpp>
  25 +
  26 +#include <stdlib.h>
  27 +#include <unistd.h>
  28 +#include <sys/wait.h>
  29 +#include <fcntl.h>
  30 +#include <signal.h>
  31 +#include <sys/types.h>
  32 +
  33 +#include <algorithm>
  34 +
  35 +#include <srs_core_error.hpp>
  36 +#include <srs_core_log.hpp>
  37 +#include <srs_core_config.hpp>
  38 +#include <srs_core_rtmp.hpp>
  39 +#include <srs_core_pithy_print.hpp>
  40 +
  41 +#ifdef SRS_FFMPEG
  42 +
  43 +#define SRS_ENCODER_SLEEP_MS 2000
  44 +
  45 +#define SRS_ENCODER_VCODEC "libx264"
  46 +#define SRS_ENCODER_ACODEC "libaacplus"
  47 +
  48 +// for encoder to detect the dead loop
  49 +static std::vector<std::string> _transcoded_url;
  50 +
  51 +SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
  52 +{
  53 + started = false;
  54 + pid = -1;
  55 + ffmpeg = ffmpeg_bin;
  56 +
  57 + vbitrate = 0;
  58 + vfps = 0;
  59 + vwidth = 0;
  60 + vheight = 0;
  61 + vthreads = 0;
  62 + abitrate = 0;
  63 + asample_rate = 0;
  64 + achannels = 0;
  65 +
  66 + log_fd = -1;
  67 +}
  68 +
  69 +SrsFFMPEG::~SrsFFMPEG()
  70 +{
  71 + stop();
  72 +}
  73 +
  74 +int SrsFFMPEG::initialize(SrsRequest* req, SrsConfDirective* engine)
  75 +{
  76 + int ret = ERROR_SUCCESS;
  77 +
  78 + config->get_engine_vfilter(engine, vfilter);
  79 + vcodec = config->get_engine_vcodec(engine);
  80 + vbitrate = config->get_engine_vbitrate(engine);
  81 + vfps = config->get_engine_vfps(engine);
  82 + vwidth = config->get_engine_vwidth(engine);
  83 + vheight = config->get_engine_vheight(engine);
  84 + vthreads = config->get_engine_vthreads(engine);
  85 + vprofile = config->get_engine_vprofile(engine);
  86 + vpreset = config->get_engine_vpreset(engine);
  87 + config->get_engine_vparams(engine, vparams);
  88 + acodec = config->get_engine_acodec(engine);
  89 + abitrate = config->get_engine_abitrate(engine);
  90 + asample_rate = config->get_engine_asample_rate(engine);
  91 + achannels = config->get_engine_achannels(engine);
  92 + config->get_engine_aparams(engine, aparams);
  93 + output = config->get_engine_output(engine);
  94 +
  95 + // ensure the size is even.
  96 + vwidth -= vwidth % 2;
  97 + vheight -= vheight % 2;
  98 +
  99 + // input stream, from local.
  100 + // ie. rtmp://127.0.0.1:1935/live/livestream
  101 + input = "rtmp://127.0.0.1:";
  102 + input += req->port;
  103 + input += "/";
  104 + input += req->app;
  105 + input += "?vhost=";
  106 + input += req->vhost;
  107 + input += "/";
  108 + input += req->stream;
  109 +
  110 + // output stream, to other/self server
  111 + // ie. rtmp://127.0.0.1:1935/live/livestream_sd
  112 + output = srs_replace(output, "[vhost]", req->vhost);
  113 + output = srs_replace(output, "[port]", req->port);
  114 + output = srs_replace(output, "[app]", req->app);
  115 + output = srs_replace(output, "[stream]", req->stream);
  116 + output = srs_replace(output, "[engine]", engine->arg0());
  117 +
  118 + // write ffmpeg info to log file.
  119 + log_file = config->get_log_dir();
  120 + log_file += "/";
  121 + log_file += "encoder";
  122 + log_file += "-";
  123 + log_file += req->vhost;
  124 + log_file += "-";
  125 + log_file += req->app;
  126 + log_file += "-";
  127 + log_file += req->stream;
  128 + log_file += ".log";
  129 +
  130 + // important: loop check, donot transcode again.
  131 + std::vector<std::string>::iterator it;
  132 + it = std::find(_transcoded_url.begin(), _transcoded_url.end(), input);
  133 + if (it != _transcoded_url.end()) {
  134 + ret = ERROR_ENCODER_LOOP;
  135 + srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",
  136 + input.c_str(), output.c_str(), ret);
  137 + return ret;
  138 + }
  139 + _transcoded_url.push_back(output);
  140 +
  141 + if (vcodec != SRS_ENCODER_VCODEC) {
  142 + ret = ERROR_ENCODER_VCODEC;
  143 + srs_error("invalid vcodec, must be %s, actual %s, ret=%d",
  144 + SRS_ENCODER_VCODEC, vcodec.c_str(), ret);
  145 + return ret;
  146 + }
  147 + if (vbitrate <= 0) {
  148 + ret = ERROR_ENCODER_VBITRATE;
  149 + srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);
  150 + return ret;
  151 + }
  152 + if (vfps <= 0) {
  153 + ret = ERROR_ENCODER_VFPS;
  154 + srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);
  155 + return ret;
  156 + }
  157 + if (vwidth <= 0) {
  158 + ret = ERROR_ENCODER_VWIDTH;
  159 + srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);
  160 + return ret;
  161 + }
  162 + if (vheight <= 0) {
  163 + ret = ERROR_ENCODER_VHEIGHT;
  164 + srs_error("invalid vheight: %d, ret=%d", vheight, ret);
  165 + return ret;
  166 + }
  167 + if (vthreads < 0) {
  168 + ret = ERROR_ENCODER_VTHREADS;
  169 + srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);
  170 + return ret;
  171 + }
  172 + if (vprofile.empty()) {
  173 + ret = ERROR_ENCODER_VPROFILE;
  174 + srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);
  175 + return ret;
  176 + }
  177 + if (vpreset.empty()) {
  178 + ret = ERROR_ENCODER_VPRESET;
  179 + srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);
  180 + return ret;
  181 + }
  182 + if (acodec != SRS_ENCODER_ACODEC) {
  183 + ret = ERROR_ENCODER_ACODEC;
  184 + srs_error("invalid acodec, must be %s, actual %s, ret=%d",
  185 + SRS_ENCODER_ACODEC, acodec.c_str(), ret);
  186 + return ret;
  187 + }
  188 + if (abitrate <= 0) {
  189 + ret = ERROR_ENCODER_ABITRATE;
  190 + srs_error("invalid abitrate: %d, ret=%d",
  191 + abitrate, ret);
  192 + return ret;
  193 + }
  194 + if (asample_rate <= 0) {
  195 + ret = ERROR_ENCODER_ASAMPLE_RATE;
  196 + srs_error("invalid sample rate: %d, ret=%d",
  197 + asample_rate, ret);
  198 + return ret;
  199 + }
  200 + if (achannels != 1 && achannels != 2) {
  201 + ret = ERROR_ENCODER_ACHANNELS;
  202 + srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",
  203 + achannels, ret);
  204 + return ret;
  205 + }
  206 + if (output.empty()) {
  207 + ret = ERROR_ENCODER_OUTPUT;
  208 + srs_error("invalid empty output, ret=%d", ret);
  209 + return ret;
  210 + }
  211 +
  212 + return ret;
  213 +}
  214 +
  215 +int SrsFFMPEG::start()
  216 +{
  217 + int ret = ERROR_SUCCESS;
  218 +
  219 + if (started) {
  220 + return ret;
  221 + }
  222 +
  223 + // prepare exec params
  224 + char tmp[256];
  225 + std::vector<std::string> params;
  226 +
  227 + // argv[0], set to ffmpeg bin.
  228 + // The execv() and execvp() functions ....
  229 + // The first argument, by convention, should point to
  230 + // the filename associated with the file being executed.
  231 + params.push_back(ffmpeg);
  232 +
  233 + // input.
  234 + params.push_back("-f");
  235 + params.push_back("flv");
  236 +
  237 + params.push_back("-i");
  238 + params.push_back(input);
  239 +
  240 + // build the filter
  241 + if (!vfilter.empty()) {
  242 + std::vector<std::string>::iterator it;
  243 + for (it = vfilter.begin(); it != vfilter.end(); ++it) {
  244 + std::string p = *it;
  245 + if (!p.empty()) {
  246 + params.push_back(p);
  247 + }
  248 + }
  249 + }
  250 +
  251 + // video specified.
  252 + params.push_back("-vcodec");
  253 + params.push_back(vcodec);
  254 +
  255 + params.push_back("-b:v");
  256 + snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);
  257 + params.push_back(tmp);
  258 +
  259 + params.push_back("-r");
  260 + snprintf(tmp, sizeof(tmp), "%.2f", vfps);
  261 + params.push_back(tmp);
  262 +
  263 + params.push_back("-s");
  264 + snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);
  265 + params.push_back(tmp);
  266 +
  267 + // TODO: add aspect if needed.
  268 + params.push_back("-aspect");
  269 + snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);
  270 + params.push_back(tmp);
  271 +
  272 + params.push_back("-threads");
  273 + snprintf(tmp, sizeof(tmp), "%d", vthreads);
  274 + params.push_back(tmp);
  275 +
  276 + params.push_back("-profile:v");
  277 + params.push_back(vprofile);
  278 +
  279 + params.push_back("-preset");
  280 + params.push_back(vpreset);
  281 +
  282 + // vparams
  283 + if (!vparams.empty()) {
  284 + std::vector<std::string>::iterator it;
  285 + for (it = vparams.begin(); it != vparams.end(); ++it) {
  286 + std::string p = *it;
  287 + if (!p.empty()) {
  288 + params.push_back(p);
  289 + }
  290 + }
  291 + }
  292 +
  293 + // audio specified.
  294 + params.push_back("-acodec");
  295 + params.push_back(acodec);
  296 +
  297 + params.push_back("-b:a");
  298 + snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);
  299 + params.push_back(tmp);
  300 +
  301 + params.push_back("-ar");
  302 + snprintf(tmp, sizeof(tmp), "%d", asample_rate);
  303 + params.push_back(tmp);
  304 +
  305 + params.push_back("-ac");
  306 + snprintf(tmp, sizeof(tmp), "%d", achannels);
  307 + params.push_back(tmp);
  308 +
  309 + // aparams
  310 + if (!aparams.empty()) {
  311 + std::vector<std::string>::iterator it;
  312 + for (it = aparams.begin(); it != aparams.end(); ++it) {
  313 + std::string p = *it;
  314 + if (!p.empty()) {
  315 + params.push_back(p);
  316 + }
  317 + }
  318 + }
  319 +
  320 + // output
  321 + params.push_back("-f");
  322 + params.push_back("flv");
  323 +
  324 + params.push_back("-y");
  325 + params.push_back(output);
  326 +
  327 + if (true) {
  328 + int pparam_size = 8 * 1024;
  329 + char* pparam = new char[pparam_size];
  330 + char* p = pparam;
  331 + char* last = pparam + pparam_size;
  332 + for (int i = 0; i < (int)params.size(); i++) {
  333 + std::string ffp = params[i];
  334 + snprintf(p, last - p, "%s ", ffp.c_str());
  335 + p += ffp.length() + 1;
  336 + }
  337 + srs_trace("start transcoder, log: %s, params: %s",
  338 + log_file.c_str(), pparam);
  339 + srs_freepa(pparam);
  340 + }
  341 +
  342 + // TODO: fork or vfork?
  343 + if ((pid = fork()) < 0) {
  344 + ret = ERROR_ENCODER_FORK;
  345 + srs_error("vfork process failed. ret=%d", ret);
  346 + return ret;
  347 + }
  348 +
  349 + // child process: ffmpeg encoder engine.
  350 + if (pid == 0) {
  351 + // redirect logs to file.
  352 + int flags = O_CREAT|O_WRONLY|O_APPEND;
  353 + mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
  354 + if ((log_fd = ::open(log_file.c_str(), flags, mode)) < 0) {
  355 + ret = ERROR_ENCODER_OPEN;
  356 + srs_error("open encoder file %s failed. ret=%d", log_file.c_str(), ret);
  357 + return ret;
  358 + }
  359 + if (dup2(log_fd, STDOUT_FILENO) < 0) {
  360 + ret = ERROR_ENCODER_DUP2;
  361 + srs_error("dup2 encoder file failed. ret=%d", ret);
  362 + return ret;
  363 + }
  364 + if (dup2(log_fd, STDERR_FILENO) < 0) {
  365 + ret = ERROR_ENCODER_DUP2;
  366 + srs_error("dup2 encoder file failed. ret=%d", ret);
  367 + return ret;
  368 + }
  369 + // close other fds
  370 + // TODO: do in right way.
  371 + for (int i = 3; i < 1024; i++) {
  372 + ::close(i);
  373 + }
  374 +
  375 + // memory leak in child process, it's ok.
  376 + char** charpv_params = new char*[params.size() + 1];
  377 + for (int i = 0; i < (int)params.size(); i++) {
  378 + std::string p = params[i];
  379 + charpv_params[i] = (char*)p.c_str();
  380 + }
  381 + // EOF: NULL
  382 + charpv_params[params.size()] = NULL;
  383 +
  384 + // TODO: execv or execvp
  385 + ret = execv(ffmpeg.c_str(), charpv_params);
  386 + if (ret < 0) {
  387 + fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",
  388 + errno, strerror(errno));
  389 + }
  390 + exit(ret);
  391 + }
  392 +
  393 + // parent.
  394 + if (pid > 0) {
  395 + started = true;
  396 + srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);
  397 + return ret;
  398 + }
  399 +
  400 + return ret;
  401 +}
  402 +
  403 +int SrsFFMPEG::cycle()
  404 +{
  405 + int ret = ERROR_SUCCESS;
  406 +
  407 + if (!started) {
  408 + return ret;
  409 + }
  410 +
  411 + int status = 0;
  412 + pid_t p = waitpid(pid, &status, WNOHANG);
  413 +
  414 + if (p < 0) {
  415 + ret = ERROR_SYSTEM_WAITPID;
  416 + srs_error("transcode waitpid failed, pid=%d, ret=%d", pid, ret);
  417 + return ret;
  418 + }
  419 +
  420 + if (p == 0) {
  421 + srs_info("transcode process pid=%d is running.", pid);
  422 + return ret;
  423 + }
  424 +
  425 + srs_trace("transcode process pid=%d terminate, restart it.", pid);
  426 + started = false;
  427 +
  428 + return ret;
  429 +}
  430 +
  431 +void SrsFFMPEG::stop()
  432 +{
  433 + if (log_fd > 0) {
  434 + ::close(log_fd);
  435 + log_fd = -1;
  436 + }
  437 +
  438 + if (!started) {
  439 + return;
  440 + }
  441 +
  442 + // kill the ffmpeg,
  443 + // when rewind, upstream will stop publish(unpublish),
  444 + // unpublish event will stop all ffmpeg encoders,
  445 + // then publish will start all ffmpeg encoders.
  446 + if (pid > 0) {
  447 + if (kill(pid, SIGKILL) < 0) {
  448 + srs_warn("kill the encoder failed, ignored. pid=%d", pid);
  449 + }
  450 +
  451 + // wait for the ffmpeg to quit.
  452 + // ffmpeg will gracefully quit if signal is:
  453 + // 1) SIGHUP 2) SIGINT 3) SIGQUIT
  454 + // other signals, directly exit(123).
  455 + int status = 0;
  456 + if (waitpid(pid, &status, 0) < 0) {
  457 + srs_warn("wait the encoder quit failed, ignored. pid=%d", pid);
  458 + }
  459 +
  460 + srs_trace("stop the encoder success. pid=%d", pid);
  461 + pid = -1;
  462 + }
  463 +
  464 + std::vector<std::string>::iterator it;
  465 + it = std::find(_transcoded_url.begin(), _transcoded_url.end(), output);
  466 + if (it != _transcoded_url.end()) {
  467 + _transcoded_url.erase(it);
  468 + }
  469 +}
  470 +
  471 +SrsEncoder::SrsEncoder()
  472 +{
  473 + tid = NULL;
  474 + loop = false;
  475 +}
  476 +
  477 +SrsEncoder::~SrsEncoder()
  478 +{
  479 + on_unpublish();
  480 +}
  481 +
  482 +int SrsEncoder::parse_scope_engines(SrsRequest* req)
  483 +{
  484 + int ret = ERROR_SUCCESS;
  485 +
  486 + // parse all transcode engines.
  487 + SrsConfDirective* conf = NULL;
  488 +
  489 + // parse vhost scope engines
  490 + std::string scope = "";
  491 + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
  492 + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
  493 + srs_error("parse vhost scope=%s transcode engines failed. "
  494 + "ret=%d", scope.c_str(), ret);
  495 + return ret;
  496 + }
  497 + }
  498 + // parse app scope engines
  499 + scope = req->app;
  500 + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
  501 + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
  502 + srs_error("parse app scope=%s transcode engines failed. "
  503 + "ret=%d", scope.c_str(), ret);
  504 + return ret;
  505 + }
  506 + }
  507 + // parse stream scope engines
  508 + scope += "/";
  509 + scope += req->stream;
  510 + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
  511 + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
  512 + srs_error("parse stream scope=%s transcode engines failed. "
  513 + "ret=%d", scope.c_str(), ret);
  514 + return ret;
  515 + }
  516 + }
  517 +
  518 + return ret;
  519 +}
  520 +
  521 +int SrsEncoder::on_publish(SrsRequest* req)
  522 +{
  523 + int ret = ERROR_SUCCESS;
  524 +
  525 + ret = parse_scope_engines(req);
  526 +
  527 + // ignore the loop encoder
  528 + if (ret == ERROR_ENCODER_LOOP) {
  529 + ret = ERROR_SUCCESS;
  530 + }
  531 +
  532 + // return for error or no engine.
  533 + if (ret != ERROR_SUCCESS || ffmpegs.empty()) {
  534 + return ret;
  535 + }
  536 +
  537 + // start thread to run all encoding engines.
  538 + srs_assert(!tid);
  539 + if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {
  540 + ret = ERROR_ST_CREATE_FORWARD_THREAD;
  541 + srs_error("st_thread_create failed. ret=%d", ret);
  542 + return ret;
  543 + }
  544 +
  545 + return ret;
  546 +}
  547 +
  548 +void SrsEncoder::on_unpublish()
  549 +{
  550 + if (tid) {
  551 + loop = false;
  552 + st_thread_interrupt(tid);
  553 + st_thread_join(tid, NULL);
  554 + tid = NULL;
  555 + }
  556 +
  557 + clear_engines();
  558 +}
  559 +
  560 +void SrsEncoder::clear_engines()
  561 +{
  562 + std::vector<SrsFFMPEG*>::iterator it;
  563 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  564 + SrsFFMPEG* ffmpeg = *it;
  565 + srs_freep(ffmpeg);
  566 + }
  567 + ffmpegs.clear();
  568 +}
  569 +
  570 +SrsFFMPEG* SrsEncoder::at(int index)
  571 +{
  572 + return ffmpegs[index];
  573 +}
  574 +
  575 +int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf)
  576 +{
  577 + int ret = ERROR_SUCCESS;
  578 +
  579 + srs_assert(conf);
  580 +
  581 + // enabled
  582 + if (!config->get_transcode_enabled(conf)) {
  583 + srs_trace("ignore the disabled transcode: %s",
  584 + conf->arg0().c_str());
  585 + return ret;
  586 + }
  587 +
  588 + // ffmpeg
  589 + std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);
  590 + if (ffmpeg_bin.empty()) {
  591 + srs_trace("ignore the empty ffmpeg transcode: %s",
  592 + conf->arg0().c_str());
  593 + return ret;
  594 + }
  595 +
  596 + // get all engines.
  597 + std::vector<SrsConfDirective*> engines;
  598 + config->get_transcode_engines(conf, engines);
  599 + if (engines.empty()) {
  600 + srs_trace("ignore the empty transcode engine: %s",
  601 + conf->arg0().c_str());
  602 + return ret;
  603 + }
  604 +
  605 + // create engine
  606 + for (int i = 0; i < (int)engines.size(); i++) {
  607 + SrsConfDirective* engine = engines[i];
  608 + if (!config->get_engine_enabled(engine)) {
  609 + srs_trace("ignore the diabled transcode engine: %s %s",
  610 + conf->arg0().c_str(), engine->arg0().c_str());
  611 + continue;
  612 + }
  613 +
  614 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  615 +
  616 + if ((ret = ffmpeg->initialize(req, engine)) != ERROR_SUCCESS) {
  617 + srs_freep(ffmpeg);
  618 +
  619 + // if got a loop, donot transcode the whole stream.
  620 + if (ret == ERROR_ENCODER_LOOP) {
  621 + clear_engines();
  622 + break;
  623 + }
  624 +
  625 + srs_error("invalid transcode engine: %s %s",
  626 + conf->arg0().c_str(), engine->arg0().c_str());
  627 + return ret;
  628 + }
  629 +
  630 + ffmpegs.push_back(ffmpeg);
  631 + }
  632 +
  633 + return ret;
  634 +}
  635 +
  636 +int SrsEncoder::cycle()
  637 +{
  638 + int ret = ERROR_SUCCESS;
  639 +
  640 + std::vector<SrsFFMPEG*>::iterator it;
  641 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  642 + SrsFFMPEG* ffmpeg = *it;
  643 +
  644 + // start all ffmpegs.
  645 + if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
  646 + srs_error("ffmpeg start failed. ret=%d", ret);
  647 + return ret;
  648 + }
  649 +
  650 + // check ffmpeg status.
  651 + if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) {
  652 + srs_error("ffmpeg cycle failed. ret=%d", ret);
  653 + return ret;
  654 + }
  655 + }
  656 +
  657 + return ret;
  658 +}
  659 +
  660 +void SrsEncoder::encoder_cycle()
  661 +{
  662 + int ret = ERROR_SUCCESS;
  663 +
  664 + log_context->generate_id();
  665 + srs_trace("encoder cycle start");
  666 +
  667 + SrsPithyPrint pithy_print(SRS_STAGE_ENCODER);
  668 +
  669 + while (loop) {
  670 + if ((ret = cycle()) != ERROR_SUCCESS) {
  671 + srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);
  672 + } else {
  673 + srs_info("encoder cycle success, retry");
  674 + }
  675 +
  676 + if (!loop) {
  677 + break;
  678 + }
  679 +
  680 + encoder(&pithy_print);
  681 + pithy_print.elapse(SRS_ENCODER_SLEEP_MS);
  682 +
  683 + st_usleep(SRS_ENCODER_SLEEP_MS * 1000);
  684 + }
  685 +
  686 + // kill ffmpeg when finished and it alive
  687 + std::vector<SrsFFMPEG*>::iterator it;
  688 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  689 + SrsFFMPEG* ffmpeg = *it;
  690 + ffmpeg->stop();
  691 + }
  692 +
  693 + srs_trace("encoder cycle finished");
  694 +}
  695 +
  696 +void SrsEncoder::encoder(SrsPithyPrint* pithy_print)
  697 +{
  698 + // reportable
  699 + if (pithy_print->can_print()) {
  700 + srs_trace("-> time=%"PRId64", encoders=%d",
  701 + pithy_print->get_age(), (int)ffmpegs.size());
  702 + }
  703 +}
  704 +
  705 +void* SrsEncoder::encoder_thread(void* arg)
  706 +{
  707 + SrsEncoder* obj = (SrsEncoder*)arg;
  708 + srs_assert(obj != NULL);
  709 +
  710 + obj->loop = true;
  711 + obj->encoder_cycle();
  712 +
  713 + return NULL;
  714 +}
  715 +
  716 +#endif
  717 +