winlin

add ffempty

1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_encoder.hpp>  
25 -  
26 -#include <stdlib.h>  
27 -#include <unistd.h>  
28 -  
29 -#include <srs_core_error.hpp>  
30 -#include <srs_core_log.hpp>  
31 -#include <srs_core_config.hpp>  
32 -  
33 -#define SRS_ENCODER_SLEEP_MS 2000  
34 -  
35 -#define SRS_ENCODER_VCODEC "libx264"  
36 -#define SRS_ENCODER_ACODEC "libaacplus"  
37 -  
38 -SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)  
39 -{  
40 - started = false;  
41 - pid = -1;  
42 - ffmpeg = ffmpeg_bin;  
43 -  
44 - vbitrate = 0;  
45 - vfps = 0;  
46 - vwidth = 0;  
47 - vheight = 0;  
48 - vthreads = 0;  
49 - abitrate = 0;  
50 - asample_rate = 0;  
51 - achannels = 0;  
52 -}  
53 -  
54 -SrsFFMPEG::~SrsFFMPEG()  
55 -{  
56 - stop();  
57 -}  
58 -  
59 -int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)  
60 -{  
61 - int ret = ERROR_SUCCESS;  
62 -  
63 - vcodec = config->get_engine_vcodec(engine);  
64 - vbitrate = config->get_engine_vbitrate(engine);  
65 - vfps = config->get_engine_vfps(engine);  
66 - vwidth = config->get_engine_vwidth(engine);  
67 - vheight = config->get_engine_vheight(engine);  
68 - vthreads = config->get_engine_vthreads(engine);  
69 - vprofile = config->get_engine_vprofile(engine);  
70 - vpreset = config->get_engine_vpreset(engine);  
71 - vparams = config->get_engine_vparams(engine);  
72 - acodec = config->get_engine_acodec(engine);  
73 - abitrate = config->get_engine_abitrate(engine);  
74 - asample_rate = config->get_engine_asample_rate(engine);  
75 - achannels = config->get_engine_achannels(engine);  
76 - aparams = config->get_engine_aparams(engine);  
77 - output = config->get_engine_output(engine);  
78 -  
79 - // ensure the size is even.  
80 - vwidth -= vwidth % 2;  
81 - vheight -= vheight % 2;  
82 -  
83 - // input stream, from local.  
84 - // ie. rtmp://127.0.0.1:1935/live/livestream  
85 - input = "rtmp://127.0.0.1:";  
86 - input += port;  
87 - input += "/";  
88 - input += app;  
89 - input += "/";  
90 - input += stream;  
91 -  
92 - // output stream, to other/self server  
93 - // ie. rtmp://127.0.0.1:1935/live/livestream_sd  
94 - if (vhost == RTMP_VHOST_DEFAULT) {  
95 - output = srs_replace(output, "[vhost]", "127.0.0.1");  
96 - } else {  
97 - output = srs_replace(output, "[vhost]", vhost);  
98 - }  
99 - output = srs_replace(output, "[port]", port);  
100 - output = srs_replace(output, "[app]", app);  
101 - output = srs_replace(output, "[stream]", stream);  
102 -  
103 - // important: loop check, donot transcode again.  
104 - // we think the following is loop circle:  
105 - // input: rtmp://127.0.0.1:1935/live/livestream_sd  
106 - // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd  
107 - std::string tail = ""; // tail="_sd"  
108 - if (output.length() > input.length()) {  
109 - tail = output.substr(input.length());  
110 - }  
111 - // if input also endwiths the tail, loop detected.  
112 - if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {  
113 - ret = ERROR_ENCODER_LOOP;  
114 - srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",  
115 - input.c_str(), output.c_str(), ret);  
116 - return ret;  
117 - }  
118 -  
119 - if (vcodec != SRS_ENCODER_VCODEC) {  
120 - ret = ERROR_ENCODER_VCODEC;  
121 - srs_error("invalid vcodec, must be %s, actual %s, ret=%d",  
122 - SRS_ENCODER_VCODEC, vcodec.c_str(), ret);  
123 - return ret;  
124 - }  
125 - if (vbitrate <= 0) {  
126 - ret = ERROR_ENCODER_VBITRATE;  
127 - srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);  
128 - return ret;  
129 - }  
130 - if (vfps <= 0) {  
131 - ret = ERROR_ENCODER_VFPS;  
132 - srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);  
133 - return ret;  
134 - }  
135 - if (vwidth <= 0) {  
136 - ret = ERROR_ENCODER_VWIDTH;  
137 - srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);  
138 - return ret;  
139 - }  
140 - if (vheight <= 0) {  
141 - ret = ERROR_ENCODER_VHEIGHT;  
142 - srs_error("invalid vheight: %d, ret=%d", vheight, ret);  
143 - return ret;  
144 - }  
145 - if (vthreads < 0) {  
146 - ret = ERROR_ENCODER_VTHREADS;  
147 - srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);  
148 - return ret;  
149 - }  
150 - if (vprofile.empty()) {  
151 - ret = ERROR_ENCODER_VPROFILE;  
152 - srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);  
153 - return ret;  
154 - }  
155 - if (vpreset.empty()) {  
156 - ret = ERROR_ENCODER_VPRESET;  
157 - srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);  
158 - return ret;  
159 - }  
160 - if (acodec != SRS_ENCODER_ACODEC) {  
161 - ret = ERROR_ENCODER_ACODEC;  
162 - srs_error("invalid acodec, must be %s, actual %s, ret=%d",  
163 - SRS_ENCODER_ACODEC, acodec.c_str(), ret);  
164 - return ret;  
165 - }  
166 - if (abitrate <= 0) {  
167 - ret = ERROR_ENCODER_ABITRATE;  
168 - srs_error("invalid abitrate: %d, ret=%d",  
169 - abitrate, ret);  
170 - return ret;  
171 - }  
172 - if (asample_rate <= 0) {  
173 - ret = ERROR_ENCODER_ASAMPLE_RATE;  
174 - srs_error("invalid sample rate: %d, ret=%d",  
175 - asample_rate, ret);  
176 - return ret;  
177 - }  
178 - if (achannels != 1 && achannels != 2) {  
179 - ret = ERROR_ENCODER_ACHANNELS;  
180 - srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",  
181 - achannels, ret);  
182 - return ret;  
183 - }  
184 - if (output.empty()) {  
185 - ret = ERROR_ENCODER_OUTPUT;  
186 - srs_error("invalid empty output, ret=%d", ret);  
187 - return ret;  
188 - }  
189 -  
190 - return ret;  
191 -}  
192 -  
193 -int SrsFFMPEG::start()  
194 -{  
195 - int ret = ERROR_SUCCESS;  
196 -  
197 - if (started) {  
198 - return ret;  
199 - }  
200 -  
201 - // prepare execl params  
202 - char vsize[22];  
203 - snprintf(vsize, sizeof(vsize), "%dx%d", vwidth, vheight);  
204 - char vaspect[22];  
205 - snprintf(vaspect, sizeof(vaspect), "%d:%d", vwidth, vheight);  
206 - char s_vbitrate[10];  
207 - snprintf(s_vbitrate, sizeof(s_vbitrate), "%d", vbitrate * 1000);  
208 - char s_vfps[10];  
209 - snprintf(s_vfps, sizeof(s_vfps), "%.2f", vfps);  
210 - char s_vthreads[10];  
211 - snprintf(s_vthreads, sizeof(s_vthreads), "%d", vthreads);  
212 - char s_abitrate[10];  
213 - snprintf(s_abitrate, sizeof(s_abitrate), "%d", abitrate * 1000);  
214 - char s_asample_rate[10];  
215 - snprintf(s_asample_rate, sizeof(s_asample_rate), "%d", asample_rate);  
216 - char s_achannels[10];  
217 - snprintf(s_achannels, sizeof(s_achannels), "%d", achannels);  
218 -  
219 - // TODO: execl donot support the params.  
220 - // video params  
221 - std::string s_vpreset = vpreset;  
222 - if (!vparams.empty()) {  
223 - s_vpreset += " ";  
224 - s_vpreset += vparams;  
225 - }  
226 - // audio params  
227 - std::string s_aparams = s_achannels;  
228 - if (!aparams.empty()) {  
229 - s_aparams += " ";  
230 - s_aparams += aparams;  
231 - }  
232 -  
233 - // TODO: fork or vfork?  
234 - if ((pid = fork()) < 0) {  
235 - ret = ERROR_ENCODER_FORK;  
236 - srs_error("vfork process failed. ret=%d", ret);  
237 - return ret;  
238 - }  
239 -  
240 - // child process: ffmpeg encoder engine.  
241 - if (pid == 0) {  
242 - // TODO: execl or execlp  
243 - ret = execl(ffmpeg.c_str(),  
244 - ffmpeg.c_str(),  
245 - "-f", "flv",  
246 - "-i", input.c_str(),  
247 - // video specified.  
248 - "-vcodec", vcodec.c_str(),  
249 - "-b:v", s_vbitrate,  
250 - "-r", s_vfps,  
251 - "-s", vsize,  
252 - "-aspect", vaspect, // TODO: add aspect if needed.  
253 - "-threads", s_vthreads,  
254 - "-profile:v", vprofile.c_str(),  
255 - "-preset", s_vpreset.c_str(),  
256 - // audio specified.  
257 - "-acodec", acodec.c_str(),  
258 - "-b:a", s_abitrate,  
259 - "-ar", s_asample_rate,  
260 - "-ac", s_aparams.c_str(),  
261 - "-f", "flv",  
262 - "-y", output.c_str(),  
263 - NULL  
264 - );  
265 - if (ret < 0) {  
266 - fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",  
267 - errno, strerror(errno));  
268 - }  
269 - exit(ret);  
270 - }  
271 -  
272 - // parent.  
273 - if (pid > 0) {  
274 - started = true;  
275 - srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);  
276 - return ret;  
277 - }  
278 -  
279 - return ret;  
280 -}  
281 -  
282 -void SrsFFMPEG::stop()  
283 -{  
284 - if (!started) {  
285 - return;  
286 - }  
287 -}  
288 -  
289 -SrsEncoder::SrsEncoder()  
290 -{  
291 - tid = NULL;  
292 - loop = false;  
293 -}  
294 -  
295 -SrsEncoder::~SrsEncoder()  
296 -{  
297 - on_unpublish();  
298 -}  
299 -  
300 -int SrsEncoder::parse_scope_engines()  
301 -{  
302 - int ret = ERROR_SUCCESS;  
303 -  
304 - // parse all transcode engines.  
305 - SrsConfDirective* conf = NULL;  
306 -  
307 - // parse vhost scope engines  
308 - std::string scope = "";  
309 - if ((conf = config->get_transcode(vhost, "")) != NULL) {  
310 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
311 - srs_error("parse vhost scope=%s transcode engines failed. "  
312 - "ret=%d", scope.c_str(), ret);  
313 - return ret;  
314 - }  
315 - }  
316 - // parse app scope engines  
317 - scope = app;  
318 - if ((conf = config->get_transcode(vhost, app)) != NULL) {  
319 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
320 - srs_error("parse app scope=%s transcode engines failed. "  
321 - "ret=%d", scope.c_str(), ret);  
322 - return ret;  
323 - }  
324 - }  
325 - // parse stream scope engines  
326 - scope += "/";  
327 - scope += stream;  
328 - if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {  
329 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
330 - srs_error("parse stream scope=%s transcode engines failed. "  
331 - "ret=%d", scope.c_str(), ret);  
332 - return ret;  
333 - }  
334 - }  
335 -  
336 - return ret;  
337 -}  
338 -  
339 -int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)  
340 -{  
341 - int ret = ERROR_SUCCESS;  
342 -  
343 - vhost = _vhost;  
344 - port = _port;  
345 - app = _app;  
346 - stream = _stream;  
347 -  
348 - ret = parse_scope_engines();  
349 -  
350 - // ignore the loop encoder  
351 - if (ret = ERROR_ENCODER_LOOP) {  
352 - ret = ERROR_SUCCESS;  
353 - }  
354 -  
355 - // return for error or no engine.  
356 - if (ret != ERROR_SUCCESS || ffmpegs.empty()) {  
357 - return ret;  
358 - }  
359 -  
360 - // start thread to run all encoding engines.  
361 - srs_assert(!tid);  
362 - if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {  
363 - ret = ERROR_ST_CREATE_FORWARD_THREAD;  
364 - srs_error("st_thread_create failed. ret=%d", ret);  
365 - return ret;  
366 - }  
367 -  
368 - return ret;  
369 -}  
370 -  
371 -void SrsEncoder::on_unpublish()  
372 -{  
373 - if (tid) {  
374 - loop = false;  
375 - st_thread_interrupt(tid);  
376 - st_thread_join(tid, NULL);  
377 - tid = NULL;  
378 - }  
379 -  
380 - clear_engines();  
381 -}  
382 -  
383 -void SrsEncoder::clear_engines()  
384 -{  
385 - std::vector<SrsFFMPEG*>::iterator it;  
386 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
387 - SrsFFMPEG* ffmpeg = *it;  
388 - srs_freep(ffmpeg);  
389 - }  
390 - ffmpegs.clear();  
391 -}  
392 -  
393 -SrsFFMPEG* SrsEncoder::at(int index)  
394 -{  
395 - return ffmpegs[index];  
396 -}  
397 -  
398 -int SrsEncoder::parse_transcode(SrsConfDirective* conf)  
399 -{  
400 - int ret = ERROR_SUCCESS;  
401 -  
402 - srs_assert(conf);  
403 -  
404 - // enabled  
405 - if (!config->get_transcode_enabled(conf)) {  
406 - srs_trace("ignore the disabled transcode: %s",  
407 - conf->arg0().c_str());  
408 - return ret;  
409 - }  
410 -  
411 - // ffmpeg  
412 - std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);  
413 - if (ffmpeg_bin.empty()) {  
414 - srs_trace("ignore the empty ffmpeg transcode: %s",  
415 - conf->arg0().c_str());  
416 - return ret;  
417 - }  
418 -  
419 - // get all engines.  
420 - std::vector<SrsConfDirective*> engines;  
421 - config->get_transcode_engines(conf, engines);  
422 - if (engines.empty()) {  
423 - srs_trace("ignore the empty transcode engine: %s",  
424 - conf->arg0().c_str());  
425 - return ret;  
426 - }  
427 -  
428 - // create engine  
429 - for (int i = 0; i < (int)engines.size(); i++) {  
430 - SrsConfDirective* engine = engines[i];  
431 - if (!config->get_engine_enabled(engine)) {  
432 - srs_trace("ignore the diabled transcode engine: %s %s",  
433 - conf->arg0().c_str(), engine->arg0().c_str());  
434 - continue;  
435 - }  
436 -  
437 - SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);  
438 -  
439 - if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {  
440 - srs_freep(ffmpeg);  
441 -  
442 - // if got a loop, donot transcode the whole stream.  
443 - if (ret == ERROR_ENCODER_LOOP) {  
444 - clear_engines();  
445 - break;  
446 - }  
447 -  
448 - srs_error("invalid transcode engine: %s %s",  
449 - conf->arg0().c_str(), engine->arg0().c_str());  
450 - return ret;  
451 - }  
452 -  
453 - ffmpegs.push_back(ffmpeg);  
454 - }  
455 -  
456 - return ret;  
457 -}  
458 -  
459 -int SrsEncoder::cycle()  
460 -{  
461 - int ret = ERROR_SUCCESS;  
462 -  
463 - // start all ffmpegs.  
464 - std::vector<SrsFFMPEG*>::iterator it;  
465 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
466 - SrsFFMPEG* ffmpeg = *it;  
467 - if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {  
468 - srs_error("ffmpeg start failed. ret=%d", ret);  
469 - return ret;  
470 - }  
471 - }  
472 -  
473 - return ret;  
474 -}  
475 -  
476 -void SrsEncoder::encoder_cycle()  
477 -{  
478 - int ret = ERROR_SUCCESS;  
479 -  
480 - log_context->generate_id();  
481 - srs_trace("encoder cycle start");  
482 -  
483 - while (loop) {  
484 - if ((ret = cycle()) != ERROR_SUCCESS) {  
485 - srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);  
486 - } else {  
487 - srs_info("encoder cycle success, retry");  
488 - }  
489 -  
490 - if (!loop) {  
491 - break;  
492 - }  
493 -  
494 - st_usleep(SRS_ENCODER_SLEEP_MS * 1000);  
495 - }  
496 -  
497 - // kill ffmpeg when finished and it alive  
498 - std::vector<SrsFFMPEG*>::iterator it;  
499 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
500 - SrsFFMPEG* ffmpeg = *it;  
501 - ffmpeg->stop();  
502 - }  
503 -  
504 - srs_trace("encoder cycle finished");  
505 -}  
506 -  
507 -void* SrsEncoder::encoder_thread(void* arg)  
508 -{  
509 - SrsEncoder* obj = (SrsEncoder*)arg;  
510 - srs_assert(obj != NULL);  
511 -  
512 - obj->loop = true;  
513 - obj->encoder_cycle();  
514 -  
515 - return NULL;  
516 -}  
517 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_encoder.hpp>
  25 +
  26 +#include <stdlib.h>
  27 +#include <unistd.h>
  28 +
  29 +#include <srs_core_error.hpp>
  30 +#include <srs_core_log.hpp>
  31 +#include <srs_core_config.hpp>
  32 +
  33 +#define SRS_ENCODER_SLEEP_MS 2000
  34 +
  35 +#define SRS_ENCODER_VCODEC "libx264"
  36 +#define SRS_ENCODER_ACODEC "libaacplus"
  37 +
  38 +SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
  39 +{
  40 + started = false;
  41 + pid = -1;
  42 + ffmpeg = ffmpeg_bin;
  43 +
  44 + vbitrate = 0;
  45 + vfps = 0;
  46 + vwidth = 0;
  47 + vheight = 0;
  48 + vthreads = 0;
  49 + abitrate = 0;
  50 + asample_rate = 0;
  51 + achannels = 0;
  52 +}
  53 +
  54 +SrsFFMPEG::~SrsFFMPEG()
  55 +{
  56 + stop();
  57 +}
  58 +
  59 +int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)
  60 +{
  61 + int ret = ERROR_SUCCESS;
  62 +
  63 + vcodec = config->get_engine_vcodec(engine);
  64 + vbitrate = config->get_engine_vbitrate(engine);
  65 + vfps = config->get_engine_vfps(engine);
  66 + vwidth = config->get_engine_vwidth(engine);
  67 + vheight = config->get_engine_vheight(engine);
  68 + vthreads = config->get_engine_vthreads(engine);
  69 + vprofile = config->get_engine_vprofile(engine);
  70 + vpreset = config->get_engine_vpreset(engine);
  71 + vparams = config->get_engine_vparams(engine);
  72 + acodec = config->get_engine_acodec(engine);
  73 + abitrate = config->get_engine_abitrate(engine);
  74 + asample_rate = config->get_engine_asample_rate(engine);
  75 + achannels = config->get_engine_achannels(engine);
  76 + aparams = config->get_engine_aparams(engine);
  77 + output = config->get_engine_output(engine);
  78 +
  79 + // ensure the size is even.
  80 + vwidth -= vwidth % 2;
  81 + vheight -= vheight % 2;
  82 +
  83 + // input stream, from local.
  84 + // ie. rtmp://127.0.0.1:1935/live/livestream
  85 + input = "rtmp://127.0.0.1:";
  86 + input += port;
  87 + input += "/";
  88 + input += app;
  89 + input += "/";
  90 + input += stream;
  91 +
  92 + // output stream, to other/self server
  93 + // ie. rtmp://127.0.0.1:1935/live/livestream_sd
  94 + if (vhost == RTMP_VHOST_DEFAULT) {
  95 + output = srs_replace(output, "[vhost]", "127.0.0.1");
  96 + } else {
  97 + output = srs_replace(output, "[vhost]", vhost);
  98 + }
  99 + output = srs_replace(output, "[port]", port);
  100 + output = srs_replace(output, "[app]", app);
  101 + output = srs_replace(output, "[stream]", stream);
  102 +
  103 + // important: loop check, donot transcode again.
  104 + // we think the following is loop circle:
  105 + // input: rtmp://127.0.0.1:1935/live/livestream_sd
  106 + // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd
  107 + std::string tail = ""; // tail="_sd"
  108 + if (output.length() > input.length()) {
  109 + tail = output.substr(input.length());
  110 + }
  111 + // if input also endwiths the tail, loop detected.
  112 + if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {
  113 + ret = ERROR_ENCODER_LOOP;
  114 + srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",
  115 + input.c_str(), output.c_str(), ret);
  116 + return ret;
  117 + }
  118 +
  119 + if (vcodec != SRS_ENCODER_VCODEC) {
  120 + ret = ERROR_ENCODER_VCODEC;
  121 + srs_error("invalid vcodec, must be %s, actual %s, ret=%d",
  122 + SRS_ENCODER_VCODEC, vcodec.c_str(), ret);
  123 + return ret;
  124 + }
  125 + if (vbitrate <= 0) {
  126 + ret = ERROR_ENCODER_VBITRATE;
  127 + srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);
  128 + return ret;
  129 + }
  130 + if (vfps <= 0) {
  131 + ret = ERROR_ENCODER_VFPS;
  132 + srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);
  133 + return ret;
  134 + }
  135 + if (vwidth <= 0) {
  136 + ret = ERROR_ENCODER_VWIDTH;
  137 + srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);
  138 + return ret;
  139 + }
  140 + if (vheight <= 0) {
  141 + ret = ERROR_ENCODER_VHEIGHT;
  142 + srs_error("invalid vheight: %d, ret=%d", vheight, ret);
  143 + return ret;
  144 + }
  145 + if (vthreads < 0) {
  146 + ret = ERROR_ENCODER_VTHREADS;
  147 + srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);
  148 + return ret;
  149 + }
  150 + if (vprofile.empty()) {
  151 + ret = ERROR_ENCODER_VPROFILE;
  152 + srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);
  153 + return ret;
  154 + }
  155 + if (vpreset.empty()) {
  156 + ret = ERROR_ENCODER_VPRESET;
  157 + srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);
  158 + return ret;
  159 + }
  160 + if (acodec != SRS_ENCODER_ACODEC) {
  161 + ret = ERROR_ENCODER_ACODEC;
  162 + srs_error("invalid acodec, must be %s, actual %s, ret=%d",
  163 + SRS_ENCODER_ACODEC, acodec.c_str(), ret);
  164 + return ret;
  165 + }
  166 + if (abitrate <= 0) {
  167 + ret = ERROR_ENCODER_ABITRATE;
  168 + srs_error("invalid abitrate: %d, ret=%d",
  169 + abitrate, ret);
  170 + return ret;
  171 + }
  172 + if (asample_rate <= 0) {
  173 + ret = ERROR_ENCODER_ASAMPLE_RATE;
  174 + srs_error("invalid sample rate: %d, ret=%d",
  175 + asample_rate, ret);
  176 + return ret;
  177 + }
  178 + if (achannels != 1 && achannels != 2) {
  179 + ret = ERROR_ENCODER_ACHANNELS;
  180 + srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",
  181 + achannels, ret);
  182 + return ret;
  183 + }
  184 + if (output.empty()) {
  185 + ret = ERROR_ENCODER_OUTPUT;
  186 + srs_error("invalid empty output, ret=%d", ret);
  187 + return ret;
  188 + }
  189 +
  190 + return ret;
  191 +}
  192 +
  193 +int SrsFFMPEG::start()
  194 +{
  195 + int ret = ERROR_SUCCESS;
  196 +
  197 + if (started) {
  198 + return ret;
  199 + }
  200 +
  201 + // prepare execl params
  202 + char vsize[22];
  203 + snprintf(vsize, sizeof(vsize), "%dx%d", vwidth, vheight);
  204 + char vaspect[22];
  205 + snprintf(vaspect, sizeof(vaspect), "%d:%d", vwidth, vheight);
  206 + char s_vbitrate[10];
  207 + snprintf(s_vbitrate, sizeof(s_vbitrate), "%d", vbitrate * 1000);
  208 + char s_vfps[10];
  209 + snprintf(s_vfps, sizeof(s_vfps), "%.2f", vfps);
  210 + char s_vthreads[10];
  211 + snprintf(s_vthreads, sizeof(s_vthreads), "%d", vthreads);
  212 + char s_abitrate[10];
  213 + snprintf(s_abitrate, sizeof(s_abitrate), "%d", abitrate * 1000);
  214 + char s_asample_rate[10];
  215 + snprintf(s_asample_rate, sizeof(s_asample_rate), "%d", asample_rate);
  216 + char s_achannels[10];
  217 + snprintf(s_achannels, sizeof(s_achannels), "%d", achannels);
  218 +
  219 + // TODO: execl donot support the params.
  220 + // video params
  221 + std::string s_vpreset = vpreset;
  222 + if (!vparams.empty()) {
  223 + s_vpreset += " ";
  224 + s_vpreset += vparams;
  225 + }
  226 + // audio params
  227 + std::string s_aparams = s_achannels;
  228 + if (!aparams.empty()) {
  229 + s_aparams += " ";
  230 + s_aparams += aparams;
  231 + }
  232 +
  233 + // TODO: fork or vfork?
  234 + if ((pid = fork()) < 0) {
  235 + ret = ERROR_ENCODER_FORK;
  236 + srs_error("vfork process failed. ret=%d", ret);
  237 + return ret;
  238 + }
  239 +
  240 + // child process: ffmpeg encoder engine.
  241 + if (pid == 0) {
  242 + // TODO: execl or execlp
  243 + ret = execl(ffmpeg.c_str(),
  244 + "-f", "flv",
  245 + "-i", input.c_str(),
  246 + // video specified.
  247 + "-vcodec", vcodec.c_str(),
  248 + "-b:v", s_vbitrate,
  249 + "-r", s_vfps,
  250 + "-s", vsize,
  251 + "-aspect", vaspect, // TODO: add aspect if needed.
  252 + "-threads", s_vthreads,
  253 + "-profile:v", vprofile.c_str(),
  254 + "-preset", s_vpreset.c_str(),
  255 + // audio specified.
  256 + "-acodec", acodec.c_str(),
  257 + "-b:a", s_abitrate,
  258 + "-ar", s_asample_rate,
  259 + "-ac", s_aparams.c_str(),
  260 + "-f", "flv",
  261 + "-y", output.c_str(),
  262 + NULL
  263 + );
  264 + if (ret < 0) {
  265 + fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",
  266 + errno, strerror(errno));
  267 + }
  268 + exit(ret);
  269 + }
  270 +
  271 + // parent.
  272 + if (pid > 0) {
  273 + started = true;
  274 + srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);
  275 + return ret;
  276 + }
  277 +
  278 + return ret;
  279 +}
  280 +
  281 +void SrsFFMPEG::stop()
  282 +{
  283 + if (!started) {
  284 + return;
  285 + }
  286 +}
  287 +
  288 +SrsEncoder::SrsEncoder()
  289 +{
  290 + tid = NULL;
  291 + loop = false;
  292 +}
  293 +
  294 +SrsEncoder::~SrsEncoder()
  295 +{
  296 + on_unpublish();
  297 +}
  298 +
  299 +int SrsEncoder::parse_scope_engines()
  300 +{
  301 + int ret = ERROR_SUCCESS;
  302 +
  303 + // parse all transcode engines.
  304 + SrsConfDirective* conf = NULL;
  305 +
  306 + // parse vhost scope engines
  307 + std::string scope = "";
  308 + if ((conf = config->get_transcode(vhost, "")) != NULL) {
  309 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  310 + srs_error("parse vhost scope=%s transcode engines failed. "
  311 + "ret=%d", scope.c_str(), ret);
  312 + return ret;
  313 + }
  314 + }
  315 + // parse app scope engines
  316 + scope = app;
  317 + if ((conf = config->get_transcode(vhost, app)) != NULL) {
  318 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  319 + srs_error("parse app scope=%s transcode engines failed. "
  320 + "ret=%d", scope.c_str(), ret);
  321 + return ret;
  322 + }
  323 + }
  324 + // parse stream scope engines
  325 + scope += "/";
  326 + scope += stream;
  327 + if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {
  328 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  329 + srs_error("parse stream scope=%s transcode engines failed. "
  330 + "ret=%d", scope.c_str(), ret);
  331 + return ret;
  332 + }
  333 + }
  334 +
  335 + return ret;
  336 +}
  337 +
  338 +int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)
  339 +{
  340 + int ret = ERROR_SUCCESS;
  341 +
  342 + vhost = _vhost;
  343 + port = _port;
  344 + app = _app;
  345 + stream = _stream;
  346 +
  347 + ret = parse_scope_engines();
  348 +
  349 + // ignore the loop encoder
  350 + if (ret = ERROR_ENCODER_LOOP) {
  351 + ret = ERROR_SUCCESS;
  352 + }
  353 +
  354 + // return for error or no engine.
  355 + if (ret != ERROR_SUCCESS || ffmpegs.empty()) {
  356 + return ret;
  357 + }
  358 +
  359 + // start thread to run all encoding engines.
  360 + srs_assert(!tid);
  361 + if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {
  362 + ret = ERROR_ST_CREATE_FORWARD_THREAD;
  363 + srs_error("st_thread_create failed. ret=%d", ret);
  364 + return ret;
  365 + }
  366 +
  367 + return ret;
  368 +}
  369 +
  370 +void SrsEncoder::on_unpublish()
  371 +{
  372 + if (tid) {
  373 + loop = false;
  374 + st_thread_interrupt(tid);
  375 + st_thread_join(tid, NULL);
  376 + tid = NULL;
  377 + }
  378 +
  379 + clear_engines();
  380 +}
  381 +
  382 +void SrsEncoder::clear_engines()
  383 +{
  384 + std::vector<SrsFFMPEG*>::iterator it;
  385 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  386 + SrsFFMPEG* ffmpeg = *it;
  387 + srs_freep(ffmpeg);
  388 + }
  389 + ffmpegs.clear();
  390 +}
  391 +
  392 +SrsFFMPEG* SrsEncoder::at(int index)
  393 +{
  394 + return ffmpegs[index];
  395 +}
  396 +
  397 +int SrsEncoder::parse_transcode(SrsConfDirective* conf)
  398 +{
  399 + int ret = ERROR_SUCCESS;
  400 +
  401 + srs_assert(conf);
  402 +
  403 + // enabled
  404 + if (!config->get_transcode_enabled(conf)) {
  405 + srs_trace("ignore the disabled transcode: %s",
  406 + conf->arg0().c_str());
  407 + return ret;
  408 + }
  409 +
  410 + // ffmpeg
  411 + std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);
  412 + if (ffmpeg_bin.empty()) {
  413 + srs_trace("ignore the empty ffmpeg transcode: %s",
  414 + conf->arg0().c_str());
  415 + return ret;
  416 + }
  417 +
  418 + // get all engines.
  419 + std::vector<SrsConfDirective*> engines;
  420 + config->get_transcode_engines(conf, engines);
  421 + if (engines.empty()) {
  422 + srs_trace("ignore the empty transcode engine: %s",
  423 + conf->arg0().c_str());
  424 + return ret;
  425 + }
  426 +
  427 + // create engine
  428 + for (int i = 0; i < (int)engines.size(); i++) {
  429 + SrsConfDirective* engine = engines[i];
  430 + if (!config->get_engine_enabled(engine)) {
  431 + srs_trace("ignore the diabled transcode engine: %s %s",
  432 + conf->arg0().c_str(), engine->arg0().c_str());
  433 + continue;
  434 + }
  435 +
  436 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  437 +
  438 + if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {
  439 + srs_freep(ffmpeg);
  440 +
  441 + // if got a loop, donot transcode the whole stream.
  442 + if (ret == ERROR_ENCODER_LOOP) {
  443 + clear_engines();
  444 + break;
  445 + }
  446 +
  447 + srs_error("invalid transcode engine: %s %s",
  448 + conf->arg0().c_str(), engine->arg0().c_str());
  449 + return ret;
  450 + }
  451 +
  452 + ffmpegs.push_back(ffmpeg);
  453 + }
  454 +
  455 + return ret;
  456 +}
  457 +
  458 +int SrsEncoder::cycle()
  459 +{
  460 + int ret = ERROR_SUCCESS;
  461 +
  462 + // start all ffmpegs.
  463 + std::vector<SrsFFMPEG*>::iterator it;
  464 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  465 + SrsFFMPEG* ffmpeg = *it;
  466 + if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
  467 + srs_error("ffmpeg start failed. ret=%d", ret);
  468 + return ret;
  469 + }
  470 + }
  471 +
  472 + return ret;
  473 +}
  474 +
  475 +void SrsEncoder::encoder_cycle()
  476 +{
  477 + int ret = ERROR_SUCCESS;
  478 +
  479 + log_context->generate_id();
  480 + srs_trace("encoder cycle start");
  481 +
  482 + while (loop) {
  483 + if ((ret = cycle()) != ERROR_SUCCESS) {
  484 + srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);
  485 + } else {
  486 + srs_info("encoder cycle success, retry");
  487 + }
  488 +
  489 + if (!loop) {
  490 + break;
  491 + }
  492 +
  493 + st_usleep(SRS_ENCODER_SLEEP_MS * 1000);
  494 + }
  495 +
  496 + // kill ffmpeg when finished and it alive
  497 + std::vector<SrsFFMPEG*>::iterator it;
  498 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  499 + SrsFFMPEG* ffmpeg = *it;
  500 + ffmpeg->stop();
  501 + }
  502 +
  503 + srs_trace("encoder cycle finished");
  504 +}
  505 +
  506 +void* SrsEncoder::encoder_thread(void* arg)
  507 +{
  508 + SrsEncoder* obj = (SrsEncoder*)arg;
  509 + srs_assert(obj != NULL);
  510 +
  511 + obj->loop = true;
  512 + obj->encoder_cycle();
  513 +
  514 + return NULL;
  515 +}
  516 +