winlin

convert to unix format

@@ -22,7 +22,7 @@ step 2: start srs <br/> @@ -22,7 +22,7 @@ step 2: start srs <br/>
22 <pre> 22 <pre>
23 ./objs/simple_rtmp_server -c conf/srs.conf 23 ./objs/simple_rtmp_server -c conf/srs.conf
24 </pre> 24 </pre>
25 -step 3(optinal): start srs listen at 19350 for forward<br/> 25 +step 3(optinal): start srs listen at 19350 to forward to<br/>
26 <pre> 26 <pre>
27 ./objs/simple_rtmp_server -c conf/srs.19350.conf 27 ./objs/simple_rtmp_server -c conf/srs.19350.conf
28 </pre> 28 </pre>
1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_encoder.hpp>  
25 -  
26 -#include <stdlib.h>  
27 -#include <unistd.h>  
28 -  
29 -#include <srs_core_error.hpp>  
30 -#include <srs_core_log.hpp>  
31 -#include <srs_core_config.hpp>  
32 -  
33 -#define SRS_ENCODER_SLEEP_MS 2000  
34 -  
35 -#define SRS_ENCODER_VCODEC "libx264"  
36 -#define SRS_ENCODER_ACODEC "libaacplus"  
37 -  
38 -SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)  
39 -{  
40 - started = false;  
41 - pid = -1;  
42 - ffmpeg = ffmpeg_bin;  
43 -  
44 - vbitrate = 0;  
45 - vfps = 0;  
46 - vwidth = 0;  
47 - vheight = 0;  
48 - vthreads = 0;  
49 - abitrate = 0;  
50 - asample_rate = 0;  
51 - achannels = 0;  
52 -}  
53 -  
54 -SrsFFMPEG::~SrsFFMPEG()  
55 -{  
56 - stop();  
57 -}  
58 -  
59 -int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)  
60 -{  
61 - int ret = ERROR_SUCCESS;  
62 -  
63 - config->get_engine_vfilter(engine, vfilter);  
64 - vcodec = config->get_engine_vcodec(engine);  
65 - vbitrate = config->get_engine_vbitrate(engine);  
66 - vfps = config->get_engine_vfps(engine);  
67 - vwidth = config->get_engine_vwidth(engine);  
68 - vheight = config->get_engine_vheight(engine);  
69 - vthreads = config->get_engine_vthreads(engine);  
70 - vprofile = config->get_engine_vprofile(engine);  
71 - vpreset = config->get_engine_vpreset(engine);  
72 - config->get_engine_vparams(engine, vparams);  
73 - acodec = config->get_engine_acodec(engine);  
74 - abitrate = config->get_engine_abitrate(engine);  
75 - asample_rate = config->get_engine_asample_rate(engine);  
76 - achannels = config->get_engine_achannels(engine);  
77 - config->get_engine_aparams(engine, aparams);  
78 - output = config->get_engine_output(engine);  
79 -  
80 - // ensure the size is even.  
81 - vwidth -= vwidth % 2;  
82 - vheight -= vheight % 2;  
83 -  
84 - // input stream, from local.  
85 - // ie. rtmp://127.0.0.1:1935/live/livestream  
86 - input = "rtmp://127.0.0.1:";  
87 - input += port;  
88 - input += "/";  
89 - input += app;  
90 - input += "/";  
91 - input += stream;  
92 -  
93 - // output stream, to other/self server  
94 - // ie. rtmp://127.0.0.1:1935/live/livestream_sd  
95 - if (vhost == RTMP_VHOST_DEFAULT) {  
96 - output = srs_replace(output, "[vhost]", "127.0.0.1");  
97 - } else {  
98 - output = srs_replace(output, "[vhost]", vhost);  
99 - }  
100 - output = srs_replace(output, "[port]", port);  
101 - output = srs_replace(output, "[app]", app);  
102 - output = srs_replace(output, "[stream]", stream);  
103 -  
104 - // important: loop check, donot transcode again.  
105 - // we think the following is loop circle:  
106 - // input: rtmp://127.0.0.1:1935/live/livestream_sd  
107 - // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd  
108 - std::string tail = ""; // tail="_sd"  
109 - if (output.length() > input.length()) {  
110 - tail = output.substr(input.length());  
111 - }  
112 - // TODO: better dead loop check.  
113 - // if input also endwiths the tail, loop detected.  
114 - if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {  
115 - ret = ERROR_ENCODER_LOOP;  
116 - srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",  
117 - input.c_str(), output.c_str(), ret);  
118 - return ret;  
119 - }  
120 -  
121 - if (vcodec != SRS_ENCODER_VCODEC) {  
122 - ret = ERROR_ENCODER_VCODEC;  
123 - srs_error("invalid vcodec, must be %s, actual %s, ret=%d",  
124 - SRS_ENCODER_VCODEC, vcodec.c_str(), ret);  
125 - return ret;  
126 - }  
127 - if (vbitrate <= 0) {  
128 - ret = ERROR_ENCODER_VBITRATE;  
129 - srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);  
130 - return ret;  
131 - }  
132 - if (vfps <= 0) {  
133 - ret = ERROR_ENCODER_VFPS;  
134 - srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);  
135 - return ret;  
136 - }  
137 - if (vwidth <= 0) {  
138 - ret = ERROR_ENCODER_VWIDTH;  
139 - srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);  
140 - return ret;  
141 - }  
142 - if (vheight <= 0) {  
143 - ret = ERROR_ENCODER_VHEIGHT;  
144 - srs_error("invalid vheight: %d, ret=%d", vheight, ret);  
145 - return ret;  
146 - }  
147 - if (vthreads < 0) {  
148 - ret = ERROR_ENCODER_VTHREADS;  
149 - srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);  
150 - return ret;  
151 - }  
152 - if (vprofile.empty()) {  
153 - ret = ERROR_ENCODER_VPROFILE;  
154 - srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);  
155 - return ret;  
156 - }  
157 - if (vpreset.empty()) {  
158 - ret = ERROR_ENCODER_VPRESET;  
159 - srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);  
160 - return ret;  
161 - }  
162 - if (acodec != SRS_ENCODER_ACODEC) {  
163 - ret = ERROR_ENCODER_ACODEC;  
164 - srs_error("invalid acodec, must be %s, actual %s, ret=%d",  
165 - SRS_ENCODER_ACODEC, acodec.c_str(), ret);  
166 - return ret;  
167 - }  
168 - if (abitrate <= 0) {  
169 - ret = ERROR_ENCODER_ABITRATE;  
170 - srs_error("invalid abitrate: %d, ret=%d",  
171 - abitrate, ret);  
172 - return ret;  
173 - }  
174 - if (asample_rate <= 0) {  
175 - ret = ERROR_ENCODER_ASAMPLE_RATE;  
176 - srs_error("invalid sample rate: %d, ret=%d",  
177 - asample_rate, ret);  
178 - return ret;  
179 - }  
180 - if (achannels != 1 && achannels != 2) {  
181 - ret = ERROR_ENCODER_ACHANNELS;  
182 - srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",  
183 - achannels, ret);  
184 - return ret;  
185 - }  
186 - if (output.empty()) {  
187 - ret = ERROR_ENCODER_OUTPUT;  
188 - srs_error("invalid empty output, ret=%d", ret);  
189 - return ret;  
190 - }  
191 -  
192 - return ret;  
193 -}  
194 -  
195 -int SrsFFMPEG::start()  
196 -{  
197 - int ret = ERROR_SUCCESS;  
198 -  
199 - if (started) {  
200 - return ret;  
201 - }  
202 -  
203 - // prepare exec params  
204 - char tmp[256];  
205 - std::vector<std::string> params;  
206 -  
207 - // argv[0], set to ffmpeg bin.  
208 - // The execv() and execvp() functions ....  
209 - // The first argument, by convention, should point to  
210 - // the filename associated with the file being executed.  
211 - params.push_back(ffmpeg);  
212 -  
213 - // input.  
214 - params.push_back("-f");  
215 - params.push_back("flv");  
216 -  
217 - params.push_back("-i");  
218 - params.push_back(input);  
219 -  
220 - // build the filter  
221 - if (!vfilter.empty()) {  
222 - std::vector<std::string>::iterator it;  
223 - for (it = vfilter.begin(); it != vfilter.end(); ++it) {  
224 - std::string p = *it;  
225 - if (!p.empty()) {  
226 - params.push_back(p);  
227 - }  
228 - }  
229 - }  
230 -  
231 - // video specified.  
232 - params.push_back("-vcodec");  
233 - params.push_back(vcodec);  
234 -  
235 - params.push_back("-b:v");  
236 - snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);  
237 - params.push_back(tmp);  
238 -  
239 - params.push_back("-r");  
240 - snprintf(tmp, sizeof(tmp), "%.2f", vfps);  
241 - params.push_back(tmp);  
242 -  
243 - params.push_back("-s");  
244 - snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);  
245 - params.push_back(tmp);  
246 -  
247 - // TODO: add aspect if needed.  
248 - params.push_back("-aspect");  
249 - snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);  
250 - params.push_back(tmp);  
251 -  
252 - params.push_back("-threads");  
253 - snprintf(tmp, sizeof(tmp), "%d", vthreads);  
254 - params.push_back(tmp);  
255 -  
256 - params.push_back("-profile:v");  
257 - params.push_back(vprofile);  
258 -  
259 - params.push_back("-preset");  
260 - params.push_back(vpreset);  
261 -  
262 - // vparams  
263 - if (!vparams.empty()) {  
264 - std::vector<std::string>::iterator it;  
265 - for (it = vparams.begin(); it != vparams.end(); ++it) {  
266 - std::string p = *it;  
267 - if (!p.empty()) {  
268 - params.push_back(p);  
269 - }  
270 - }  
271 - }  
272 -  
273 - // audio specified.  
274 - params.push_back("-acodec");  
275 - params.push_back(acodec);  
276 -  
277 - params.push_back("-b:a");  
278 - snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);  
279 - params.push_back(tmp);  
280 -  
281 - params.push_back("-ar");  
282 - snprintf(tmp, sizeof(tmp), "%d", asample_rate);  
283 - params.push_back(tmp);  
284 -  
285 - params.push_back("-ac");  
286 - snprintf(tmp, sizeof(tmp), "%d", achannels);  
287 - params.push_back(tmp);  
288 -  
289 - // aparams  
290 - if (!aparams.empty()) {  
291 - std::vector<std::string>::iterator it;  
292 - for (it = aparams.begin(); it != aparams.end(); ++it) {  
293 - std::string p = *it;  
294 - if (!p.empty()) {  
295 - params.push_back(p);  
296 - }  
297 - }  
298 - }  
299 -  
300 - // output  
301 - params.push_back("-f");  
302 - params.push_back("flv");  
303 -  
304 - params.push_back("-y");  
305 - params.push_back(output);  
306 -  
307 - // TODO: fork or vfork?  
308 - if ((pid = fork()) < 0) {  
309 - ret = ERROR_ENCODER_FORK;  
310 - srs_error("vfork process failed. ret=%d", ret);  
311 - return ret;  
312 - }  
313 -  
314 - // child process: ffmpeg encoder engine.  
315 - if (pid == 0) {  
316 - // memory leak in child process, it's ok.  
317 - char** charpv_params = new char*[params.size() + 1];  
318 - for (int i = 0; i < (int)params.size(); i++) {  
319 - std::string p = params[i];  
320 - charpv_params[i] = (char*)p.c_str();  
321 - }  
322 - // EOF: NULL  
323 - charpv_params[params.size()] = NULL;  
324 -  
325 - // TODO: execv or execvp  
326 - ret = execv(ffmpeg.c_str(), charpv_params);  
327 - if (ret < 0) {  
328 - fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",  
329 - errno, strerror(errno));  
330 - }  
331 - exit(ret);  
332 - }  
333 -  
334 - // parent.  
335 - if (pid > 0) {  
336 - started = true;  
337 - srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);  
338 - return ret;  
339 - }  
340 -  
341 - return ret;  
342 -}  
343 -  
344 -void SrsFFMPEG::stop()  
345 -{  
346 - if (!started) {  
347 - return;  
348 - }  
349 -}  
350 -  
351 -SrsEncoder::SrsEncoder()  
352 -{  
353 - tid = NULL;  
354 - loop = false;  
355 -}  
356 -  
357 -SrsEncoder::~SrsEncoder()  
358 -{  
359 - on_unpublish();  
360 -}  
361 -  
362 -int SrsEncoder::parse_scope_engines()  
363 -{  
364 - int ret = ERROR_SUCCESS;  
365 -  
366 - // parse all transcode engines.  
367 - SrsConfDirective* conf = NULL;  
368 -  
369 - // parse vhost scope engines  
370 - std::string scope = "";  
371 - if ((conf = config->get_transcode(vhost, "")) != NULL) {  
372 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
373 - srs_error("parse vhost scope=%s transcode engines failed. "  
374 - "ret=%d", scope.c_str(), ret);  
375 - return ret;  
376 - }  
377 - }  
378 - // parse app scope engines  
379 - scope = app;  
380 - if ((conf = config->get_transcode(vhost, app)) != NULL) {  
381 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
382 - srs_error("parse app scope=%s transcode engines failed. "  
383 - "ret=%d", scope.c_str(), ret);  
384 - return ret;  
385 - }  
386 - }  
387 - // parse stream scope engines  
388 - scope += "/";  
389 - scope += stream;  
390 - if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {  
391 - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {  
392 - srs_error("parse stream scope=%s transcode engines failed. "  
393 - "ret=%d", scope.c_str(), ret);  
394 - return ret;  
395 - }  
396 - }  
397 -  
398 - return ret;  
399 -}  
400 -  
401 -int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)  
402 -{  
403 - int ret = ERROR_SUCCESS;  
404 -  
405 - vhost = _vhost;  
406 - port = _port;  
407 - app = _app;  
408 - stream = _stream;  
409 -  
410 - ret = parse_scope_engines();  
411 -  
412 - // ignore the loop encoder  
413 - if (ret == ERROR_ENCODER_LOOP) {  
414 - ret = ERROR_SUCCESS;  
415 - }  
416 -  
417 - // return for error or no engine.  
418 - if (ret != ERROR_SUCCESS || ffmpegs.empty()) {  
419 - return ret;  
420 - }  
421 -  
422 - // start thread to run all encoding engines.  
423 - srs_assert(!tid);  
424 - if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {  
425 - ret = ERROR_ST_CREATE_FORWARD_THREAD;  
426 - srs_error("st_thread_create failed. ret=%d", ret);  
427 - return ret;  
428 - }  
429 -  
430 - return ret;  
431 -}  
432 -  
433 -void SrsEncoder::on_unpublish()  
434 -{  
435 - if (tid) {  
436 - loop = false;  
437 - st_thread_interrupt(tid);  
438 - st_thread_join(tid, NULL);  
439 - tid = NULL;  
440 - }  
441 -  
442 - clear_engines();  
443 -}  
444 -  
445 -void SrsEncoder::clear_engines()  
446 -{  
447 - std::vector<SrsFFMPEG*>::iterator it;  
448 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
449 - SrsFFMPEG* ffmpeg = *it;  
450 - srs_freep(ffmpeg);  
451 - }  
452 - ffmpegs.clear();  
453 -}  
454 -  
455 -SrsFFMPEG* SrsEncoder::at(int index)  
456 -{  
457 - return ffmpegs[index];  
458 -}  
459 -  
460 -int SrsEncoder::parse_transcode(SrsConfDirective* conf)  
461 -{  
462 - int ret = ERROR_SUCCESS;  
463 -  
464 - srs_assert(conf);  
465 -  
466 - // enabled  
467 - if (!config->get_transcode_enabled(conf)) {  
468 - srs_trace("ignore the disabled transcode: %s",  
469 - conf->arg0().c_str());  
470 - return ret;  
471 - }  
472 -  
473 - // ffmpeg  
474 - std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);  
475 - if (ffmpeg_bin.empty()) {  
476 - srs_trace("ignore the empty ffmpeg transcode: %s",  
477 - conf->arg0().c_str());  
478 - return ret;  
479 - }  
480 -  
481 - // get all engines.  
482 - std::vector<SrsConfDirective*> engines;  
483 - config->get_transcode_engines(conf, engines);  
484 - if (engines.empty()) {  
485 - srs_trace("ignore the empty transcode engine: %s",  
486 - conf->arg0().c_str());  
487 - return ret;  
488 - }  
489 -  
490 - // create engine  
491 - for (int i = 0; i < (int)engines.size(); i++) {  
492 - SrsConfDirective* engine = engines[i];  
493 - if (!config->get_engine_enabled(engine)) {  
494 - srs_trace("ignore the diabled transcode engine: %s %s",  
495 - conf->arg0().c_str(), engine->arg0().c_str());  
496 - continue;  
497 - }  
498 -  
499 - SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);  
500 -  
501 - if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {  
502 - srs_freep(ffmpeg);  
503 -  
504 - // if got a loop, donot transcode the whole stream.  
505 - if (ret == ERROR_ENCODER_LOOP) {  
506 - clear_engines();  
507 - break;  
508 - }  
509 -  
510 - srs_error("invalid transcode engine: %s %s",  
511 - conf->arg0().c_str(), engine->arg0().c_str());  
512 - return ret;  
513 - }  
514 -  
515 - ffmpegs.push_back(ffmpeg);  
516 - }  
517 -  
518 - return ret;  
519 -}  
520 -  
521 -int SrsEncoder::cycle()  
522 -{  
523 - int ret = ERROR_SUCCESS;  
524 -  
525 - // start all ffmpegs.  
526 - std::vector<SrsFFMPEG*>::iterator it;  
527 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
528 - SrsFFMPEG* ffmpeg = *it;  
529 - if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {  
530 - srs_error("ffmpeg start failed. ret=%d", ret);  
531 - return ret;  
532 - }  
533 - }  
534 -  
535 - return ret;  
536 -}  
537 -  
538 -void SrsEncoder::encoder_cycle()  
539 -{  
540 - int ret = ERROR_SUCCESS;  
541 -  
542 - log_context->generate_id();  
543 - srs_trace("encoder cycle start");  
544 -  
545 - while (loop) {  
546 - if ((ret = cycle()) != ERROR_SUCCESS) {  
547 - srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);  
548 - } else {  
549 - srs_info("encoder cycle success, retry");  
550 - }  
551 -  
552 - if (!loop) {  
553 - break;  
554 - }  
555 -  
556 - st_usleep(SRS_ENCODER_SLEEP_MS * 1000);  
557 - }  
558 -  
559 - // kill ffmpeg when finished and it alive  
560 - std::vector<SrsFFMPEG*>::iterator it;  
561 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
562 - SrsFFMPEG* ffmpeg = *it;  
563 - ffmpeg->stop();  
564 - }  
565 -  
566 - srs_trace("encoder cycle finished");  
567 -}  
568 -  
569 -void* SrsEncoder::encoder_thread(void* arg)  
570 -{  
571 - SrsEncoder* obj = (SrsEncoder*)arg;  
572 - srs_assert(obj != NULL);  
573 -  
574 - obj->loop = true;  
575 - obj->encoder_cycle();  
576 -  
577 - return NULL;  
578 -}  
579 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_encoder.hpp>
  25 +
  26 +#include <stdlib.h>
  27 +#include <unistd.h>
  28 +
  29 +#include <srs_core_error.hpp>
  30 +#include <srs_core_log.hpp>
  31 +#include <srs_core_config.hpp>
  32 +
  33 +#define SRS_ENCODER_SLEEP_MS 2000
  34 +
  35 +#define SRS_ENCODER_VCODEC "libx264"
  36 +#define SRS_ENCODER_ACODEC "libaacplus"
  37 +
  38 +SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
  39 +{
  40 + started = false;
  41 + pid = -1;
  42 + ffmpeg = ffmpeg_bin;
  43 +
  44 + vbitrate = 0;
  45 + vfps = 0;
  46 + vwidth = 0;
  47 + vheight = 0;
  48 + vthreads = 0;
  49 + abitrate = 0;
  50 + asample_rate = 0;
  51 + achannels = 0;
  52 +}
  53 +
  54 +SrsFFMPEG::~SrsFFMPEG()
  55 +{
  56 + stop();
  57 +}
  58 +
  59 +int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)
  60 +{
  61 + int ret = ERROR_SUCCESS;
  62 +
  63 + config->get_engine_vfilter(engine, vfilter);
  64 + vcodec = config->get_engine_vcodec(engine);
  65 + vbitrate = config->get_engine_vbitrate(engine);
  66 + vfps = config->get_engine_vfps(engine);
  67 + vwidth = config->get_engine_vwidth(engine);
  68 + vheight = config->get_engine_vheight(engine);
  69 + vthreads = config->get_engine_vthreads(engine);
  70 + vprofile = config->get_engine_vprofile(engine);
  71 + vpreset = config->get_engine_vpreset(engine);
  72 + config->get_engine_vparams(engine, vparams);
  73 + acodec = config->get_engine_acodec(engine);
  74 + abitrate = config->get_engine_abitrate(engine);
  75 + asample_rate = config->get_engine_asample_rate(engine);
  76 + achannels = config->get_engine_achannels(engine);
  77 + config->get_engine_aparams(engine, aparams);
  78 + output = config->get_engine_output(engine);
  79 +
  80 + // ensure the size is even.
  81 + vwidth -= vwidth % 2;
  82 + vheight -= vheight % 2;
  83 +
  84 + // input stream, from local.
  85 + // ie. rtmp://127.0.0.1:1935/live/livestream
  86 + input = "rtmp://127.0.0.1:";
  87 + input += port;
  88 + input += "/";
  89 + input += app;
  90 + input += "/";
  91 + input += stream;
  92 +
  93 + // output stream, to other/self server
  94 + // ie. rtmp://127.0.0.1:1935/live/livestream_sd
  95 + if (vhost == RTMP_VHOST_DEFAULT) {
  96 + output = srs_replace(output, "[vhost]", "127.0.0.1");
  97 + } else {
  98 + output = srs_replace(output, "[vhost]", vhost);
  99 + }
  100 + output = srs_replace(output, "[port]", port);
  101 + output = srs_replace(output, "[app]", app);
  102 + output = srs_replace(output, "[stream]", stream);
  103 +
  104 + // important: loop check, donot transcode again.
  105 + // we think the following is loop circle:
  106 + // input: rtmp://127.0.0.1:1935/live/livestream_sd
  107 + // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd
  108 + std::string tail = ""; // tail="_sd"
  109 + if (output.length() > input.length()) {
  110 + tail = output.substr(input.length());
  111 + }
  112 + // TODO: better dead loop check.
  113 + // if input also endwiths the tail, loop detected.
  114 + if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {
  115 + ret = ERROR_ENCODER_LOOP;
  116 + srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",
  117 + input.c_str(), output.c_str(), ret);
  118 + return ret;
  119 + }
  120 +
  121 + if (vcodec != SRS_ENCODER_VCODEC) {
  122 + ret = ERROR_ENCODER_VCODEC;
  123 + srs_error("invalid vcodec, must be %s, actual %s, ret=%d",
  124 + SRS_ENCODER_VCODEC, vcodec.c_str(), ret);
  125 + return ret;
  126 + }
  127 + if (vbitrate <= 0) {
  128 + ret = ERROR_ENCODER_VBITRATE;
  129 + srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);
  130 + return ret;
  131 + }
  132 + if (vfps <= 0) {
  133 + ret = ERROR_ENCODER_VFPS;
  134 + srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);
  135 + return ret;
  136 + }
  137 + if (vwidth <= 0) {
  138 + ret = ERROR_ENCODER_VWIDTH;
  139 + srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);
  140 + return ret;
  141 + }
  142 + if (vheight <= 0) {
  143 + ret = ERROR_ENCODER_VHEIGHT;
  144 + srs_error("invalid vheight: %d, ret=%d", vheight, ret);
  145 + return ret;
  146 + }
  147 + if (vthreads < 0) {
  148 + ret = ERROR_ENCODER_VTHREADS;
  149 + srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);
  150 + return ret;
  151 + }
  152 + if (vprofile.empty()) {
  153 + ret = ERROR_ENCODER_VPROFILE;
  154 + srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);
  155 + return ret;
  156 + }
  157 + if (vpreset.empty()) {
  158 + ret = ERROR_ENCODER_VPRESET;
  159 + srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);
  160 + return ret;
  161 + }
  162 + if (acodec != SRS_ENCODER_ACODEC) {
  163 + ret = ERROR_ENCODER_ACODEC;
  164 + srs_error("invalid acodec, must be %s, actual %s, ret=%d",
  165 + SRS_ENCODER_ACODEC, acodec.c_str(), ret);
  166 + return ret;
  167 + }
  168 + if (abitrate <= 0) {
  169 + ret = ERROR_ENCODER_ABITRATE;
  170 + srs_error("invalid abitrate: %d, ret=%d",
  171 + abitrate, ret);
  172 + return ret;
  173 + }
  174 + if (asample_rate <= 0) {
  175 + ret = ERROR_ENCODER_ASAMPLE_RATE;
  176 + srs_error("invalid sample rate: %d, ret=%d",
  177 + asample_rate, ret);
  178 + return ret;
  179 + }
  180 + if (achannels != 1 && achannels != 2) {
  181 + ret = ERROR_ENCODER_ACHANNELS;
  182 + srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",
  183 + achannels, ret);
  184 + return ret;
  185 + }
  186 + if (output.empty()) {
  187 + ret = ERROR_ENCODER_OUTPUT;
  188 + srs_error("invalid empty output, ret=%d", ret);
  189 + return ret;
  190 + }
  191 +
  192 + return ret;
  193 +}
  194 +
  195 +int SrsFFMPEG::start()
  196 +{
  197 + int ret = ERROR_SUCCESS;
  198 +
  199 + if (started) {
  200 + return ret;
  201 + }
  202 +
  203 + // prepare exec params
  204 + char tmp[256];
  205 + std::vector<std::string> params;
  206 +
  207 + // argv[0], set to ffmpeg bin.
  208 + // The execv() and execvp() functions ....
  209 + // The first argument, by convention, should point to
  210 + // the filename associated with the file being executed.
  211 + params.push_back(ffmpeg);
  212 +
  213 + // input.
  214 + params.push_back("-f");
  215 + params.push_back("flv");
  216 +
  217 + params.push_back("-i");
  218 + params.push_back(input);
  219 +
  220 + // build the filter
  221 + if (!vfilter.empty()) {
  222 + std::vector<std::string>::iterator it;
  223 + for (it = vfilter.begin(); it != vfilter.end(); ++it) {
  224 + std::string p = *it;
  225 + if (!p.empty()) {
  226 + params.push_back(p);
  227 + }
  228 + }
  229 + }
  230 +
  231 + // video specified.
  232 + params.push_back("-vcodec");
  233 + params.push_back(vcodec);
  234 +
  235 + params.push_back("-b:v");
  236 + snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);
  237 + params.push_back(tmp);
  238 +
  239 + params.push_back("-r");
  240 + snprintf(tmp, sizeof(tmp), "%.2f", vfps);
  241 + params.push_back(tmp);
  242 +
  243 + params.push_back("-s");
  244 + snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);
  245 + params.push_back(tmp);
  246 +
  247 + // TODO: add aspect if needed.
  248 + params.push_back("-aspect");
  249 + snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);
  250 + params.push_back(tmp);
  251 +
  252 + params.push_back("-threads");
  253 + snprintf(tmp, sizeof(tmp), "%d", vthreads);
  254 + params.push_back(tmp);
  255 +
  256 + params.push_back("-profile:v");
  257 + params.push_back(vprofile);
  258 +
  259 + params.push_back("-preset");
  260 + params.push_back(vpreset);
  261 +
  262 + // vparams
  263 + if (!vparams.empty()) {
  264 + std::vector<std::string>::iterator it;
  265 + for (it = vparams.begin(); it != vparams.end(); ++it) {
  266 + std::string p = *it;
  267 + if (!p.empty()) {
  268 + params.push_back(p);
  269 + }
  270 + }
  271 + }
  272 +
  273 + // audio specified.
  274 + params.push_back("-acodec");
  275 + params.push_back(acodec);
  276 +
  277 + params.push_back("-b:a");
  278 + snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);
  279 + params.push_back(tmp);
  280 +
  281 + params.push_back("-ar");
  282 + snprintf(tmp, sizeof(tmp), "%d", asample_rate);
  283 + params.push_back(tmp);
  284 +
  285 + params.push_back("-ac");
  286 + snprintf(tmp, sizeof(tmp), "%d", achannels);
  287 + params.push_back(tmp);
  288 +
  289 + // aparams
  290 + if (!aparams.empty()) {
  291 + std::vector<std::string>::iterator it;
  292 + for (it = aparams.begin(); it != aparams.end(); ++it) {
  293 + std::string p = *it;
  294 + if (!p.empty()) {
  295 + params.push_back(p);
  296 + }
  297 + }
  298 + }
  299 +
  300 + // output
  301 + params.push_back("-f");
  302 + params.push_back("flv");
  303 +
  304 + params.push_back("-y");
  305 + params.push_back(output);
  306 +
  307 + // TODO: fork or vfork?
  308 + if ((pid = fork()) < 0) {
  309 + ret = ERROR_ENCODER_FORK;
  310 + srs_error("vfork process failed. ret=%d", ret);
  311 + return ret;
  312 + }
  313 +
  314 + // child process: ffmpeg encoder engine.
  315 + if (pid == 0) {
  316 + // memory leak in child process, it's ok.
  317 + char** charpv_params = new char*[params.size() + 1];
  318 + for (int i = 0; i < (int)params.size(); i++) {
  319 + std::string p = params[i];
  320 + charpv_params[i] = (char*)p.c_str();
  321 + }
  322 + // EOF: NULL
  323 + charpv_params[params.size()] = NULL;
  324 +
  325 + // TODO: execv or execvp
  326 + ret = execv(ffmpeg.c_str(), charpv_params);
  327 + if (ret < 0) {
  328 + fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",
  329 + errno, strerror(errno));
  330 + }
  331 + exit(ret);
  332 + }
  333 +
  334 + // parent.
  335 + if (pid > 0) {
  336 + started = true;
  337 + srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);
  338 + return ret;
  339 + }
  340 +
  341 + return ret;
  342 +}
  343 +
  344 +void SrsFFMPEG::stop()
  345 +{
  346 + if (!started) {
  347 + return;
  348 + }
  349 +}
  350 +
  351 +SrsEncoder::SrsEncoder()
  352 +{
  353 + tid = NULL;
  354 + loop = false;
  355 +}
  356 +
  357 +SrsEncoder::~SrsEncoder()
  358 +{
  359 + on_unpublish();
  360 +}
  361 +
  362 +int SrsEncoder::parse_scope_engines()
  363 +{
  364 + int ret = ERROR_SUCCESS;
  365 +
  366 + // parse all transcode engines.
  367 + SrsConfDirective* conf = NULL;
  368 +
  369 + // parse vhost scope engines
  370 + std::string scope = "";
  371 + if ((conf = config->get_transcode(vhost, "")) != NULL) {
  372 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  373 + srs_error("parse vhost scope=%s transcode engines failed. "
  374 + "ret=%d", scope.c_str(), ret);
  375 + return ret;
  376 + }
  377 + }
  378 + // parse app scope engines
  379 + scope = app;
  380 + if ((conf = config->get_transcode(vhost, app)) != NULL) {
  381 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  382 + srs_error("parse app scope=%s transcode engines failed. "
  383 + "ret=%d", scope.c_str(), ret);
  384 + return ret;
  385 + }
  386 + }
  387 + // parse stream scope engines
  388 + scope += "/";
  389 + scope += stream;
  390 + if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {
  391 + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
  392 + srs_error("parse stream scope=%s transcode engines failed. "
  393 + "ret=%d", scope.c_str(), ret);
  394 + return ret;
  395 + }
  396 + }
  397 +
  398 + return ret;
  399 +}
  400 +
  401 +int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)
  402 +{
  403 + int ret = ERROR_SUCCESS;
  404 +
  405 + vhost = _vhost;
  406 + port = _port;
  407 + app = _app;
  408 + stream = _stream;
  409 +
  410 + ret = parse_scope_engines();
  411 +
  412 + // ignore the loop encoder
  413 + if (ret == ERROR_ENCODER_LOOP) {
  414 + ret = ERROR_SUCCESS;
  415 + }
  416 +
  417 + // return for error or no engine.
  418 + if (ret != ERROR_SUCCESS || ffmpegs.empty()) {
  419 + return ret;
  420 + }
  421 +
  422 + // start thread to run all encoding engines.
  423 + srs_assert(!tid);
  424 + if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {
  425 + ret = ERROR_ST_CREATE_FORWARD_THREAD;
  426 + srs_error("st_thread_create failed. ret=%d", ret);
  427 + return ret;
  428 + }
  429 +
  430 + return ret;
  431 +}
  432 +
  433 +void SrsEncoder::on_unpublish()
  434 +{
  435 + if (tid) {
  436 + loop = false;
  437 + st_thread_interrupt(tid);
  438 + st_thread_join(tid, NULL);
  439 + tid = NULL;
  440 + }
  441 +
  442 + clear_engines();
  443 +}
  444 +
  445 +void SrsEncoder::clear_engines()
  446 +{
  447 + std::vector<SrsFFMPEG*>::iterator it;
  448 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  449 + SrsFFMPEG* ffmpeg = *it;
  450 + srs_freep(ffmpeg);
  451 + }
  452 + ffmpegs.clear();
  453 +}
  454 +
  455 +SrsFFMPEG* SrsEncoder::at(int index)
  456 +{
  457 + return ffmpegs[index];
  458 +}
  459 +
  460 +int SrsEncoder::parse_transcode(SrsConfDirective* conf)
  461 +{
  462 + int ret = ERROR_SUCCESS;
  463 +
  464 + srs_assert(conf);
  465 +
  466 + // enabled
  467 + if (!config->get_transcode_enabled(conf)) {
  468 + srs_trace("ignore the disabled transcode: %s",
  469 + conf->arg0().c_str());
  470 + return ret;
  471 + }
  472 +
  473 + // ffmpeg
  474 + std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);
  475 + if (ffmpeg_bin.empty()) {
  476 + srs_trace("ignore the empty ffmpeg transcode: %s",
  477 + conf->arg0().c_str());
  478 + return ret;
  479 + }
  480 +
  481 + // get all engines.
  482 + std::vector<SrsConfDirective*> engines;
  483 + config->get_transcode_engines(conf, engines);
  484 + if (engines.empty()) {
  485 + srs_trace("ignore the empty transcode engine: %s",
  486 + conf->arg0().c_str());
  487 + return ret;
  488 + }
  489 +
  490 + // create engine
  491 + for (int i = 0; i < (int)engines.size(); i++) {
  492 + SrsConfDirective* engine = engines[i];
  493 + if (!config->get_engine_enabled(engine)) {
  494 + srs_trace("ignore the diabled transcode engine: %s %s",
  495 + conf->arg0().c_str(), engine->arg0().c_str());
  496 + continue;
  497 + }
  498 +
  499 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  500 +
  501 + if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {
  502 + srs_freep(ffmpeg);
  503 +
  504 + // if got a loop, donot transcode the whole stream.
  505 + if (ret == ERROR_ENCODER_LOOP) {
  506 + clear_engines();
  507 + break;
  508 + }
  509 +
  510 + srs_error("invalid transcode engine: %s %s",
  511 + conf->arg0().c_str(), engine->arg0().c_str());
  512 + return ret;
  513 + }
  514 +
  515 + ffmpegs.push_back(ffmpeg);
  516 + }
  517 +
  518 + return ret;
  519 +}
  520 +
  521 +int SrsEncoder::cycle()
  522 +{
  523 + int ret = ERROR_SUCCESS;
  524 +
  525 + // start all ffmpegs.
  526 + std::vector<SrsFFMPEG*>::iterator it;
  527 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  528 + SrsFFMPEG* ffmpeg = *it;
  529 + if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
  530 + srs_error("ffmpeg start failed. ret=%d", ret);
  531 + return ret;
  532 + }
  533 + }
  534 +
  535 + return ret;
  536 +}
  537 +
  538 +void SrsEncoder::encoder_cycle()
  539 +{
  540 + int ret = ERROR_SUCCESS;
  541 +
  542 + log_context->generate_id();
  543 + srs_trace("encoder cycle start");
  544 +
  545 + while (loop) {
  546 + if ((ret = cycle()) != ERROR_SUCCESS) {
  547 + srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);
  548 + } else {
  549 + srs_info("encoder cycle success, retry");
  550 + }
  551 +
  552 + if (!loop) {
  553 + break;
  554 + }
  555 +
  556 + st_usleep(SRS_ENCODER_SLEEP_MS * 1000);
  557 + }
  558 +
  559 + // kill ffmpeg when finished and it alive
  560 + std::vector<SrsFFMPEG*>::iterator it;
  561 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  562 + SrsFFMPEG* ffmpeg = *it;
  563 + ffmpeg->stop();
  564 + }
  565 +
  566 + srs_trace("encoder cycle finished");
  567 +}
  568 +
  569 +void* SrsEncoder::encoder_thread(void* arg)
  570 +{
  571 + SrsEncoder* obj = (SrsEncoder*)arg;
  572 + srs_assert(obj != NULL);
  573 +
  574 + obj->loop = true;
  575 + obj->encoder_cycle();
  576 +
  577 + return NULL;
  578 +}
  579 +
1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_forward.hpp>  
25 -  
26 -#include <stdlib.h>  
27 -#include <sys/socket.h>  
28 -#include <netinet/in.h>  
29 -#include <arpa/inet.h>  
30 -#include <netdb.h>  
31 -  
32 -#include <srs_core_error.hpp>  
33 -#include <srs_core_rtmp.hpp>  
34 -#include <srs_core_log.hpp>  
35 -#include <srs_core_protocol.hpp>  
36 -#include <srs_core_pithy_print.hpp>  
37 -  
38 -#define SRS_PULSE_TIMEOUT_MS 100  
39 -#define SRS_FORWARDER_SLEEP_MS 2000  
40 -#define SRS_SEND_TIMEOUT_US 3000000L  
41 -#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US  
42 -  
43 -SrsForwarder::SrsForwarder()  
44 -{  
45 - client = NULL;  
46 - stfd = NULL;  
47 - stream_id = 0;  
48 -  
49 - tid = NULL;  
50 - loop = false;  
51 -}  
52 -  
53 -SrsForwarder::~SrsForwarder()  
54 -{  
55 - on_unpublish();  
56 -  
57 - std::vector<SrsSharedPtrMessage*>::iterator it;  
58 - for (it = msgs.begin(); it != msgs.end(); ++it) {  
59 - SrsSharedPtrMessage* msg = *it;  
60 - srs_freep(msg);  
61 - }  
62 - msgs.clear();  
63 -}  
64 -  
65 -int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)  
66 -{  
67 - int ret = ERROR_SUCCESS;  
68 -  
69 - app = _app;  
70 -  
71 - tc_url = "rtmp://";  
72 - tc_url += vhost;  
73 - tc_url += "/";  
74 - tc_url += app;  
75 -  
76 - stream_name = stream;  
77 - server = forward_server;  
78 - port = 1935;  
79 -  
80 - // TODO: dead loop check.  
81 -  
82 - size_t pos = forward_server.find(":");  
83 - if (pos != std::string::npos) {  
84 - port = ::atoi(forward_server.substr(pos + 1).c_str());  
85 - server = forward_server.substr(0, pos);  
86 - }  
87 -  
88 - if ((ret = open_socket()) != ERROR_SUCCESS) {  
89 - return ret;  
90 - }  
91 -  
92 - srs_assert(!tid);  
93 - if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){  
94 - ret = ERROR_ST_CREATE_FORWARD_THREAD;  
95 - srs_error("st_thread_create failed. ret=%d", ret);  
96 - return ret;  
97 - }  
98 -  
99 - return ret;  
100 -}  
101 -  
102 -void SrsForwarder::on_unpublish()  
103 -{  
104 - if (tid) {  
105 - loop = false;  
106 - st_thread_interrupt(tid);  
107 - st_thread_join(tid, NULL);  
108 - tid = NULL;  
109 - }  
110 -  
111 - if (stfd) {  
112 - int fd = st_netfd_fileno(stfd);  
113 - st_netfd_close(stfd);  
114 - stfd = NULL;  
115 -  
116 - // st does not close it sometimes,  
117 - // close it manually.  
118 - close(fd);  
119 - }  
120 -  
121 - srs_freep(client);  
122 -}  
123 -  
124 -int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)  
125 -{  
126 - int ret = ERROR_SUCCESS;  
127 -  
128 - msgs.push_back(metadata);  
129 -  
130 - return ret;  
131 -}  
132 -  
133 -int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)  
134 -{  
135 - int ret = ERROR_SUCCESS;  
136 -  
137 - msgs.push_back(msg);  
138 -  
139 - return ret;  
140 -}  
141 -  
142 -int SrsForwarder::on_video(SrsSharedPtrMessage* msg)  
143 -{  
144 - int ret = ERROR_SUCCESS;  
145 -  
146 - msgs.push_back(msg);  
147 -  
148 - return ret;  
149 -}  
150 -  
151 -int SrsForwarder::open_socket()  
152 -{  
153 - int ret = ERROR_SUCCESS;  
154 -  
155 - srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",  
156 - stream_name.c_str(), tc_url.c_str(), server.c_str(), port);  
157 -  
158 - int sock = socket(AF_INET, SOCK_STREAM, 0);  
159 - if(sock == -1){  
160 - ret = ERROR_SOCKET_CREATE;  
161 - srs_error("create socket error. ret=%d", ret);  
162 - return ret;  
163 - }  
164 -  
165 - stfd = st_netfd_open_socket(sock);  
166 - if(stfd == NULL){  
167 - ret = ERROR_ST_OPEN_SOCKET;  
168 - srs_error("st_netfd_open_socket failed. ret=%d", ret);  
169 - return ret;  
170 - }  
171 -  
172 - srs_freep(client);  
173 - client = new SrsRtmpClient(stfd);  
174 -  
175 - return ret;  
176 -}  
177 -  
178 -int SrsForwarder::connect_server()  
179 -{  
180 - int ret = ERROR_SUCCESS;  
181 -  
182 - std::string ip = parse_server(server);  
183 - if (ip.empty()) {  
184 - ret = ERROR_SYSTEM_IP_INVALID;  
185 - srs_error("dns resolve server error, ip empty. ret=%d", ret);  
186 - return ret;  
187 - }  
188 -  
189 - sockaddr_in addr;  
190 - addr.sin_family = AF_INET;  
191 - addr.sin_port = htons(port);  
192 - addr.sin_addr.s_addr = inet_addr(ip.c_str());  
193 -  
194 - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){  
195 - ret = ERROR_ST_CONNECT;  
196 - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);  
197 - return ret;  
198 - }  
199 - srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);  
200 -  
201 - return ret;  
202 -}  
203 -  
204 -std::string SrsForwarder::parse_server(std::string host)  
205 -{  
206 - if (inet_addr(host.c_str()) != INADDR_NONE) {  
207 - return host;  
208 - }  
209 -  
210 - hostent* answer = gethostbyname(host.c_str());  
211 - if (answer == NULL) {  
212 - srs_error("dns resolve host %s error.", host.c_str());  
213 - return "";  
214 - }  
215 -  
216 - char ipv4[16];  
217 - memset(ipv4, 0, sizeof(ipv4));  
218 - for (int i = 0; i < answer->h_length; i++) {  
219 - inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));  
220 - srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);  
221 - break;  
222 - }  
223 -  
224 - return ipv4;  
225 -}  
226 -  
227 -int SrsForwarder::cycle()  
228 -{  
229 - int ret = ERROR_SUCCESS;  
230 -  
231 - client->set_recv_timeout(SRS_RECV_TIMEOUT_US);  
232 - client->set_send_timeout(SRS_SEND_TIMEOUT_US);  
233 -  
234 - if ((ret = connect_server()) != ERROR_SUCCESS) {  
235 - return ret;  
236 - }  
237 - srs_assert(client);  
238 -  
239 - if ((ret = client->handshake()) != ERROR_SUCCESS) {  
240 - srs_error("handshake with server failed. ret=%d", ret);  
241 - return ret;  
242 - }  
243 - if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {  
244 - srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);  
245 - return ret;  
246 - }  
247 - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {  
248 - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);  
249 - return ret;  
250 - }  
251 - if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {  
252 - srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",  
253 - stream_name.c_str(), stream_id, ret);  
254 - return ret;  
255 - }  
256 -  
257 - if ((ret = forward()) != ERROR_SUCCESS) {  
258 - return ret;  
259 - }  
260 -  
261 - return ret;  
262 -}  
263 -  
264 -int SrsForwarder::forward()  
265 -{  
266 - int ret = ERROR_SUCCESS;  
267 -  
268 - client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);  
269 -  
270 - SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);  
271 -  
272 - while (loop) {  
273 - pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);  
274 -  
275 - // switch to other st-threads.  
276 - st_usleep(0);  
277 -  
278 - // read from client.  
279 - if (true) {  
280 - SrsCommonMessage* msg = NULL;  
281 - ret = client->recv_message(&msg);  
282 -  
283 - srs_verbose("play loop recv message. ret=%d", ret);  
284 - if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {  
285 - srs_error("recv server control message failed. ret=%d", ret);  
286 - return ret;  
287 - }  
288 - }  
289 -  
290 - int count = (int)msgs.size();  
291 -  
292 - // reportable  
293 - if (pithy_print.can_print()) {  
294 - srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
295 - pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());  
296 - }  
297 -  
298 - // all msgs to forward.  
299 - for (int i = 0; i < count; i++) {  
300 - SrsSharedPtrMessage* msg = msgs[i];  
301 - msgs[i] = NULL;  
302 -  
303 - if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {  
304 - srs_error("forwarder send message to server failed. ret=%d", ret);  
305 - return ret;  
306 - }  
307 - }  
308 - msgs.clear();  
309 - }  
310 -  
311 - return ret;  
312 -}  
313 -  
314 -void SrsForwarder::forward_cycle()  
315 -{  
316 - int ret = ERROR_SUCCESS;  
317 -  
318 - log_context->generate_id();  
319 - srs_trace("forward cycle start");  
320 -  
321 - while (loop) {  
322 - if ((ret = cycle()) != ERROR_SUCCESS) {  
323 - srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);  
324 - } else {  
325 - srs_info("forward cycle success, retry");  
326 - }  
327 -  
328 - if (!loop) {  
329 - break;  
330 - }  
331 -  
332 - st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);  
333 -  
334 - if ((ret = open_socket()) != ERROR_SUCCESS) {  
335 - srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);  
336 - } else {  
337 - srs_info("forward cycle reopen success");  
338 - }  
339 - }  
340 - srs_trace("forward cycle finished");  
341 -}  
342 -  
343 -void* SrsForwarder::forward_thread(void* arg)  
344 -{  
345 - SrsForwarder* obj = (SrsForwarder*)arg;  
346 - srs_assert(obj != NULL);  
347 -  
348 - obj->loop = true;  
349 - obj->forward_cycle();  
350 -  
351 - return NULL;  
352 -}  
353 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_forward.hpp>
  25 +
  26 +#include <stdlib.h>
  27 +#include <sys/socket.h>
  28 +#include <netinet/in.h>
  29 +#include <arpa/inet.h>
  30 +#include <netdb.h>
  31 +
  32 +#include <srs_core_error.hpp>
  33 +#include <srs_core_rtmp.hpp>
  34 +#include <srs_core_log.hpp>
  35 +#include <srs_core_protocol.hpp>
  36 +#include <srs_core_pithy_print.hpp>
  37 +
  38 +#define SRS_PULSE_TIMEOUT_MS 100
  39 +#define SRS_FORWARDER_SLEEP_MS 2000
  40 +#define SRS_SEND_TIMEOUT_US 3000000L
  41 +#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
  42 +
  43 +SrsForwarder::SrsForwarder()
  44 +{
  45 + client = NULL;
  46 + stfd = NULL;
  47 + stream_id = 0;
  48 +
  49 + tid = NULL;
  50 + loop = false;
  51 +}
  52 +
  53 +SrsForwarder::~SrsForwarder()
  54 +{
  55 + on_unpublish();
  56 +
  57 + std::vector<SrsSharedPtrMessage*>::iterator it;
  58 + for (it = msgs.begin(); it != msgs.end(); ++it) {
  59 + SrsSharedPtrMessage* msg = *it;
  60 + srs_freep(msg);
  61 + }
  62 + msgs.clear();
  63 +}
  64 +
  65 +int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)
  66 +{
  67 + int ret = ERROR_SUCCESS;
  68 +
  69 + app = _app;
  70 +
  71 + tc_url = "rtmp://";
  72 + tc_url += vhost;
  73 + tc_url += "/";
  74 + tc_url += app;
  75 +
  76 + stream_name = stream;
  77 + server = forward_server;
  78 + port = 1935;
  79 +
  80 + // TODO: dead loop check.
  81 +
  82 + size_t pos = forward_server.find(":");
  83 + if (pos != std::string::npos) {
  84 + port = ::atoi(forward_server.substr(pos + 1).c_str());
  85 + server = forward_server.substr(0, pos);
  86 + }
  87 +
  88 + if ((ret = open_socket()) != ERROR_SUCCESS) {
  89 + return ret;
  90 + }
  91 +
  92 + srs_assert(!tid);
  93 + if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){
  94 + ret = ERROR_ST_CREATE_FORWARD_THREAD;
  95 + srs_error("st_thread_create failed. ret=%d", ret);
  96 + return ret;
  97 + }
  98 +
  99 + return ret;
  100 +}
  101 +
  102 +void SrsForwarder::on_unpublish()
  103 +{
  104 + if (tid) {
  105 + loop = false;
  106 + st_thread_interrupt(tid);
  107 + st_thread_join(tid, NULL);
  108 + tid = NULL;
  109 + }
  110 +
  111 + if (stfd) {
  112 + int fd = st_netfd_fileno(stfd);
  113 + st_netfd_close(stfd);
  114 + stfd = NULL;
  115 +
  116 + // st does not close it sometimes,
  117 + // close it manually.
  118 + close(fd);
  119 + }
  120 +
  121 + srs_freep(client);
  122 +}
  123 +
  124 +int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
  125 +{
  126 + int ret = ERROR_SUCCESS;
  127 +
  128 + msgs.push_back(metadata);
  129 +
  130 + return ret;
  131 +}
  132 +
  133 +int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
  134 +{
  135 + int ret = ERROR_SUCCESS;
  136 +
  137 + msgs.push_back(msg);
  138 +
  139 + return ret;
  140 +}
  141 +
  142 +int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
  143 +{
  144 + int ret = ERROR_SUCCESS;
  145 +
  146 + msgs.push_back(msg);
  147 +
  148 + return ret;
  149 +}
  150 +
  151 +int SrsForwarder::open_socket()
  152 +{
  153 + int ret = ERROR_SUCCESS;
  154 +
  155 + srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
  156 + stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
  157 +
  158 + int sock = socket(AF_INET, SOCK_STREAM, 0);
  159 + if(sock == -1){
  160 + ret = ERROR_SOCKET_CREATE;
  161 + srs_error("create socket error. ret=%d", ret);
  162 + return ret;
  163 + }
  164 +
  165 + stfd = st_netfd_open_socket(sock);
  166 + if(stfd == NULL){
  167 + ret = ERROR_ST_OPEN_SOCKET;
  168 + srs_error("st_netfd_open_socket failed. ret=%d", ret);
  169 + return ret;
  170 + }
  171 +
  172 + srs_freep(client);
  173 + client = new SrsRtmpClient(stfd);
  174 +
  175 + return ret;
  176 +}
  177 +
  178 +int SrsForwarder::connect_server()
  179 +{
  180 + int ret = ERROR_SUCCESS;
  181 +
  182 + std::string ip = parse_server(server);
  183 + if (ip.empty()) {
  184 + ret = ERROR_SYSTEM_IP_INVALID;
  185 + srs_error("dns resolve server error, ip empty. ret=%d", ret);
  186 + return ret;
  187 + }
  188 +
  189 + sockaddr_in addr;
  190 + addr.sin_family = AF_INET;
  191 + addr.sin_port = htons(port);
  192 + addr.sin_addr.s_addr = inet_addr(ip.c_str());
  193 +
  194 + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
  195 + ret = ERROR_ST_CONNECT;
  196 + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
  197 + return ret;
  198 + }
  199 + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
  200 +
  201 + return ret;
  202 +}
  203 +
  204 +std::string SrsForwarder::parse_server(std::string host)
  205 +{
  206 + if (inet_addr(host.c_str()) != INADDR_NONE) {
  207 + return host;
  208 + }
  209 +
  210 + hostent* answer = gethostbyname(host.c_str());
  211 + if (answer == NULL) {
  212 + srs_error("dns resolve host %s error.", host.c_str());
  213 + return "";
  214 + }
  215 +
  216 + char ipv4[16];
  217 + memset(ipv4, 0, sizeof(ipv4));
  218 + for (int i = 0; i < answer->h_length; i++) {
  219 + inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));
  220 + srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);
  221 + break;
  222 + }
  223 +
  224 + return ipv4;
  225 +}
  226 +
  227 +int SrsForwarder::cycle()
  228 +{
  229 + int ret = ERROR_SUCCESS;
  230 +
  231 + client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
  232 + client->set_send_timeout(SRS_SEND_TIMEOUT_US);
  233 +
  234 + if ((ret = connect_server()) != ERROR_SUCCESS) {
  235 + return ret;
  236 + }
  237 + srs_assert(client);
  238 +
  239 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  240 + srs_error("handshake with server failed. ret=%d", ret);
  241 + return ret;
  242 + }
  243 + if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {
  244 + srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
  245 + return ret;
  246 + }
  247 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  248 + srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  249 + return ret;
  250 + }
  251 + if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
  252 + srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
  253 + stream_name.c_str(), stream_id, ret);
  254 + return ret;
  255 + }
  256 +
  257 + if ((ret = forward()) != ERROR_SUCCESS) {
  258 + return ret;
  259 + }
  260 +
  261 + return ret;
  262 +}
  263 +
  264 +int SrsForwarder::forward()
  265 +{
  266 + int ret = ERROR_SUCCESS;
  267 +
  268 + client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);
  269 +
  270 + SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
  271 +
  272 + while (loop) {
  273 + pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
  274 +
  275 + // switch to other st-threads.
  276 + st_usleep(0);
  277 +
  278 + // read from client.
  279 + if (true) {
  280 + SrsCommonMessage* msg = NULL;
  281 + ret = client->recv_message(&msg);
  282 +
  283 + srs_verbose("play loop recv message. ret=%d", ret);
  284 + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
  285 + srs_error("recv server control message failed. ret=%d", ret);
  286 + return ret;
  287 + }
  288 + }
  289 +
  290 + int count = (int)msgs.size();
  291 +
  292 + // reportable
  293 + if (pithy_print.can_print()) {
  294 + srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  295 + pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
  296 + }
  297 +
  298 + // all msgs to forward.
  299 + for (int i = 0; i < count; i++) {
  300 + SrsSharedPtrMessage* msg = msgs[i];
  301 + msgs[i] = NULL;
  302 +
  303 + if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {
  304 + srs_error("forwarder send message to server failed. ret=%d", ret);
  305 + return ret;
  306 + }
  307 + }
  308 + msgs.clear();
  309 + }
  310 +
  311 + return ret;
  312 +}
  313 +
  314 +void SrsForwarder::forward_cycle()
  315 +{
  316 + int ret = ERROR_SUCCESS;
  317 +
  318 + log_context->generate_id();
  319 + srs_trace("forward cycle start");
  320 +
  321 + while (loop) {
  322 + if ((ret = cycle()) != ERROR_SUCCESS) {
  323 + srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);
  324 + } else {
  325 + srs_info("forward cycle success, retry");
  326 + }
  327 +
  328 + if (!loop) {
  329 + break;
  330 + }
  331 +
  332 + st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);
  333 +
  334 + if ((ret = open_socket()) != ERROR_SUCCESS) {
  335 + srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);
  336 + } else {
  337 + srs_info("forward cycle reopen success");
  338 + }
  339 + }
  340 + srs_trace("forward cycle finished");
  341 +}
  342 +
  343 +void* SrsForwarder::forward_thread(void* arg)
  344 +{
  345 + SrsForwarder* obj = (SrsForwarder*)arg;
  346 + srs_assert(obj != NULL);
  347 +
  348 + obj->loop = true;
  349 + obj->forward_cycle();
  350 +
  351 + return NULL;
  352 +}
  353 +