winlin

update readme

@@ -36,7 +36,8 @@ FMS URL: rtmp://127.0.0.1:1935/live @@ -36,7 +36,8 @@ FMS URL: rtmp://127.0.0.1:1935/live
36 Stream: livestream 36 Stream: livestream
37 For example, use ffmpeg to publish: 37 For example, use ffmpeg to publish:
38 for((;;)); do \ 38 for((;;)); do \
39 - ./objs/ffmpeg/bin/ffmpeg -re -i ./doc/source.200kbps.768x320.flv -vcodec copy -acodec copy \ 39 + ./objs/ffmpeg/bin/ffmpeg -re -i ./doc/source.200kbps.768x320.flv \
  40 + -vcodec copy -acodec copy \
40 -f flv -y rtmp://127.0.0.1:1935/live/livestream; \ 41 -f flv -y rtmp://127.0.0.1:1935/live/livestream; \
41 sleep 1; \ 42 sleep 1; \
42 done 43 done
1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_encoder.hpp>  
25 -  
26 -#include <stdlib.h>  
27 -#include <unistd.h>  
28 -  
29 -#include <srs_core_error.hpp>  
30 -#include <srs_core_log.hpp>  
31 -#include <srs_core_config.hpp>  
32 -  
33 -#define SRS_ENCODER_SLEEP_MS 2000  
34 -  
35 -#define SRS_ENCODER_VCODEC "libx264"  
36 -#define SRS_ENCODER_ACODEC "libaacplus"  
37 -  
38 -SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)  
39 -{  
40 - started = false;  
41 - pid = -1;  
42 - ffmpeg = ffmpeg_bin;  
43 -  
44 - vbitrate = 0;  
45 - vfps = 0;  
46 - vwidth = 0;  
47 - vheight = 0;  
48 - vthreads = 0;  
49 - abitrate = 0;  
50 - asample_rate = 0;  
51 - achannels = 0;  
52 -}  
53 -  
54 -SrsFFMPEG::~SrsFFMPEG()  
55 -{  
56 - stop();  
57 -}  
58 -  
59 -int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)  
60 -{  
61 - int ret = ERROR_SUCCESS;  
62 -  
63 - config->get_engine_vfilter(engine, vfilter);  
64 - vcodec = config->get_engine_vcodec(engine);  
65 - vbitrate = config->get_engine_vbitrate(engine);  
66 - vfps = config->get_engine_vfps(engine);  
67 - vwidth = config->get_engine_vwidth(engine);  
68 - vheight = config->get_engine_vheight(engine);  
69 - vthreads = config->get_engine_vthreads(engine);  
70 - vprofile = config->get_engine_vprofile(engine);  
71 - vpreset = config->get_engine_vpreset(engine);  
72 - config->get_engine_vparams(engine, vparams);  
73 - acodec = config->get_engine_acodec(engine);  
74 - abitrate = config->get_engine_abitrate(engine);  
75 - asample_rate = config->get_engine_asample_rate(engine);  
76 - achannels = config->get_engine_achannels(engine);  
77 - config->get_engine_aparams(engine, aparams);  
78 - output = config->get_engine_output(engine);  
79 -  
80 - // ensure the size is even.  
81 - vwidth -= vwidth % 2;  
82 - vheight -= vheight % 2;  
83 -  
84 - // input stream, from local.  
85 - // ie. rtmp://127.0.0.1:1935/live/livestream  
86 - input = "rtmp://127.0.0.1:";  
87 - input += port;  
88 - input += "/";  
89 - input += app;  
90 - input += "/";  
91 - input += stream;  
92 -  
93 - // output stream, to other/self server  
94 - // ie. rtmp://127.0.0.1:1935/live/livestream_sd  
95 - if (vhost == RTMP_VHOST_DEFAULT) {  
96 - output = srs_replace(output, "[vhost]", "127.0.0.1");  
97 - } else {  
98 - output = srs_replace(output, "[vhost]", vhost);  
99 - }  
100 - output = srs_replace(output, "[port]", port);  
101 - output = srs_replace(output, "[app]", app);  
102 - output = srs_replace(output, "[stream]", stream);  
103 -  
104 - // important: loop check, donot transcode again.  
105 - // we think the following is loop circle:  
106 - // input: rtmp://127.0.0.1:1935/live/livestream_sd  
107 - // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd  
108 - std::string tail = ""; // tail="_sd"  
109 - if (output.length() > input.length()) {  
110 - tail = output.substr(input.length());  
111 - }  
112 - // if input also endwiths the tail, loop detected.  
113 - if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {  
114 - ret = ERROR_ENCODER_LOOP;  
115 - srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",  
116 - input.c_str(), output.c_str(), ret);  
117 - return ret;  
118 - }  
119 -  
120 - if (vcodec != SRS_ENCODER_VCODEC) {  
121 - ret = ERROR_ENCODER_VCODEC;  
122 - srs_error("invalid vcodec, must be %s, actual %s, ret=%d",  
123 - SRS_ENCODER_VCODEC, vcodec.c_str(), ret);  
124 - return ret;  
125 - }  
126 - if (vbitrate <= 0) {  
127 - ret = ERROR_ENCODER_VBITRATE;  
128 - srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);  
129 - return ret;  
130 - }  
131 - if (vfps <= 0) {  
132 - ret = ERROR_ENCODER_VFPS;  
133 - srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);  
134 - return ret;  
135 - }  
136 - if (vwidth <= 0) {  
137 - ret = ERROR_ENCODER_VWIDTH;  
138 - srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);  
139 - return ret;  
140 - }  
141 - if (vheight <= 0) {  
142 - ret = ERROR_ENCODER_VHEIGHT;  
143 - srs_error("invalid vheight: %d, ret=%d", vheight, ret);  
144 - return ret;  
145 - }  
146 - if (vthreads < 0) {  
147 - ret = ERROR_ENCODER_VTHREADS;  
148 - srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);  
149 - return ret;  
150 - }  
151 - if (vprofile.empty()) {  
152 - ret = ERROR_ENCODER_VPROFILE;  
153 - srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);  
154 - return ret;  
155 - }  
156 - if (vpreset.empty()) {  
157 - ret = ERROR_ENCODER_VPRESET;  
158 - srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);  
159 - return ret;  
160 - }  
161 - if (acodec != SRS_ENCODER_ACODEC) {  
162 - ret = ERROR_ENCODER_ACODEC;  
163 - srs_error("invalid acodec, must be %s, actual %s, ret=%d",  
164 - SRS_ENCODER_ACODEC, acodec.c_str(), ret);  
165 - return ret;  
166 - }  
167 - if (abitrate <= 0) {  
168 - ret = ERROR_ENCODER_ABITRATE;  
169 - srs_error("invalid abitrate: %d, ret=%d",  
170 - abitrate, ret);  
171 - return ret;  
172 - }  
173 - if (asample_rate <= 0) {  
174 - ret = ERROR_ENCODER_ASAMPLE_RATE;  
175 - srs_error("invalid sample rate: %d, ret=%d",  
176 - asample_rate, ret);  
177 - return ret;  
178 - }  
179 - if (achannels != 1 && achannels != 2) {  
180 - ret = ERROR_ENCODER_ACHANNELS;  
181 - srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",  
182 - achannels, ret);  
183 - return ret;  
184 - }  
185 - if (output.empty()) {  
186 - ret = ERROR_ENCODER_OUTPUT;  
187 - srs_error("invalid empty output, ret=%d", ret);  
188 - return ret;  
189 - }  
190 -  
191 - return ret;  
192 -}  
193 -  
194 -int SrsFFMPEG::start()  
195 -{  
196 - int ret = ERROR_SUCCESS;  
197 -  
198 - if (started) {  
199 - return ret;  
200 - }  
201 -  
202 - // prepare exec params  
203 - char tmp[256];  
204 - std::vector<std::string> params;  
205 -  
206 - // argv[0], set to ffmpeg bin.  
207 - // The execv() and execvp() functions ....  
208 - // The first argument, by convention, should point to  
209 - // the filename associated with the file being executed.  
210 - params.push_back(ffmpeg);  
211 -  
212 - // input.  
213 - params.push_back("-f");  
214 - params.push_back("flv");  
215 -  
216 - params.push_back("-i");  
217 - params.push_back(input);  
218 -  
219 - // build the filter  
220 - if (!vfilter.empty()) {  
221 - std::vector<std::string>::iterator it;  
222 - for (it = vfilter.begin(); it != vfilter.end(); ++it) {  
223 - std::string p = *it;  
224 - if (!p.empty()) {  
225 - params.push_back(p);  
226 - }  
227 - }  
228 - }  
229 -  
230 - // video specified.  
231 - params.push_back("-vcodec");  
232 - params.push_back(vcodec);  
233 -  
234 - params.push_back("-b:v");  
235 - snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);  
236 - params.push_back(tmp);  
237 -  
238 - params.push_back("-r");  
239 - snprintf(tmp, sizeof(tmp), "%.2f", vfps);  
240 - params.push_back(tmp);  
241 -  
242 - params.push_back("-s");  
243 - snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);  
244 - params.push_back(tmp);  
245 -  
246 - // TODO: add aspect if needed.  
247 - params.push_back("-aspect");  
248 - snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);  
249 - params.push_back(tmp);  
250 -  
251 - params.push_back("-threads");  
252 - snprintf(tmp, sizeof(tmp), "%d", vthreads);  
253 - params.push_back(tmp);  
254 -  
255 - params.push_back("-profile:v");  
256 - params.push_back(vprofile);  
257 -  
258 - params.push_back("-preset");  
259 - params.push_back(vpreset);  
260 -  
261 - // vparams  
262 - if (!vparams.empty()) {  
263 - std::vector<std::string>::iterator it;  
264 - for (it = vparams.begin(); it != vparams.end(); ++it) {  
265 - std::string p = *it;  
266 - if (!p.empty()) {  
267 - params.push_back(p);  
268 - }  
269 - }  
270 - }  
271 -  
272 - // audio specified.  
273 - params.push_back("-acodec");  
274 - params.push_back(acodec);  
275 -  
276 - params.push_back("-b:a");  
277 - snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);  
278 - params.push_back(tmp);  
279 -  
280 - params.push_back("-ar");  
281 - snprintf(tmp, sizeof(tmp), "%d", asample_rate);  
282 - params.push_back(tmp);  
283 -  
284 - params.push_back("-ac");  
285 - snprintf(tmp, sizeof(tmp), "%d", achannels);  
286 - params.push_back(tmp);  
287 -  
288 - // aparams  
289 - if (!aparams.empty()) {  
290 - std::vector<std::string>::iterator it;  
291 - for (it = aparams.begin(); it != aparams.end(); ++it) {  
292 - std::string p = *it;  
293 - if (!p.empty()) {  
294 - params.push_back(p);  
295 - }  
296 - }  
297 - }  
298 -  
299 - // output  
300 - params.push_back("-f");  
301 - params.push_back("flv");  
302 -  
303 - params.push_back("-y");  
304 - params.push_back(output);  
305 -  
306 - // TODO: fork or vfork?  
307 - if ((pid = fork()) < 0) {  
308 - ret = ERROR_ENCODER_FORK;  
309 - srs_error("vfork process failed. ret=%d", ret);  
310 - return ret;  
311 - }  
312 -  
313 - // child process: ffmpeg encoder engine.  
314 - if (pid == 0) {  
315 - // memory leak in child process, it's ok.  
316 - char** charpv_params = new char*[params.size() + 1];  
317 - for (int i = 0; i < (int)params.size(); i++) {  
318 - std::string p = params[i];  
319 - charpv_params[i] = (char*)p.c_str();  
320 - }  
321 - // EOF: NULL  
322 - charpv_params[params.size()] = NULL;  
323 -  
324 - // TODO: execv or execvp  
325 - ret = execv(ffmpeg.c_str(), charpv_params);  
326 - if (ret < 0) {  
327 - fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",  
328 - errno, strerror(errno));  
329 - }  
330 - exit(ret);  
331 - }  
332 -  
333 - // parent.  
334 - if (pid > 0) {  
335 - started = true;  
336 - srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);  
337 - return ret;  
338 - }  
339 -  
340 - return ret;  
341 -}  
342 -  
343 -void SrsFFMPEG::stop()  
344 -{  
345 - if (!started) {  
346 - return;  
347 - }  
348 -}  
349 -  
350 -SrsEncoder::SrsEncoder()  
351 -{  
352 - tid = NULL;  
353 - loop = false;  
354 -}  
355 -  
356 -SrsEncoder::~SrsEncoder()  
357 -{  
358 - on_unpublish();  
359 -}  
360 -  
361 -int SrsEncoder::parse_scope_engines()  
362 -{  
363 - int ret = ERROR_SUCCESS;  
364 -  
365 - // parse all transcode engines.  
366 - SrsConfDirective* conf = NULL;  
367 -  
368 - // parse vhost scope engines  
369 - std::string scope = "";  
370 - if ((conf = config->get_transcode(vhost, "")) != NULL) {  
371 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
372 - srs_error("parse vhost scope=%s transcode engines failed. "  
373 - "ret=%d", scope.c_str(), ret);  
374 - return ret;  
375 - }  
376 - }  
377 - // parse app scope engines  
378 - scope = app;  
379 - if ((conf = config->get_transcode(vhost, app)) != NULL) {  
380 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
381 - srs_error("parse app scope=%s transcode engines failed. "  
382 - "ret=%d", scope.c_str(), ret);  
383 - return ret;  
384 - }  
385 - }  
386 - // parse stream scope engines  
387 - scope += "/";  
388 - scope += stream;  
389 - if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {  
390 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
391 - srs_error("parse stream scope=%s transcode engines failed. "  
392 - "ret=%d", scope.c_str(), ret);  
393 - return ret;  
394 - }  
395 - }  
396 -  
397 - return ret;  
398 -}  
399 -  
400 -int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)  
401 -{  
402 - int ret = ERROR_SUCCESS;  
403 -  
404 - vhost = _vhost;  
405 - port = _port;  
406 - app = _app;  
407 - stream = _stream;  
408 -  
409 - ret = parse_scope_engines();  
410 -  
411 - // ignore the loop encoder  
412 - if (ret == ERROR_ENCODER_LOOP) {  
413 - ret = ERROR_SUCCESS;  
414 - }  
415 -  
416 - // return for error or no engine.  
417 - if (ret != ERROR_SUCCESS || ffmpegs.empty()) {  
418 - return ret;  
419 - }  
420 -  
421 - // start thread to run all encoding engines.  
422 - srs_assert(!tid);  
423 - if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {  
424 - ret = ERROR_ST_CREATE_FORWARD_THREAD;  
425 - srs_error("st_thread_create failed. ret=%d", ret);  
426 - return ret;  
427 - }  
428 -  
429 - return ret;  
430 -}  
431 -  
432 -void SrsEncoder::on_unpublish()  
433 -{  
434 - if (tid) {  
435 - loop = false;  
436 - st_thread_interrupt(tid);  
437 - st_thread_join(tid, NULL);  
438 - tid = NULL;  
439 - }  
440 -  
441 - clear_engines();  
442 -}  
443 -  
444 -void SrsEncoder::clear_engines()  
445 -{  
446 - std::vector<SrsFFMPEG*>::iterator it;  
447 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
448 - SrsFFMPEG* ffmpeg = *it;  
449 - srs_freep(ffmpeg);  
450 - }  
451 - ffmpegs.clear();  
452 -}  
453 -  
454 -SrsFFMPEG* SrsEncoder::at(int index)  
455 -{  
456 - return ffmpegs[index];  
457 -}  
458 -  
459 -int SrsEncoder::parse_transcode(SrsConfDirective* conf)  
460 -{  
461 - int ret = ERROR_SUCCESS;  
462 -  
463 - srs_assert(conf);  
464 -  
465 - // enabled  
466 - if (!config->get_transcode_enabled(conf)) {  
467 - srs_trace("ignore the disabled transcode: %s",  
468 - conf->arg0().c_str());  
469 - return ret;  
470 - }  
471 -  
472 - // ffmpeg  
473 - std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);  
474 - if (ffmpeg_bin.empty()) {  
475 - srs_trace("ignore the empty ffmpeg transcode: %s",  
476 - conf->arg0().c_str());  
477 - return ret;  
478 - }  
479 -  
480 - // get all engines.  
481 - std::vector<SrsConfDirective*> engines;  
482 - config->get_transcode_engines(conf, engines);  
483 - if (engines.empty()) {  
484 - srs_trace("ignore the empty transcode engine: %s",  
485 - conf->arg0().c_str());  
486 - return ret;  
487 - }  
488 -  
489 - // create engine  
490 - for (int i = 0; i < (int)engines.size(); i++) {  
491 - SrsConfDirective* engine = engines[i];  
492 - if (!config->get_engine_enabled(engine)) {  
493 - srs_trace("ignore the diabled transcode engine: %s %s",  
494 - conf->arg0().c_str(), engine->arg0().c_str());  
495 - continue;  
496 - }  
497 -  
498 - SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);  
499 -  
500 - if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {  
501 - srs_freep(ffmpeg);  
502 -  
503 - // if got a loop, donot transcode the whole stream.  
504 - if (ret == ERROR_ENCODER_LOOP) {  
505 - clear_engines();  
506 - break;  
507 - }  
508 -  
509 - srs_error("invalid transcode engine: %s %s",  
510 - conf->arg0().c_str(), engine->arg0().c_str());  
511 - return ret;  
512 - }  
513 -  
514 - ffmpegs.push_back(ffmpeg);  
515 - }  
516 -  
517 - return ret;  
518 -}  
519 -  
520 -int SrsEncoder::cycle()  
521 -{  
522 - int ret = ERROR_SUCCESS;  
523 -  
524 - // start all ffmpegs.  
525 - std::vector<SrsFFMPEG*>::iterator it;  
526 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
527 - SrsFFMPEG* ffmpeg = *it;  
528 - if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {  
529 - srs_error("ffmpeg start failed. ret=%d", ret);  
530 - return ret;  
531 - }  
532 - }  
533 -  
534 - return ret;  
535 -}  
536 -  
537 -void SrsEncoder::encoder_cycle()  
538 -{  
539 - int ret = ERROR_SUCCESS;  
540 -  
541 - log_context->generate_id();  
542 - srs_trace("encoder cycle start");  
543 -  
544 - while (loop) {  
545 - if ((ret = cycle()) != ERROR_SUCCESS) {  
546 - srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);  
547 - } else {  
548 - srs_info("encoder cycle success, retry");  
549 - }  
550 -  
551 - if (!loop) {  
552 - break;  
553 - }  
554 -  
555 - st_usleep(SRS_ENCODER_SLEEP_MS * 1000);  
556 - }  
557 -  
558 - // kill ffmpeg when finished and it alive  
559 - std::vector<SrsFFMPEG*>::iterator it;  
560 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
561 - SrsFFMPEG* ffmpeg = *it;  
562 - ffmpeg->stop();  
563 - }  
564 -  
565 - srs_trace("encoder cycle finished");  
566 -}  
567 -  
568 -void* SrsEncoder::encoder_thread(void* arg)  
569 -{  
570 - SrsEncoder* obj = (SrsEncoder*)arg;  
571 - srs_assert(obj != NULL);  
572 -  
573 - obj->loop = true;  
574 - obj->encoder_cycle();  
575 -  
576 - return NULL;  
577 -}  
578 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_encoder.hpp>
  25 +
  26 +#include <stdlib.h>
  27 +#include <unistd.h>
  28 +
  29 +#include <srs_core_error.hpp>
  30 +#include <srs_core_log.hpp>
  31 +#include <srs_core_config.hpp>
  32 +
  33 +#define SRS_ENCODER_SLEEP_MS 2000
  34 +
  35 +#define SRS_ENCODER_VCODEC "libx264"
  36 +#define SRS_ENCODER_ACODEC "libaacplus"
  37 +
  38 +SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
  39 +{
  40 + started = false;
  41 + pid = -1;
  42 + ffmpeg = ffmpeg_bin;
  43 +
  44 + vbitrate = 0;
  45 + vfps = 0;
  46 + vwidth = 0;
  47 + vheight = 0;
  48 + vthreads = 0;
  49 + abitrate = 0;
  50 + asample_rate = 0;
  51 + achannels = 0;
  52 +}
  53 +
  54 +SrsFFMPEG::~SrsFFMPEG()
  55 +{
  56 + stop();
  57 +}
  58 +
  59 +int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)
  60 +{
  61 + int ret = ERROR_SUCCESS;
  62 +
  63 + config->get_engine_vfilter(engine, vfilter);
  64 + vcodec = config->get_engine_vcodec(engine);
  65 + vbitrate = config->get_engine_vbitrate(engine);
  66 + vfps = config->get_engine_vfps(engine);
  67 + vwidth = config->get_engine_vwidth(engine);
  68 + vheight = config->get_engine_vheight(engine);
  69 + vthreads = config->get_engine_vthreads(engine);
  70 + vprofile = config->get_engine_vprofile(engine);
  71 + vpreset = config->get_engine_vpreset(engine);
  72 + config->get_engine_vparams(engine, vparams);
  73 + acodec = config->get_engine_acodec(engine);
  74 + abitrate = config->get_engine_abitrate(engine);
  75 + asample_rate = config->get_engine_asample_rate(engine);
  76 + achannels = config->get_engine_achannels(engine);
  77 + config->get_engine_aparams(engine, aparams);
  78 + output = config->get_engine_output(engine);
  79 +
  80 + // ensure the size is even.
  81 + vwidth -= vwidth % 2;
  82 + vheight -= vheight % 2;
  83 +
  84 + // input stream, from local.
  85 + // ie. rtmp://127.0.0.1:1935/live/livestream
  86 + input = "rtmp://127.0.0.1:";
  87 + input += port;
  88 + input += "/";
  89 + input += app;
  90 + input += "/";
  91 + input += stream;
  92 +
  93 + // output stream, to other/self server
  94 + // ie. rtmp://127.0.0.1:1935/live/livestream_sd
  95 + if (vhost == RTMP_VHOST_DEFAULT) {
  96 + output = srs_replace(output, "[vhost]", "127.0.0.1");
  97 + } else {
  98 + output = srs_replace(output, "[vhost]", vhost);
  99 + }
  100 + output = srs_replace(output, "[port]", port);
  101 + output = srs_replace(output, "[app]", app);
  102 + output = srs_replace(output, "[stream]", stream);
  103 +
  104 + // important: loop check, donot transcode again.
  105 + // we think the following is loop circle:
  106 + // input: rtmp://127.0.0.1:1935/live/livestream_sd
  107 + // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd
  108 + std::string tail = ""; // tail="_sd"
  109 + if (output.length() > input.length()) {
  110 + tail = output.substr(input.length());
  111 + }
  112 + // TODO: better dead loop check.
  113 + // if input also endwiths the tail, loop detected.
  114 + if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {
  115 + ret = ERROR_ENCODER_LOOP;
  116 + srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",
  117 + input.c_str(), output.c_str(), ret);
  118 + return ret;
  119 + }
  120 +
  121 + if (vcodec != SRS_ENCODER_VCODEC) {
  122 + ret = ERROR_ENCODER_VCODEC;
  123 + srs_error("invalid vcodec, must be %s, actual %s, ret=%d",
  124 + SRS_ENCODER_VCODEC, vcodec.c_str(), ret);
  125 + return ret;
  126 + }
  127 + if (vbitrate <= 0) {
  128 + ret = ERROR_ENCODER_VBITRATE;
  129 + srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);
  130 + return ret;
  131 + }
  132 + if (vfps <= 0) {
  133 + ret = ERROR_ENCODER_VFPS;
  134 + srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);
  135 + return ret;
  136 + }
  137 + if (vwidth <= 0) {
  138 + ret = ERROR_ENCODER_VWIDTH;
  139 + srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);
  140 + return ret;
  141 + }
  142 + if (vheight <= 0) {
  143 + ret = ERROR_ENCODER_VHEIGHT;
  144 + srs_error("invalid vheight: %d, ret=%d", vheight, ret);
  145 + return ret;
  146 + }
  147 + if (vthreads < 0) {
  148 + ret = ERROR_ENCODER_VTHREADS;
  149 + srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);
  150 + return ret;
  151 + }
  152 + if (vprofile.empty()) {
  153 + ret = ERROR_ENCODER_VPROFILE;
  154 + srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);
  155 + return ret;
  156 + }
  157 + if (vpreset.empty()) {
  158 + ret = ERROR_ENCODER_VPRESET;
  159 + srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);
  160 + return ret;
  161 + }
  162 + if (acodec != SRS_ENCODER_ACODEC) {
  163 + ret = ERROR_ENCODER_ACODEC;
  164 + srs_error("invalid acodec, must be %s, actual %s, ret=%d",
  165 + SRS_ENCODER_ACODEC, acodec.c_str(), ret);
  166 + return ret;
  167 + }
  168 + if (abitrate <= 0) {
  169 + ret = ERROR_ENCODER_ABITRATE;
  170 + srs_error("invalid abitrate: %d, ret=%d",
  171 + abitrate, ret);
  172 + return ret;
  173 + }
  174 + if (asample_rate <= 0) {
  175 + ret = ERROR_ENCODER_ASAMPLE_RATE;
  176 + srs_error("invalid sample rate: %d, ret=%d",
  177 + asample_rate, ret);
  178 + return ret;
  179 + }
  180 + if (achannels != 1 && achannels != 2) {
  181 + ret = ERROR_ENCODER_ACHANNELS;
  182 + srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",
  183 + achannels, ret);
  184 + return ret;
  185 + }
  186 + if (output.empty()) {
  187 + ret = ERROR_ENCODER_OUTPUT;
  188 + srs_error("invalid empty output, ret=%d", ret);
  189 + return ret;
  190 + }
  191 +
  192 + return ret;
  193 +}
  194 +
  195 +int SrsFFMPEG::start()
  196 +{
  197 + int ret = ERROR_SUCCESS;
  198 +
  199 + if (started) {
  200 + return ret;
  201 + }
  202 +
  203 + // prepare exec params
  204 + char tmp[256];
  205 + std::vector<std::string> params;
  206 +
  207 + // argv[0], set to ffmpeg bin.
  208 + // The execv() and execvp() functions ....
  209 + // The first argument, by convention, should point to
  210 + // the filename associated with the file being executed.
  211 + params.push_back(ffmpeg);
  212 +
  213 + // input.
  214 + params.push_back("-f");
  215 + params.push_back("flv");
  216 +
  217 + params.push_back("-i");
  218 + params.push_back(input);
  219 +
  220 + // build the filter
  221 + if (!vfilter.empty()) {
  222 + std::vector<std::string>::iterator it;
  223 + for (it = vfilter.begin(); it != vfilter.end(); ++it) {
  224 + std::string p = *it;
  225 + if (!p.empty()) {
  226 + params.push_back(p);
  227 + }
  228 + }
  229 + }
  230 +
  231 + // video specified.
  232 + params.push_back("-vcodec");
  233 + params.push_back(vcodec);
  234 +
  235 + params.push_back("-b:v");
  236 + snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);
  237 + params.push_back(tmp);
  238 +
  239 + params.push_back("-r");
  240 + snprintf(tmp, sizeof(tmp), "%.2f", vfps);
  241 + params.push_back(tmp);
  242 +
  243 + params.push_back("-s");
  244 + snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);
  245 + params.push_back(tmp);
  246 +
  247 + // TODO: add aspect if needed.
  248 + params.push_back("-aspect");
  249 + snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);
  250 + params.push_back(tmp);
  251 +
  252 + params.push_back("-threads");
  253 + snprintf(tmp, sizeof(tmp), "%d", vthreads);
  254 + params.push_back(tmp);
  255 +
  256 + params.push_back("-profile:v");
  257 + params.push_back(vprofile);
  258 +
  259 + params.push_back("-preset");
  260 + params.push_back(vpreset);
  261 +
  262 + // vparams
  263 + if (!vparams.empty()) {
  264 + std::vector<std::string>::iterator it;
  265 + for (it = vparams.begin(); it != vparams.end(); ++it) {
  266 + std::string p = *it;
  267 + if (!p.empty()) {
  268 + params.push_back(p);
  269 + }
  270 + }
  271 + }
  272 +
  273 + // audio specified.
  274 + params.push_back("-acodec");
  275 + params.push_back(acodec);
  276 +
  277 + params.push_back("-b:a");
  278 + snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);
  279 + params.push_back(tmp);
  280 +
  281 + params.push_back("-ar");
  282 + snprintf(tmp, sizeof(tmp), "%d", asample_rate);
  283 + params.push_back(tmp);
  284 +
  285 + params.push_back("-ac");
  286 + snprintf(tmp, sizeof(tmp), "%d", achannels);
  287 + params.push_back(tmp);
  288 +
  289 + // aparams
  290 + if (!aparams.empty()) {
  291 + std::vector<std::string>::iterator it;
  292 + for (it = aparams.begin(); it != aparams.end(); ++it) {
  293 + std::string p = *it;
  294 + if (!p.empty()) {
  295 + params.push_back(p);
  296 + }
  297 + }
  298 + }
  299 +
  300 + // output
  301 + params.push_back("-f");
  302 + params.push_back("flv");
  303 +
  304 + params.push_back("-y");
  305 + params.push_back(output);
  306 +
  307 + // TODO: fork or vfork?
  308 + if ((pid = fork()) < 0) {
  309 + ret = ERROR_ENCODER_FORK;
  310 + srs_error("vfork process failed. ret=%d", ret);
  311 + return ret;
  312 + }
  313 +
  314 + // child process: ffmpeg encoder engine.
  315 + if (pid == 0) {
  316 + // memory leak in child process, it's ok.
  317 + char** charpv_params = new char*[params.size() + 1];
  318 + for (int i = 0; i < (int)params.size(); i++) {
  319 + std::string p = params[i];
  320 + charpv_params[i] = (char*)p.c_str();
  321 + }
  322 + // EOF: NULL
  323 + charpv_params[params.size()] = NULL;
  324 +
  325 + // TODO: execv or execvp
  326 + ret = execv(ffmpeg.c_str(), charpv_params);
  327 + if (ret < 0) {
  328 + fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",
  329 + errno, strerror(errno));
  330 + }
  331 + exit(ret);
  332 + }
  333 +
  334 + // parent.
  335 + if (pid > 0) {
  336 + started = true;
  337 + srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);
  338 + return ret;
  339 + }
  340 +
  341 + return ret;
  342 +}
  343 +
  344 +void SrsFFMPEG::stop()
  345 +{
  346 + if (!started) {
  347 + return;
  348 + }
  349 +}
  350 +
  351 +SrsEncoder::SrsEncoder()
  352 +{
  353 + tid = NULL;
  354 + loop = false;
  355 +}
  356 +
  357 +SrsEncoder::~SrsEncoder()
  358 +{
  359 + on_unpublish();
  360 +}
  361 +
  362 +int SrsEncoder::parse_scope_engines()
  363 +{
  364 + int ret = ERROR_SUCCESS;
  365 +
  366 + // parse all transcode engines.
  367 + SrsConfDirective* conf = NULL;
  368 +
  369 + // parse vhost scope engines
  370 + std::string scope = "";
  371 + if ((conf = config->get_transcode(vhost, "")) != NULL) {
  372 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  373 + srs_error("parse vhost scope=%s transcode engines failed. "
  374 + "ret=%d", scope.c_str(), ret);
  375 + return ret;
  376 + }
  377 + }
  378 + // parse app scope engines
  379 + scope = app;
  380 + if ((conf = config->get_transcode(vhost, app)) != NULL) {
  381 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  382 + srs_error("parse app scope=%s transcode engines failed. "
  383 + "ret=%d", scope.c_str(), ret);
  384 + return ret;
  385 + }
  386 + }
  387 + // parse stream scope engines
  388 + scope += "/";
  389 + scope += stream;
  390 + if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {
  391 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  392 + srs_error("parse stream scope=%s transcode engines failed. "
  393 + "ret=%d", scope.c_str(), ret);
  394 + return ret;
  395 + }
  396 + }
  397 +
  398 + return ret;
  399 +}
  400 +
  401 +int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)
  402 +{
  403 + int ret = ERROR_SUCCESS;
  404 +
  405 + vhost = _vhost;
  406 + port = _port;
  407 + app = _app;
  408 + stream = _stream;
  409 +
  410 + ret = parse_scope_engines();
  411 +
  412 + // ignore the loop encoder
  413 + if (ret == ERROR_ENCODER_LOOP) {
  414 + ret = ERROR_SUCCESS;
  415 + }
  416 +
  417 + // return for error or no engine.
  418 + if (ret != ERROR_SUCCESS || ffmpegs.empty()) {
  419 + return ret;
  420 + }
  421 +
  422 + // start thread to run all encoding engines.
  423 + srs_assert(!tid);
  424 + if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {
  425 + ret = ERROR_ST_CREATE_FORWARD_THREAD;
  426 + srs_error("st_thread_create failed. ret=%d", ret);
  427 + return ret;
  428 + }
  429 +
  430 + return ret;
  431 +}
  432 +
  433 +void SrsEncoder::on_unpublish()
  434 +{
  435 + if (tid) {
  436 + loop = false;
  437 + st_thread_interrupt(tid);
  438 + st_thread_join(tid, NULL);
  439 + tid = NULL;
  440 + }
  441 +
  442 + clear_engines();
  443 +}
  444 +
  445 +void SrsEncoder::clear_engines()
  446 +{
  447 + std::vector<SrsFFMPEG*>::iterator it;
  448 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  449 + SrsFFMPEG* ffmpeg = *it;
  450 + srs_freep(ffmpeg);
  451 + }
  452 + ffmpegs.clear();
  453 +}
  454 +
  455 +SrsFFMPEG* SrsEncoder::at(int index)
  456 +{
  457 + return ffmpegs[index];
  458 +}
  459 +
  460 +int SrsEncoder::parse_transcode(SrsConfDirective* conf)
  461 +{
  462 + int ret = ERROR_SUCCESS;
  463 +
  464 + srs_assert(conf);
  465 +
  466 + // enabled
  467 + if (!config->get_transcode_enabled(conf)) {
  468 + srs_trace("ignore the disabled transcode: %s",
  469 + conf->arg0().c_str());
  470 + return ret;
  471 + }
  472 +
  473 + // ffmpeg
  474 + std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);
  475 + if (ffmpeg_bin.empty()) {
  476 + srs_trace("ignore the empty ffmpeg transcode: %s",
  477 + conf->arg0().c_str());
  478 + return ret;
  479 + }
  480 +
  481 + // get all engines.
  482 + std::vector<SrsConfDirective*> engines;
  483 + config->get_transcode_engines(conf, engines);
  484 + if (engines.empty()) {
  485 + srs_trace("ignore the empty transcode engine: %s",
  486 + conf->arg0().c_str());
  487 + return ret;
  488 + }
  489 +
  490 + // create engine
  491 + for (int i = 0; i < (int)engines.size(); i++) {
  492 + SrsConfDirective* engine = engines[i];
  493 + if (!config->get_engine_enabled(engine)) {
  494 + srs_trace("ignore the diabled transcode engine: %s %s",
  495 + conf->arg0().c_str(), engine->arg0().c_str());
  496 + continue;
  497 + }
  498 +
  499 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  500 +
  501 + if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {
  502 + srs_freep(ffmpeg);
  503 +
  504 + // if got a loop, donot transcode the whole stream.
  505 + if (ret == ERROR_ENCODER_LOOP) {
  506 + clear_engines();
  507 + break;
  508 + }
  509 +
  510 + srs_error("invalid transcode engine: %s %s",
  511 + conf->arg0().c_str(), engine->arg0().c_str());
  512 + return ret;
  513 + }
  514 +
  515 + ffmpegs.push_back(ffmpeg);
  516 + }
  517 +
  518 + return ret;
  519 +}
  520 +
  521 +int SrsEncoder::cycle()
  522 +{
  523 + int ret = ERROR_SUCCESS;
  524 +
  525 + // start all ffmpegs.
  526 + std::vector<SrsFFMPEG*>::iterator it;
  527 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  528 + SrsFFMPEG* ffmpeg = *it;
  529 + if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
  530 + srs_error("ffmpeg start failed. ret=%d", ret);
  531 + return ret;
  532 + }
  533 + }
  534 +
  535 + return ret;
  536 +}
  537 +
  538 +void SrsEncoder::encoder_cycle()
  539 +{
  540 + int ret = ERROR_SUCCESS;
  541 +
  542 + log_context->generate_id();
  543 + srs_trace("encoder cycle start");
  544 +
  545 + while (loop) {
  546 + if ((ret = cycle()) != ERROR_SUCCESS) {
  547 + srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);
  548 + } else {
  549 + srs_info("encoder cycle success, retry");
  550 + }
  551 +
  552 + if (!loop) {
  553 + break;
  554 + }
  555 +
  556 + st_usleep(SRS_ENCODER_SLEEP_MS * 1000);
  557 + }
  558 +
  559 + // kill ffmpeg when finished and it alive
  560 + std::vector<SrsFFMPEG*>::iterator it;
  561 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  562 + SrsFFMPEG* ffmpeg = *it;
  563 + ffmpeg->stop();
  564 + }
  565 +
  566 + srs_trace("encoder cycle finished");
  567 +}
  568 +
  569 +void* SrsEncoder::encoder_thread(void* arg)
  570 +{
  571 + SrsEncoder* obj = (SrsEncoder*)arg;
  572 + srs_assert(obj != NULL);
  573 +
  574 + obj->loop = true;
  575 + obj->encoder_cycle();
  576 +
  577 + return NULL;
  578 +}
  579 +
1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_forward.hpp>  
25 -  
26 -#include <stdlib.h>  
27 -#include <sys/socket.h>  
28 -#include <netinet/in.h>  
29 -#include <arpa/inet.h>  
30 -#include <netdb.h>  
31 -  
32 -#include <srs_core_error.hpp>  
33 -#include <srs_core_rtmp.hpp>  
34 -#include <srs_core_log.hpp>  
35 -#include <srs_core_protocol.hpp>  
36 -#include <srs_core_pithy_print.hpp>  
37 -  
38 -#define SRS_PULSE_TIMEOUT_MS 100  
39 -#define SRS_FORWARDER_SLEEP_MS 2000  
40 -#define SRS_SEND_TIMEOUT_US 3000000L  
41 -#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US  
42 -  
43 -SrsForwarder::SrsForwarder()  
44 -{  
45 - client = NULL;  
46 - stfd = NULL;  
47 - stream_id = 0;  
48 -  
49 - tid = NULL;  
50 - loop = false;  
51 -}  
52 -  
53 -SrsForwarder::~SrsForwarder()  
54 -{  
55 - on_unpublish();  
56 -  
57 - std::vector<SrsSharedPtrMessage*>::iterator it;  
58 - for (it = msgs.begin(); it != msgs.end(); ++it) {  
59 - SrsSharedPtrMessage* msg = *it;  
60 - srs_freep(msg);  
61 - }  
62 - msgs.clear();  
63 -}  
64 -  
65 -int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)  
66 -{  
67 - int ret = ERROR_SUCCESS;  
68 -  
69 - app = _app;  
70 -  
71 - tc_url = "rtmp://";  
72 - tc_url += vhost;  
73 - tc_url += "/";  
74 - tc_url += app;  
75 -  
76 - stream_name = stream;  
77 - server = forward_server;  
78 - port = 1935;  
79 -  
80 - size_t pos = forward_server.find(":");  
81 - if (pos != std::string::npos) {  
82 - port = ::atoi(forward_server.substr(pos + 1).c_str());  
83 - server = forward_server.substr(0, pos);  
84 - }  
85 -  
86 - if ((ret = open_socket()) != ERROR_SUCCESS) {  
87 - return ret;  
88 - }  
89 -  
90 - srs_assert(!tid);  
91 - if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){  
92 - ret = ERROR_ST_CREATE_FORWARD_THREAD;  
93 - srs_error("st_thread_create failed. ret=%d", ret);  
94 - return ret;  
95 - }  
96 -  
97 - return ret;  
98 -}  
99 -  
100 -void SrsForwarder::on_unpublish()  
101 -{  
102 - if (tid) {  
103 - loop = false;  
104 - st_thread_interrupt(tid);  
105 - st_thread_join(tid, NULL);  
106 - tid = NULL;  
107 - }  
108 -  
109 - if (stfd) {  
110 - int fd = st_netfd_fileno(stfd);  
111 - st_netfd_close(stfd);  
112 - stfd = NULL;  
113 -  
114 - // st does not close it sometimes,  
115 - // close it manually.  
116 - close(fd);  
117 - }  
118 -  
119 - srs_freep(client);  
120 -}  
121 -  
122 -int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)  
123 -{  
124 - int ret = ERROR_SUCCESS;  
125 -  
126 - msgs.push_back(metadata);  
127 -  
128 - return ret;  
129 -}  
130 -  
131 -int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)  
132 -{  
133 - int ret = ERROR_SUCCESS;  
134 -  
135 - msgs.push_back(msg);  
136 -  
137 - return ret;  
138 -}  
139 -  
140 -int SrsForwarder::on_video(SrsSharedPtrMessage* msg)  
141 -{  
142 - int ret = ERROR_SUCCESS;  
143 -  
144 - msgs.push_back(msg);  
145 -  
146 - return ret;  
147 -}  
148 -  
149 -int SrsForwarder::open_socket()  
150 -{  
151 - int ret = ERROR_SUCCESS;  
152 -  
153 - srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",  
154 - stream_name.c_str(), tc_url.c_str(), server.c_str(), port);  
155 -  
156 - int sock = socket(AF_INET, SOCK_STREAM, 0);  
157 - if(sock == -1){  
158 - ret = ERROR_SOCKET_CREATE;  
159 - srs_error("create socket error. ret=%d", ret);  
160 - return ret;  
161 - }  
162 -  
163 - stfd = st_netfd_open_socket(sock);  
164 - if(stfd == NULL){  
165 - ret = ERROR_ST_OPEN_SOCKET;  
166 - srs_error("st_netfd_open_socket failed. ret=%d", ret);  
167 - return ret;  
168 - }  
169 -  
170 - srs_freep(client);  
171 - client = new SrsRtmpClient(stfd);  
172 -  
173 - return ret;  
174 -}  
175 -  
176 -int SrsForwarder::connect_server()  
177 -{  
178 - int ret = ERROR_SUCCESS;  
179 -  
180 - std::string ip = parse_server(server);  
181 - if (ip.empty()) {  
182 - ret = ERROR_SYSTEM_IP_INVALID;  
183 - srs_error("dns resolve server error, ip empty. ret=%d", ret);  
184 - return ret;  
185 - }  
186 -  
187 - sockaddr_in addr;  
188 - addr.sin_family = AF_INET;  
189 - addr.sin_port = htons(port);  
190 - addr.sin_addr.s_addr = inet_addr(ip.c_str());  
191 -  
192 - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){  
193 - ret = ERROR_ST_CONNECT;  
194 - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);  
195 - return ret;  
196 - }  
197 - srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);  
198 -  
199 - return ret;  
200 -}  
201 -  
202 -std::string SrsForwarder::parse_server(std::string host)  
203 -{  
204 - if (inet_addr(host.c_str()) != INADDR_NONE) {  
205 - return host;  
206 - }  
207 -  
208 - hostent* answer = gethostbyname(host.c_str());  
209 - if (answer == NULL) {  
210 - srs_error("dns resolve host %s error.", host.c_str());  
211 - return "";  
212 - }  
213 -  
214 - char ipv4[16];  
215 - memset(ipv4, 0, sizeof(ipv4));  
216 - for (int i = 0; i < answer->h_length; i++) {  
217 - inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));  
218 - srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);  
219 - break;  
220 - }  
221 -  
222 - return ipv4;  
223 -}  
224 -  
225 -int SrsForwarder::cycle()  
226 -{  
227 - int ret = ERROR_SUCCESS;  
228 -  
229 - client->set_recv_timeout(SRS_RECV_TIMEOUT_US);  
230 - client->set_send_timeout(SRS_SEND_TIMEOUT_US);  
231 -  
232 - if ((ret = connect_server()) != ERROR_SUCCESS) {  
233 - return ret;  
234 - }  
235 - srs_assert(client);  
236 -  
237 - if ((ret = client->handshake()) != ERROR_SUCCESS) {  
238 - srs_error("handshake with server failed. ret=%d", ret);  
239 - return ret;  
240 - }  
241 - if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {  
242 - srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);  
243 - return ret;  
244 - }  
245 - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {  
246 - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);  
247 - return ret;  
248 - }  
249 - if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {  
250 - srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",  
251 - stream_name.c_str(), stream_id, ret);  
252 - return ret;  
253 - }  
254 -  
255 - if ((ret = forward()) != ERROR_SUCCESS) {  
256 - return ret;  
257 - }  
258 -  
259 - return ret;  
260 -}  
261 -  
262 -int SrsForwarder::forward()  
263 -{  
264 - int ret = ERROR_SUCCESS;  
265 -  
266 - client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);  
267 -  
268 - SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);  
269 -  
270 - while (loop) {  
271 - pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);  
272 -  
273 - // switch to other st-threads.  
274 - st_usleep(0);  
275 -  
276 - // read from client.  
277 - if (true) {  
278 - SrsCommonMessage* msg = NULL;  
279 - ret = client->recv_message(&msg);  
280 -  
281 - srs_verbose("play loop recv message. ret=%d", ret);  
282 - if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {  
283 - srs_error("recv server control message failed. ret=%d", ret);  
284 - return ret;  
285 - }  
286 - }  
287 -  
288 - int count = (int)msgs.size();  
289 -  
290 - // reportable  
291 - if (pithy_print.can_print()) {  
292 - srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
293 - pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());  
294 - }  
295 -  
296 - // all msgs to forward.  
297 - for (int i = 0; i < count; i++) {  
298 - SrsSharedPtrMessage* msg = msgs[i];  
299 - msgs[i] = NULL;  
300 -  
301 - if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {  
302 - srs_error("forwarder send message to server failed. ret=%d", ret);  
303 - return ret;  
304 - }  
305 - }  
306 - msgs.clear();  
307 - }  
308 -  
309 - return ret;  
310 -}  
311 -  
312 -void SrsForwarder::forward_cycle()  
313 -{  
314 - int ret = ERROR_SUCCESS;  
315 -  
316 - log_context->generate_id();  
317 - srs_trace("forward cycle start");  
318 -  
319 - while (loop) {  
320 - if ((ret = cycle()) != ERROR_SUCCESS) {  
321 - srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);  
322 - } else {  
323 - srs_info("forward cycle success, retry");  
324 - }  
325 -  
326 - if (!loop) {  
327 - break;  
328 - }  
329 -  
330 - st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);  
331 -  
332 - if ((ret = open_socket()) != ERROR_SUCCESS) {  
333 - srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);  
334 - } else {  
335 - srs_info("forward cycle reopen success");  
336 - }  
337 - }  
338 - srs_trace("forward cycle finished");  
339 -}  
340 -  
341 -void* SrsForwarder::forward_thread(void* arg)  
342 -{  
343 - SrsForwarder* obj = (SrsForwarder*)arg;  
344 - srs_assert(obj != NULL);  
345 -  
346 - obj->loop = true;  
347 - obj->forward_cycle();  
348 -  
349 - return NULL;  
350 -}  
351 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_forward.hpp>
  25 +
  26 +#include <stdlib.h>
  27 +#include <sys/socket.h>
  28 +#include <netinet/in.h>
  29 +#include <arpa/inet.h>
  30 +#include <netdb.h>
  31 +
  32 +#include <srs_core_error.hpp>
  33 +#include <srs_core_rtmp.hpp>
  34 +#include <srs_core_log.hpp>
  35 +#include <srs_core_protocol.hpp>
  36 +#include <srs_core_pithy_print.hpp>
  37 +
  38 +#define SRS_PULSE_TIMEOUT_MS 100
  39 +#define SRS_FORWARDER_SLEEP_MS 2000
  40 +#define SRS_SEND_TIMEOUT_US 3000000L
  41 +#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
  42 +
  43 +SrsForwarder::SrsForwarder()
  44 +{
  45 + client = NULL;
  46 + stfd = NULL;
  47 + stream_id = 0;
  48 +
  49 + tid = NULL;
  50 + loop = false;
  51 +}
  52 +
  53 +SrsForwarder::~SrsForwarder()
  54 +{
  55 + on_unpublish();
  56 +
  57 + std::vector<SrsSharedPtrMessage*>::iterator it;
  58 + for (it = msgs.begin(); it != msgs.end(); ++it) {
  59 + SrsSharedPtrMessage* msg = *it;
  60 + srs_freep(msg);
  61 + }
  62 + msgs.clear();
  63 +}
  64 +
  65 +int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)
  66 +{
  67 + int ret = ERROR_SUCCESS;
  68 +
  69 + app = _app;
  70 +
  71 + tc_url = "rtmp://";
  72 + tc_url += vhost;
  73 + tc_url += "/";
  74 + tc_url += app;
  75 +
  76 + stream_name = stream;
  77 + server = forward_server;
  78 + port = 1935;
  79 +
  80 + // TODO: dead loop check.
  81 +
  82 + size_t pos = forward_server.find(":");
  83 + if (pos != std::string::npos) {
  84 + port = ::atoi(forward_server.substr(pos + 1).c_str());
  85 + server = forward_server.substr(0, pos);
  86 + }
  87 +
  88 + if ((ret = open_socket()) != ERROR_SUCCESS) {
  89 + return ret;
  90 + }
  91 +
  92 + srs_assert(!tid);
  93 + if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){
  94 + ret = ERROR_ST_CREATE_FORWARD_THREAD;
  95 + srs_error("st_thread_create failed. ret=%d", ret);
  96 + return ret;
  97 + }
  98 +
  99 + return ret;
  100 +}
  101 +
  102 +void SrsForwarder::on_unpublish()
  103 +{
  104 + if (tid) {
  105 + loop = false;
  106 + st_thread_interrupt(tid);
  107 + st_thread_join(tid, NULL);
  108 + tid = NULL;
  109 + }
  110 +
  111 + if (stfd) {
  112 + int fd = st_netfd_fileno(stfd);
  113 + st_netfd_close(stfd);
  114 + stfd = NULL;
  115 +
  116 + // st does not close it sometimes,
  117 + // close it manually.
  118 + close(fd);
  119 + }
  120 +
  121 + srs_freep(client);
  122 +}
  123 +
  124 +int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
  125 +{
  126 + int ret = ERROR_SUCCESS;
  127 +
  128 + msgs.push_back(metadata);
  129 +
  130 + return ret;
  131 +}
  132 +
  133 +int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
  134 +{
  135 + int ret = ERROR_SUCCESS;
  136 +
  137 + msgs.push_back(msg);
  138 +
  139 + return ret;
  140 +}
  141 +
  142 +int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
  143 +{
  144 + int ret = ERROR_SUCCESS;
  145 +
  146 + msgs.push_back(msg);
  147 +
  148 + return ret;
  149 +}
  150 +
  151 +int SrsForwarder::open_socket()
  152 +{
  153 + int ret = ERROR_SUCCESS;
  154 +
  155 + srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
  156 + stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
  157 +
  158 + int sock = socket(AF_INET, SOCK_STREAM, 0);
  159 + if(sock == -1){
  160 + ret = ERROR_SOCKET_CREATE;
  161 + srs_error("create socket error. ret=%d", ret);
  162 + return ret;
  163 + }
  164 +
  165 + stfd = st_netfd_open_socket(sock);
  166 + if(stfd == NULL){
  167 + ret = ERROR_ST_OPEN_SOCKET;
  168 + srs_error("st_netfd_open_socket failed. ret=%d", ret);
  169 + return ret;
  170 + }
  171 +
  172 + srs_freep(client);
  173 + client = new SrsRtmpClient(stfd);
  174 +
  175 + return ret;
  176 +}
  177 +
  178 +int SrsForwarder::connect_server()
  179 +{
  180 + int ret = ERROR_SUCCESS;
  181 +
  182 + std::string ip = parse_server(server);
  183 + if (ip.empty()) {
  184 + ret = ERROR_SYSTEM_IP_INVALID;
  185 + srs_error("dns resolve server error, ip empty. ret=%d", ret);
  186 + return ret;
  187 + }
  188 +
  189 + sockaddr_in addr;
  190 + addr.sin_family = AF_INET;
  191 + addr.sin_port = htons(port);
  192 + addr.sin_addr.s_addr = inet_addr(ip.c_str());
  193 +
  194 + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
  195 + ret = ERROR_ST_CONNECT;
  196 + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
  197 + return ret;
  198 + }
  199 + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
  200 +
  201 + return ret;
  202 +}
  203 +
  204 +std::string SrsForwarder::parse_server(std::string host)
  205 +{
  206 + if (inet_addr(host.c_str()) != INADDR_NONE) {
  207 + return host;
  208 + }
  209 +
  210 + hostent* answer = gethostbyname(host.c_str());
  211 + if (answer == NULL) {
  212 + srs_error("dns resolve host %s error.", host.c_str());
  213 + return "";
  214 + }
  215 +
  216 + char ipv4[16];
  217 + memset(ipv4, 0, sizeof(ipv4));
  218 + for (int i = 0; i < answer->h_length; i++) {
  219 + inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));
  220 + srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);
  221 + break;
  222 + }
  223 +
  224 + return ipv4;
  225 +}
  226 +
  227 +int SrsForwarder::cycle()
  228 +{
  229 + int ret = ERROR_SUCCESS;
  230 +
  231 + client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
  232 + client->set_send_timeout(SRS_SEND_TIMEOUT_US);
  233 +
  234 + if ((ret = connect_server()) != ERROR_SUCCESS) {
  235 + return ret;
  236 + }
  237 + srs_assert(client);
  238 +
  239 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  240 + srs_error("handshake with server failed. ret=%d", ret);
  241 + return ret;
  242 + }
  243 + if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {
  244 + srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
  245 + return ret;
  246 + }
  247 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  248 + srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  249 + return ret;
  250 + }
  251 + if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
  252 + srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
  253 + stream_name.c_str(), stream_id, ret);
  254 + return ret;
  255 + }
  256 +
  257 + if ((ret = forward()) != ERROR_SUCCESS) {
  258 + return ret;
  259 + }
  260 +
  261 + return ret;
  262 +}
  263 +
  264 +int SrsForwarder::forward()
  265 +{
  266 + int ret = ERROR_SUCCESS;
  267 +
  268 + client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);
  269 +
  270 + SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
  271 +
  272 + while (loop) {
  273 + pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
  274 +
  275 + // switch to other st-threads.
  276 + st_usleep(0);
  277 +
  278 + // read from client.
  279 + if (true) {
  280 + SrsCommonMessage* msg = NULL;
  281 + ret = client->recv_message(&msg);
  282 +
  283 + srs_verbose("play loop recv message. ret=%d", ret);
  284 + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
  285 + srs_error("recv server control message failed. ret=%d", ret);
  286 + return ret;
  287 + }
  288 + }
  289 +
  290 + int count = (int)msgs.size();
  291 +
  292 + // reportable
  293 + if (pithy_print.can_print()) {
  294 + srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  295 + pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
  296 + }
  297 +
  298 + // all msgs to forward.
  299 + for (int i = 0; i < count; i++) {
  300 + SrsSharedPtrMessage* msg = msgs[i];
  301 + msgs[i] = NULL;
  302 +
  303 + if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {
  304 + srs_error("forwarder send message to server failed. ret=%d", ret);
  305 + return ret;
  306 + }
  307 + }
  308 + msgs.clear();
  309 + }
  310 +
  311 + return ret;
  312 +}
  313 +
  314 +void SrsForwarder::forward_cycle()
  315 +{
  316 + int ret = ERROR_SUCCESS;
  317 +
  318 + log_context->generate_id();
  319 + srs_trace("forward cycle start");
  320 +
  321 + while (loop) {
  322 + if ((ret = cycle()) != ERROR_SUCCESS) {
  323 + srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);
  324 + } else {
  325 + srs_info("forward cycle success, retry");
  326 + }
  327 +
  328 + if (!loop) {
  329 + break;
  330 + }
  331 +
  332 + st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);
  333 +
  334 + if ((ret = open_socket()) != ERROR_SUCCESS) {
  335 + srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);
  336 + } else {
  337 + srs_info("forward cycle reopen success");
  338 + }
  339 + }
  340 + srs_trace("forward cycle finished");
  341 +}
  342 +
  343 +void* SrsForwarder::forward_thread(void* arg)
  344 +{
  345 + SrsForwarder* obj = (SrsForwarder*)arg;
  346 + srs_assert(obj != NULL);
  347 +
  348 + obj->loop = true;
  349 + obj->forward_cycle();
  350 +
  351 + return NULL;
  352 +}
  353 +