winlin

Merge branch '2.0release' into develop

@@ -566,6 +566,7 @@ Supported operating systems and hardware: @@ -566,6 +566,7 @@ Supported operating systems and hardware:
566 566
567 ### SRS 2.0 history 567 ### SRS 2.0 history
568 568
  569 +* v2.0, 2015-04-20, support ingest hls live stream to RTMP.
569 * v2.0, 2015-04-15, for [#383](https://github.com/winlinvip/simple-rtmp-server/issues/383), support mix_correct algorithm. 2.0.161. 570 * v2.0, 2015-04-15, for [#383](https://github.com/winlinvip/simple-rtmp-server/issues/383), support mix_correct algorithm. 2.0.161.
570 * v2.0, 2015-04-13, for [#381](https://github.com/winlinvip/simple-rtmp-server/issues/381), support reap hls/ts by gop or not. 2.0.160. 571 * v2.0, 2015-04-13, for [#381](https://github.com/winlinvip/simple-rtmp-server/issues/381), support reap hls/ts by gop or not. 2.0.160.
571 * v2.0, 2015-04-10, enhanced on_hls_notify, support HTTP GET when reap ts. 572 * v2.0, 2015-04-10, enhanced on_hls_notify, support HTTP GET when reap ts.
@@ -57,8 +57,6 @@ echo -e " | ${SrsGprofSummaryColor}rm -f gmon.out; ./objs/srs -c conf/co @@ -57,8 +57,6 @@ echo -e " | ${SrsGprofSummaryColor}rm -f gmon.out; ./objs/srs -c conf/co
57 echo -e " | ${SrsGprofSummaryColor}killall -2 srs # or CTRL+C to stop gprof\${BLACK}" 57 echo -e " | ${SrsGprofSummaryColor}killall -2 srs # or CTRL+C to stop gprof\${BLACK}"
58 echo -e " | ${SrsGprofSummaryColor}gprof -b ./objs/srs gmon.out > gprof.srs.log && rm -f gmon.out # gprof report to gprof.srs.log\${BLACK}" 58 echo -e " | ${SrsGprofSummaryColor}gprof -b ./objs/srs gmon.out > gprof.srs.log && rm -f gmon.out # gprof report to gprof.srs.log\${BLACK}"
59 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" 59 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
60 -echo -e " |${SrsResearchSummaryColor}research: ./objs/research, api server, players, ts info, librtmp.\${BLACK}"  
61 -echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"  
62 echo -e " |${SrsUtestSummaryColor}utest: ./objs/srs_utest, the utest for srs\${BLACK}" 60 echo -e " |${SrsUtestSummaryColor}utest: ./objs/srs_utest, the utest for srs\${BLACK}"
63 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" 61 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
64 echo -e " |${SrsLibrtmpSummaryColor}librtmp @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v1_CN_SrsLibrtmp\${BLACK}" 62 echo -e " |${SrsLibrtmpSummaryColor}librtmp @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v1_CN_SrsLibrtmp\${BLACK}"
@@ -71,6 +69,12 @@ echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/ @@ -71,6 +69,12 @@ echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/
71 echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_detect_rtmp\${BLACK}" 69 echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_detect_rtmp\${BLACK}"
72 echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_bandwidth_check\${BLACK}" 70 echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_bandwidth_check\${BLACK}"
73 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" 71 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
  72 +echo -e " |${SrsResearchSummaryColor}research: ./objs/research, api server, players, ts info, librtmp.\${BLACK}"
  73 +echo -e " | ${SrsResearchSummaryColor} @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}"
  74 +echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
  75 +echo -e " |\${GREEN}tools: important tool, others @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}"
  76 +echo -e " | \${GREEN}./objs/srs_ingest_hls -i http://ossrs.net/live/livestream.m3u8 -y rtmp://127.0.0.1/live/livestream\${BLACK}"
  77 +echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
74 echo -e " |\${GREEN}server: ./objs/srs -c conf/srs.conf, start the srs server\${BLACK}" 78 echo -e " |\${GREEN}server: ./objs/srs -c conf/srs.conf, start the srs server\${BLACK}"
75 echo -e " | ${SrsHlsSummaryColor}hls @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DeliveryHLS\${BLACK}" 79 echo -e " | ${SrsHlsSummaryColor}hls @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DeliveryHLS\${BLACK}"
76 echo -e " | ${SrsHlsSummaryColor}hls: generate m3u8 and ts from rtmp stream\${BLACK}" 80 echo -e " | ${SrsHlsSummaryColor}hls: generate m3u8 and ts from rtmp stream\${BLACK}"
@@ -100,7 +100,7 @@ AR = ar @@ -100,7 +100,7 @@ AR = ar
100 LINK = g++ 100 LINK = g++
101 CXXFLAGS = ${CXXFLAGS} 101 CXXFLAGS = ${CXXFLAGS}
102 102
103 -.PHONY: default srs librtmp 103 +.PHONY: default srs srs_ingest_hls librtmp
104 104
105 default: 105 default:
106 106
@@ -200,7 +200,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then @@ -200,7 +200,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
200 MODULE_ID="MAIN" 200 MODULE_ID="MAIN"
201 MODULE_DEPENDS=("CORE" "KERNEL" "RTMP" "APP") 201 MODULE_DEPENDS=("CORE" "KERNEL" "RTMP" "APP")
202 ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibHttpParserRoot}) 202 ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibHttpParserRoot})
203 - MODULE_FILES=("srs_main_server") 203 + MODULE_FILES=("srs_main_server" "srs_main_ingest_hls")
204 # add each modules for main 204 # add each modules for main
205 for SRS_MODULE in ${SRS_MODULES[*]}; do 205 for SRS_MODULE in ${SRS_MODULES[*]}; do
206 . $SRS_MODULE/config 206 . $SRS_MODULE/config
@@ -217,7 +217,7 @@ fi @@ -217,7 +217,7 @@ fi
217 # disable all app when export librtmp 217 # disable all app when export librtmp
218 if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then 218 if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
219 # all main entrances 219 # all main entrances
220 - MAIN_ENTRANCES=("srs_main_server") 220 + MAIN_ENTRANCES=("srs_main_server" "srs_main_ingest_hls")
221 # add each modules for main 221 # add each modules for main
222 for SRS_MODULE in ${SRS_MODULES[*]}; do 222 for SRS_MODULE in ${SRS_MODULES[*]}; do
223 . $SRS_MODULE/config 223 . $SRS_MODULE/config
@@ -232,6 +232,9 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then @@ -232,6 +232,9 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
232 # 232 #
233 # srs: srs(simple rtmp server) over st(state-threads) 233 # srs: srs(simple rtmp server) over st(state-threads)
234 BUILD_KEY="srs" APP_MAIN="srs_main_server" APP_NAME="srs" . auto/apps.sh 234 BUILD_KEY="srs" APP_MAIN="srs_main_server" APP_NAME="srs" . auto/apps.sh
  235 + #
  236 + # srs_ingest_hls: to ingest hls stream to srs.
  237 + BUILD_KEY="srs_ingest_hls" APP_MAIN="srs_main_ingest_hls" APP_NAME="srs_ingest_hls" . auto/apps.sh
235 # add each modules for application 238 # add each modules for application
236 for SRS_MODULE in ${SRS_MODULES[*]}; do 239 for SRS_MODULE in ${SRS_MODULES[*]}; do
237 . $SRS_MODULE/config 240 . $SRS_MODULE/config
@@ -272,7 +275,7 @@ mv ${SRS_WORKDIR}/${SRS_MAKEFILE} ${SRS_WORKDIR}/${SRS_MAKEFILE}.bk @@ -272,7 +275,7 @@ mv ${SRS_WORKDIR}/${SRS_MAKEFILE} ${SRS_WORKDIR}/${SRS_MAKEFILE}.bk
272 275
273 # generate phony header 276 # generate phony header
274 cat << END > ${SRS_WORKDIR}/${SRS_MAKEFILE} 277 cat << END > ${SRS_WORKDIR}/${SRS_MAKEFILE}
275 -.PHONY: default _default install install-api help clean server librtmp utest _prepare_dir $__mphonys 278 +.PHONY: default _default install install-api help clean server srs_ingest_hls librtmp utest _prepare_dir $__mphonys
276 279
277 # install prefix. 280 # install prefix.
278 SRS_PREFIX=${SRS_PREFIX} 281 SRS_PREFIX=${SRS_PREFIX}
@@ -300,14 +303,15 @@ fi @@ -300,14 +303,15 @@ fi
300 # the server, librtmp and utest 303 # the server, librtmp and utest
301 # where the bellow will check and disable some entry by only echo. 304 # where the bellow will check and disable some entry by only echo.
302 cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE} 305 cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE}
303 -_default: server librtmp utest $__mdefaults 306 +_default: server srs_ingest_hls librtmp utest $__mdefaults
304 @bash objs/_srs_build_summary.sh 307 @bash objs/_srs_build_summary.sh
305 308
306 help: 309 help:
307 - @echo "Usage: make <help>|<clean>|<server>|<librtmp>|<utest>|<install>|<install-api>|<uninstall>" 310 + @echo "Usage: make <help>|<clean>|<server>|<srs_ingest_hls>|<librtmp>|<utest>|<install>|<install-api>|<uninstall>"
308 @echo " help display this help menu" 311 @echo " help display this help menu"
309 @echo " clean cleanup project" 312 @echo " clean cleanup project"
310 @echo " server build the srs(simple rtmp server) over st(state-threads)" 313 @echo " server build the srs(simple rtmp server) over st(state-threads)"
  314 + @echo " srs_ingest_hls build the hls ingest tool of srs."
311 @echo " librtmp build the client publish/play library, and samples" 315 @echo " librtmp build the client publish/play library, and samples"
312 @echo " utest build the utest for srs" 316 @echo " utest build the utest for srs"
313 @echo " install install srs to the prefix path" 317 @echo " install install srs to the prefix path"
@@ -332,6 +336,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT != NO ]; then @@ -332,6 +336,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT != NO ]; then
332 cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE} 336 cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE}
333 server: _prepare_dir 337 server: _prepare_dir
334 @echo "donot build the srs(simple rtmp server) for srs-librtmp" 338 @echo "donot build the srs(simple rtmp server) for srs-librtmp"
  339 +srs_ingest_hls: _prepare_dir
  340 + @echo "donot build the srs_ingest_hls for srs-librtmp"
335 341
336 END 342 END
337 else 343 else
@@ -339,6 +345,9 @@ else @@ -339,6 +345,9 @@ else
339 server: _prepare_dir 345 server: _prepare_dir
340 @echo "build the srs(simple rtmp server) over st(state-threads)" 346 @echo "build the srs(simple rtmp server) over st(state-threads)"
341 \$(MAKE) -f ${SRS_OBJS_DIR}/${SRS_MAKEFILE} srs 347 \$(MAKE) -f ${SRS_OBJS_DIR}/${SRS_MAKEFILE} srs
  348 +srs_ingest_hls: _prepare_dir
  349 + @echo "build the srs_ingest_hls for srs"
  350 + \$(MAKE) -f ${SRS_OBJS_DIR}/${SRS_MAKEFILE} srs_ingest_hls
342 351
343 END 352 END
344 fi 353 fi
1 file 1 file
2 main readonly separator, 2 main readonly separator,
3 ../../src/main/srs_main_server.cpp, 3 ../../src/main/srs_main_server.cpp,
  4 + ../../src/main/srs_main_ingest_hls.cpp,
4 auto readonly separator, 5 auto readonly separator,
5 ../../objs/srs_auto_headers.hpp, 6 ../../objs/srs_auto_headers.hpp,
6 libs readonly separator, 7 libs readonly separator,
@@ -105,6 +105,7 @@ @@ -105,6 +105,7 @@
105 3CC52DDD1ACE4023006FEB01 /* srs_utest_reload.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD41ACE4023006FEB01 /* srs_utest_reload.cpp */; }; 105 3CC52DDD1ACE4023006FEB01 /* srs_utest_reload.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD41ACE4023006FEB01 /* srs_utest_reload.cpp */; };
106 3CC52DDE1ACE4023006FEB01 /* srs_utest.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD61ACE4023006FEB01 /* srs_utest.cpp */; }; 106 3CC52DDE1ACE4023006FEB01 /* srs_utest.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD61ACE4023006FEB01 /* srs_utest.cpp */; };
107 3CD88B3F1ACA9C58000359E0 /* srs_app_async_call.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */; }; 107 3CD88B3F1ACA9C58000359E0 /* srs_app_async_call.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */; };
  108 + 3CE6CD311AE4AFB800706E07 /* srs_main_ingest_hls.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */; };
108 /* End PBXBuildFile section */ 109 /* End PBXBuildFile section */
109 110
110 /* Begin PBXCopyFilesBuildPhase section */ 111 /* Begin PBXCopyFilesBuildPhase section */
@@ -361,6 +362,7 @@ @@ -361,6 +362,7 @@
361 3CC52DD71ACE4023006FEB01 /* srs_utest.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_utest.hpp; path = ../../src/utest/srs_utest.hpp; sourceTree = "<group>"; }; 362 3CC52DD71ACE4023006FEB01 /* srs_utest.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_utest.hpp; path = ../../src/utest/srs_utest.hpp; sourceTree = "<group>"; };
362 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_async_call.cpp; path = ../../../src/app/srs_app_async_call.cpp; sourceTree = "<group>"; }; 363 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_async_call.cpp; path = ../../../src/app/srs_app_async_call.cpp; sourceTree = "<group>"; };
363 3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_async_call.hpp; path = ../../../src/app/srs_app_async_call.hpp; sourceTree = "<group>"; }; 364 3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_async_call.hpp; path = ../../../src/app/srs_app_async_call.hpp; sourceTree = "<group>"; };
  365 + 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_main_ingest_hls.cpp; path = ../../../src/main/srs_main_ingest_hls.cpp; sourceTree = "<group>"; };
364 /* End PBXFileReference section */ 366 /* End PBXFileReference section */
365 367
366 /* Begin PBXFrameworksBuildPhase section */ 368 /* Begin PBXFrameworksBuildPhase section */
@@ -442,6 +444,7 @@ @@ -442,6 +444,7 @@
442 3C1232041AAE80CB00CE8F6C /* main */ = { 444 3C1232041AAE80CB00CE8F6C /* main */ = {
443 isa = PBXGroup; 445 isa = PBXGroup;
444 children = ( 446 children = (
  447 + 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */,
445 3C1232051AAE812C00CE8F6C /* srs_main_server.cpp */, 448 3C1232051AAE812C00CE8F6C /* srs_main_server.cpp */,
446 ); 449 );
447 name = main; 450 name = main;
@@ -904,6 +907,7 @@ @@ -904,6 +907,7 @@
904 3C1232A71AAE81D900CE8F6C /* srs_app_listener.cpp in Sources */, 907 3C1232A71AAE81D900CE8F6C /* srs_app_listener.cpp in Sources */,
905 3C1232261AAE814D00CE8F6C /* srs_kernel_flv.cpp in Sources */, 908 3C1232261AAE814D00CE8F6C /* srs_kernel_flv.cpp in Sources */,
906 3C663F1A1AB0155100286D8B /* srs_rtmp_dump.c in Sources */, 909 3C663F1A1AB0155100286D8B /* srs_rtmp_dump.c in Sources */,
  910 + 3CE6CD311AE4AFB800706E07 /* srs_main_ingest_hls.cpp in Sources */,
907 3C1232241AAE814D00CE8F6C /* srs_kernel_error.cpp in Sources */, 911 3C1232241AAE814D00CE8F6C /* srs_kernel_error.cpp in Sources */,
908 3C1232441AAE81A400CE8F6C /* srs_rtmp_handshake.cpp in Sources */, 912 3C1232441AAE81A400CE8F6C /* srs_rtmp_handshake.cpp in Sources */,
909 3C1232291AAE814D00CE8F6C /* srs_kernel_stream.cpp in Sources */, 913 3C1232291AAE814D00CE8F6C /* srs_kernel_stream.cpp in Sources */,
@@ -257,7 +257,7 @@ void re_update(int64_t re, int32_t starttime, u_int32_t time) @@ -257,7 +257,7 @@ void re_update(int64_t re, int32_t starttime, u_int32_t time)
257 int64_t now = srs_utils_time_ms(); 257 int64_t now = srs_utils_time_ms();
258 int64_t diff = time - starttime - (now -re); 258 int64_t diff = time - starttime - (now -re);
259 if (diff > RE_PULSE_MS) { 259 if (diff > RE_PULSE_MS) {
260 - usleep(diff * 1000); 260 + usleep((useconds_t)(diff * 1000));
261 } 261 }
262 } 262 }
263 void re_cleanup(int64_t re, int32_t starttime, u_int32_t time) 263 void re_cleanup(int64_t re, int32_t starttime, u_int32_t time)
@@ -269,6 +269,6 @@ void re_cleanup(int64_t re, int32_t starttime, u_int32_t time) @@ -269,6 +269,6 @@ void re_cleanup(int64_t re, int32_t starttime, u_int32_t time)
269 if (diff > 0) { 269 if (diff > 0) {
270 srs_human_trace("re_cleanup, diff=%d, start=%d, last=%d ms", 270 srs_human_trace("re_cleanup, diff=%d, start=%d, last=%d ms",
271 (int)diff, starttime, time); 271 (int)diff, starttime, time);
272 - usleep(diff * 1000); 272 + usleep((useconds_t)(diff * 1000));
273 } 273 }
274 } 274 }
@@ -330,7 +330,8 @@ int SrsFFMPEG::start() @@ -330,7 +330,8 @@ int SrsFFMPEG::start()
330 } 330 }
331 331
332 // the codec params is disabled when copy 332 // the codec params is disabled when copy
333 - if (acodec != SRS_RTMP_ENCODER_COPY && acodec != SRS_RTMP_ENCODER_NO_AUDIO) { 333 + if (acodec != SRS_RTMP_ENCODER_NO_AUDIO) {
  334 + if (acodec != SRS_RTMP_ENCODER_COPY) {
334 params.push_back("-b:a"); 335 params.push_back("-b:a");
335 snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000); 336 snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);
336 params.push_back(tmp); 337 params.push_back(tmp);
@@ -344,7 +345,6 @@ int SrsFFMPEG::start() @@ -344,7 +345,6 @@ int SrsFFMPEG::start()
344 params.push_back(tmp); 345 params.push_back(tmp);
345 346
346 // aparams 347 // aparams
347 - if (!aparams.empty()) {  
348 std::vector<std::string>::iterator it; 348 std::vector<std::string>::iterator it;
349 for (it = aparams.begin(); it != aparams.end(); ++it) { 349 for (it = aparams.begin(); it != aparams.end(); ++it) {
350 std::string p = *it; 350 std::string p = *it;
@@ -352,6 +352,20 @@ int SrsFFMPEG::start() @@ -352,6 +352,20 @@ int SrsFFMPEG::start()
352 params.push_back(p); 352 params.push_back(p);
353 } 353 }
354 } 354 }
  355 + } else {
  356 + // for audio copy.
  357 + for (int i = 0; i < (int)aparams.size();) {
  358 + std::string pn = aparams[i++];
  359 +
  360 + // aparams, the adts to asc filter "-bsf:a aac_adtstoasc"
  361 + if (pn == "-bsf:a" && i < (int)aparams.size()) {
  362 + std::string pv = aparams[i++];
  363 + if (pv == "aac_adtstoasc") {
  364 + params.push_back(pn);
  365 + params.push_back(pv);
  366 + }
  367 + }
  368 + }
355 } 369 }
356 } 370 }
357 371
@@ -1401,7 +1401,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) @@ -1401,7 +1401,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg)
1401 header = http_parser(); 1401 header = http_parser();
1402 url = ""; 1402 url = "";
1403 headers.clear(); 1403 headers.clear();
1404 - body_parsed = 0; 1404 + header_parsed = 0;
1405 1405
1406 // do parse 1406 // do parse
1407 if ((ret = parse_message_imp(skt)) != ERROR_SUCCESS) { 1407 if ((ret = parse_message_imp(skt)) != ERROR_SUCCESS) {
@@ -1437,12 +1437,12 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt) @@ -1437,12 +1437,12 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt)
1437 // when buffer not empty, parse it. 1437 // when buffer not empty, parse it.
1438 if (buffer->size() > 0) { 1438 if (buffer->size() > 0) {
1439 nparsed = http_parser_execute(&parser, &settings, buffer->bytes(), buffer->size()); 1439 nparsed = http_parser_execute(&parser, &settings, buffer->bytes(), buffer->size());
1440 - srs_info("buffer=%d, nparsed=%d, body=%d", buffer->size(), (int)nparsed, body_parsed); 1440 + srs_info("buffer=%d, nparsed=%d, header=%d", buffer->size(), (int)nparsed, header_parsed);
1441 } 1441 }
1442 1442
1443 // consume the parsed bytes. 1443 // consume the parsed bytes.
1444 - if (nparsed && nparsed - body_parsed > 0) {  
1445 - buffer->read_slice((int)nparsed - (int)body_parsed); 1444 + if (nparsed && header_parsed) {
  1445 + buffer->read_slice(header_parsed);
1446 } 1446 }
1447 1447
1448 // ok atleast header completed, 1448 // ok atleast header completed,
@@ -1491,6 +1491,7 @@ int SrsHttpParser::on_headers_complete(http_parser* parser) @@ -1491,6 +1491,7 @@ int SrsHttpParser::on_headers_complete(http_parser* parser)
1491 obj->header = *parser; 1491 obj->header = *parser;
1492 // save the parser when header parse completed. 1492 // save the parser when header parse completed.
1493 obj->state = SrsHttpParseStateHeaderComplete; 1493 obj->state = SrsHttpParseStateHeaderComplete;
  1494 + obj->header_parsed = (int)parser->nread;
1494 1495
1495 srs_info("***HEADERS COMPLETE***"); 1496 srs_info("***HEADERS COMPLETE***");
1496 1497
@@ -1567,8 +1568,6 @@ int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length) @@ -1567,8 +1568,6 @@ int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length)
1567 SrsHttpParser* obj = (SrsHttpParser*)parser->data; 1568 SrsHttpParser* obj = (SrsHttpParser*)parser->data;
1568 srs_assert(obj); 1569 srs_assert(obj);
1569 1570
1570 - obj->body_parsed += length;  
1571 -  
1572 srs_info("Body: %.*s", (int)length, at); 1571 srs_info("Body: %.*s", (int)length, at);
1573 1572
1574 return 0; 1573 return 0;
@@ -599,7 +599,7 @@ private: @@ -599,7 +599,7 @@ private:
599 http_parser header; 599 http_parser header;
600 std::string url; 600 std::string url;
601 std::vector<SrsHttpHeaderField> headers; 601 std::vector<SrsHttpHeaderField> headers;
602 - int body_parsed; 602 + int header_parsed;
603 public: 603 public:
604 SrsHttpParser(); 604 SrsHttpParser();
605 virtual ~SrsHttpParser(); 605 virtual ~SrsHttpParser();
@@ -274,7 +274,7 @@ bool SrsFastLog::generate_header(bool error, const char* tag, int context_id, co @@ -274,7 +274,7 @@ bool SrsFastLog::generate_header(bool error, const char* tag, int context_id, co
274 274
275 // to calendar time 275 // to calendar time
276 struct tm* tm; 276 struct tm* tm;
277 - if (_srs_config->get_utc_time()) { 277 + if (_srs_config && _srs_config->get_utc_time()) {
278 if ((tm = gmtime(&tv.tv_sec)) == NULL) { 278 if ((tm = gmtime(&tv.tv_sec)) == NULL) {
279 return false; 279 return false;
280 } 280 }
@@ -169,6 +169,23 @@ int SrsTsMessage::stream_number() @@ -169,6 +169,23 @@ int SrsTsMessage::stream_number()
169 return -1; 169 return -1;
170 } 170 }
171 171
  172 +SrsTsMessage* SrsTsMessage::detach()
  173 +{
  174 + // @remark the packet cannot be used, but channel is ok.
  175 + SrsTsMessage* cp = new SrsTsMessage(channel, NULL);
  176 + cp->start_pts = start_pts;
  177 + cp->write_pcr = write_pcr;
  178 + cp->is_discontinuity = is_discontinuity;
  179 + cp->dts = dts;
  180 + cp->pts = pts;
  181 + cp->sid = sid;
  182 + cp->PES_packet_length = PES_packet_length;
  183 + cp->continuity_counter = continuity_counter;
  184 + cp->payload = payload;
  185 + payload = NULL;
  186 + return cp;
  187 +}
  188 +
172 ISrsTsHandler::ISrsTsHandler() 189 ISrsTsHandler::ISrsTsHandler()
173 { 190 {
174 } 191 }
@@ -309,6 +309,13 @@ public: @@ -309,6 +309,13 @@ public:
309 * @return the stream number for audio/video; otherwise, -1. 309 * @return the stream number for audio/video; otherwise, -1.
310 */ 310 */
311 virtual int stream_number(); 311 virtual int stream_number();
  312 +public:
  313 + /**
  314 + * detach the ts message,
  315 + * for user maybe need to parse the message by queue.
  316 + * @remark we always use the payload of original message.
  317 + */
  318 + virtual SrsTsMessage* detach();
312 }; 319 };
313 320
314 /** 321 /**
  1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013-2015 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core.hpp>
  25 +
  26 +#include <stdlib.h>
  27 +#include <string>
  28 +#include <vector>
  29 +#include <map>
  30 +using namespace std;
  31 +
  32 +#include <srs_kernel_error.hpp>
  33 +#include <srs_app_server.hpp>
  34 +#include <srs_app_config.hpp>
  35 +#include <srs_app_log.hpp>
  36 +#include <srs_kernel_utility.hpp>
  37 +#include <srs_rtmp_sdk.hpp>
  38 +#include <srs_kernel_buffer.hpp>
  39 +#include <srs_kernel_stream.hpp>
  40 +#include <srs_kernel_ts.hpp>
  41 +#include <srs_app_http_client.hpp>
  42 +#include <srs_app_http.hpp>
  43 +#include <srs_core_autofree.hpp>
  44 +#include <srs_app_st.hpp>
  45 +#include <srs_rtmp_utility.hpp>
  46 +#include <srs_app_st_socket.hpp>
  47 +#include <srs_app_utility.hpp>
  48 +#include <srs_rtmp_amf0.hpp>
  49 +#include <srs_raw_avc.hpp>
  50 +
  51 +// the retry timeout in ms.
  52 +#define SRS_INGEST_HLS_ERROR_RETRY_MS 3000
  53 +
  54 +// pre-declare
  55 +int proxy_hls2rtmp(std::string hls, std::string rtmp);
  56 +
  57 +// for the main objects(server, config, log, context),
  58 +// never subscribe handler in constructor,
  59 +// instead, subscribe handler in initialize method.
  60 +// kernel module.
  61 +ISrsLog* _srs_log = new SrsFastLog();
  62 +ISrsThreadContext* _srs_context = new ISrsThreadContext();
  63 +// app module.
  64 +SrsConfig* _srs_config = NULL;
  65 +SrsServer* _srs_server = NULL;
  66 +
  67 +/**
  68 +* main entrance.
  69 +*/
  70 +int main(int argc, char** argv)
  71 +{
  72 + // TODO: support both little and big endian.
  73 + srs_assert(srs_is_little_endian());
  74 +
  75 + // directly failed when compile limited.
  76 +#if !defined(SRS_AUTO_HTTP_PARSER)
  77 + srs_error("depends on http-parser.");
  78 + exit(-1);
  79 +#endif
  80 +
  81 +#if defined(SRS_AUTO_GPERF_MP) || defined(SRS_AUTO_GPERF_MP) \
  82 +|| defined(SRS_AUTO_GPERF_MC) || defined(SRS_AUTO_GPERF_MP)
  83 + srs_error("donot support gmc/gmp/gcp/gprof");
  84 + exit(-1);
  85 +#endif
  86 +
  87 + srs_trace("srs_ingest_hls base on %s, to ingest hls live to srs", RTMP_SIG_SRS_SERVER);
  88 +
  89 + // parse user options.
  90 + std::string in_hls_url, out_rtmp_url;
  91 + for (int opt = 0; opt < argc; opt++) {
  92 + srs_trace("argv[%d]=%s", opt, argv[opt]);
  93 + }
  94 +
  95 + // fill the options for mac
  96 + for (int opt = 0; opt < argc - 1; opt++) {
  97 + // ignore all options except -i and -y.
  98 + char* p = argv[opt];
  99 +
  100 + // only accept -x
  101 + if (p[0] != '-' || p[1] == 0 || p[2] != 0) {
  102 + continue;
  103 + }
  104 +
  105 + // parse according the option name.
  106 + switch (p[1]) {
  107 + case 'i': in_hls_url = argv[opt + 1]; break;
  108 + case 'y': out_rtmp_url = argv[opt + 1]; break;
  109 + default: break;
  110 + }
  111 + }
  112 +
  113 + if (in_hls_url.empty() || out_rtmp_url.empty()) {
  114 + printf("ingest hls live stream and publish to RTMP server\n"
  115 + "Usage: %s <-i in_hls_url> <-y out_rtmp_url>\n"
  116 + " in_hls_url input hls url, ingest from this m3u8.\n"
  117 + " out_rtmp_url output rtmp url, publish to this url.\n"
  118 + "For example:\n"
  119 + " %s -i http://127.0.0.1:8080/live/livestream.m3u8 -y rtmp://127.0.0.1/live/ingest_hls\n"
  120 + " %s -i http://ossrs.net/live/livestream.m3u8 -y rtmp://127.0.0.1/live/ingest_hls\n",
  121 + argv[0], argv[0], argv[0]);
  122 + exit(-1);
  123 + }
  124 +
  125 + srs_trace("input: %s", in_hls_url.c_str());
  126 + srs_trace("output: %s", out_rtmp_url.c_str());
  127 +
  128 + return proxy_hls2rtmp(in_hls_url, out_rtmp_url);
  129 +}
  130 +
  131 +// the context to ingest hls stream.
  132 +class SrsIngestSrsInput
  133 +{
  134 +private:
  135 + struct SrsTsPiece {
  136 + double duration;
  137 + std::string url;
  138 + std::string body;
  139 +
  140 + // should skip this ts?
  141 + bool skip;
  142 + // already sent to rtmp server?
  143 + bool sent;
  144 + // whether ts piece is dirty, remove if not update.
  145 + bool dirty;
  146 +
  147 + SrsTsPiece() {
  148 + skip = false;
  149 + sent = false;
  150 + dirty = false;
  151 + }
  152 +
  153 + int fetch(std::string m3u8);
  154 + };
  155 +private:
  156 + SrsHttpUri* in_hls;
  157 + std::vector<SrsTsPiece*> pieces;
  158 + int64_t next_connect_time;
  159 +private:
  160 + SrsStream* stream;
  161 + SrsTsContext* context;
  162 +public:
  163 + SrsIngestSrsInput(SrsHttpUri* hls) {
  164 + in_hls = hls;
  165 + next_connect_time = 0;
  166 +
  167 + stream = new SrsStream();
  168 + context = new SrsTsContext();
  169 + }
  170 + virtual ~SrsIngestSrsInput() {
  171 + srs_freep(stream);
  172 + srs_freep(context);
  173 +
  174 + std::vector<SrsTsPiece*>::iterator it;
  175 + for (it = pieces.begin(); it != pieces.end(); ++it) {
  176 + SrsTsPiece* tp = *it;
  177 + srs_freep(tp);
  178 + }
  179 + pieces.clear();
  180 + }
  181 + /**
  182 + * parse the input hls live m3u8 index.
  183 + */
  184 + virtual int connect();
  185 + /**
  186 + * parse the ts and use hanler to process the message.
  187 + */
  188 + virtual int parse(ISrsTsHandler* handler);
  189 +private:
  190 + /**
  191 + * find the ts piece by its url.
  192 + */
  193 + virtual SrsTsPiece* find_ts(string url);
  194 + /**
  195 + * set all ts to dirty.
  196 + */
  197 + virtual void dirty_all_ts();
  198 + /**
  199 + * fetch all ts body.
  200 + */
  201 + virtual void fetch_all_ts(bool fresh_m3u8);
  202 + /**
  203 + * remove all ts which is dirty.
  204 + */
  205 + virtual void remove_dirty();
  206 +};
  207 +
  208 +int SrsIngestSrsInput::connect()
  209 +{
  210 + int ret = ERROR_SUCCESS;
  211 +
  212 + int64_t now = srs_update_system_time_ms();
  213 + if (now < next_connect_time) {
  214 + srs_trace("input hls wait for %dms", next_connect_time - now);
  215 + st_usleep((next_connect_time - now) * 1000);
  216 + }
  217 +
  218 + SrsHttpClient client;
  219 + srs_trace("parse input hls %s", in_hls->get_url());
  220 +
  221 + if ((ret = client.initialize(in_hls->get_host(), in_hls->get_port())) != ERROR_SUCCESS) {
  222 + srs_error("connect to server failed. ret=%d", ret);
  223 + return ret;
  224 + }
  225 +
  226 + SrsHttpMessage* msg = NULL;
  227 + if ((ret = client.get(in_hls->get_path(), "", &msg)) != ERROR_SUCCESS) {
  228 + srs_error("HTTP GET %s failed. ret=%d", in_hls->get_url(), ret);
  229 + return ret;
  230 + }
  231 +
  232 + srs_assert(msg);
  233 + SrsAutoFree(SrsHttpMessage, msg);
  234 +
  235 + std::string body;
  236 + if ((ret = msg->body_read_all(body)) != ERROR_SUCCESS) {
  237 + srs_error("read m3u8 failed. ret=%d", ret);
  238 + return ret;
  239 + }
  240 +
  241 + if (body.empty()) {
  242 + srs_warn("ignore empty m3u8");
  243 + return ret;
  244 + }
  245 +
  246 + // set all ts to dirty.
  247 + dirty_all_ts();
  248 +
  249 + std::string ptl;
  250 + double td = 0.0;
  251 + double duration = 0.0;
  252 + bool fresh_m3u8 = pieces.empty();
  253 + while (!body.empty()) {
  254 + size_t pos = string::npos;
  255 +
  256 + std::string line;
  257 + if ((pos = body.find("\n")) != string::npos) {
  258 + line = body.substr(0, pos);
  259 + body = body.substr(pos + 1);
  260 + } else {
  261 + line = body;
  262 + body = "";
  263 + }
  264 +
  265 + line = srs_string_replace(line, "\r", "");
  266 + line = srs_string_replace(line, " ", "");
  267 +
  268 + // #EXT-X-VERSION:3
  269 + // the version must be 3.0
  270 + if (srs_string_starts_with(line, "#EXT-X-VERSION:")) {
  271 + if (!srs_string_ends_with(line, ":3")) {
  272 + srs_warn("m3u8 3.0 required, actual is %s", line.c_str());
  273 + }
  274 + continue;
  275 + }
  276 +
  277 + // #EXT-X-PLAYLIST-TYPE:VOD
  278 + // the playlist type, vod or nothing.
  279 + if (srs_string_starts_with(line, "#EXT-X-PLAYLIST-TYPE:")) {
  280 + ptl = line;
  281 + continue;
  282 + }
  283 +
  284 + // #EXT-X-TARGETDURATION:12
  285 + // the target duration is required.
  286 + if (srs_string_starts_with(line, "#EXT-X-TARGETDURATION:")) {
  287 + td = ::atof(line.substr(string("#EXT-X-TARGETDURATION:").length()).c_str());
  288 + }
  289 +
  290 + // #EXT-X-ENDLIST
  291 + // parse completed.
  292 + if (line == "#EXT-X-ENDLIST") {
  293 + break;
  294 + }
  295 +
  296 + // #EXTINF:11.401,
  297 + // livestream-5.ts
  298 + // parse each ts entry, expect current line is inf.
  299 + if (!srs_string_starts_with(line, "#EXTINF:")) {
  300 + continue;
  301 + }
  302 +
  303 + // expect next line is url.
  304 + std::string ts_url;
  305 + if ((pos = body.find("\n")) != string::npos) {
  306 + ts_url = body.substr(0, pos);
  307 + body = body.substr(pos + 1);
  308 + } else {
  309 + srs_warn("ts entry unexpected eof, inf=%s", line.c_str());
  310 + break;
  311 + }
  312 +
  313 + // parse the ts duration.
  314 + line = line.substr(string("#EXTINF:").length());
  315 + if ((pos = line.find(",")) != string::npos) {
  316 + line = line.substr(0, pos);
  317 + }
  318 +
  319 + double ts_duration = ::atof(line.c_str());
  320 + duration += ts_duration;
  321 +
  322 + SrsTsPiece* tp = find_ts(ts_url);
  323 + if (!tp) {
  324 + tp = new SrsTsPiece();
  325 + tp->url = ts_url;
  326 + tp->duration = ts_duration;
  327 + pieces.push_back(tp);
  328 + } else {
  329 + tp->dirty = false;
  330 + }
  331 + }
  332 +
  333 + // fetch all ts.
  334 + fetch_all_ts(fresh_m3u8);
  335 +
  336 + // remove all dirty ts.
  337 + remove_dirty();
  338 +
  339 + srs_trace("fetch m3u8 ok, td=%.2f, duration=%.2f, pieces=%d", td, duration, pieces.size());
  340 +
  341 + return ret;
  342 +}
  343 +
  344 +int SrsIngestSrsInput::parse(ISrsTsHandler* handler)
  345 +{
  346 + int ret = ERROR_SUCCESS;
  347 +
  348 + for (int i = 0; i < (int)pieces.size(); i++) {
  349 + SrsTsPiece* tp = pieces.at(i);
  350 +
  351 + // sent only once.
  352 + if (tp->sent) {
  353 + continue;
  354 + }
  355 + tp->sent = true;
  356 +
  357 + if (tp->body.empty()) {
  358 + continue;
  359 + }
  360 +
  361 + srs_trace("proxy the ts to rtmp, ts=%s, duration=%.2f", tp->url.c_str(), tp->duration);
  362 +
  363 + // use stream to parse ts packet.
  364 + int nb_packet = (int)tp->body.length() / SRS_TS_PACKET_SIZE;
  365 + for (int i = 0; i < nb_packet; i++) {
  366 + char* p = (char*)tp->body.data() + (i * SRS_TS_PACKET_SIZE);
  367 + if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
  368 + return ret;
  369 + }
  370 +
  371 + // process each ts packet
  372 + if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) {
  373 + // when peer closed, must interrupt parse and reconnect.
  374 + if (srs_is_client_gracefully_close(ret)) {
  375 + srs_warn("interrupt parse for peer closed. ret=%d", ret);
  376 + return ret;
  377 + }
  378 +
  379 + srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret);
  380 + continue;
  381 + }
  382 + srs_info("mpegts: parse ts packet completed");
  383 + }
  384 + srs_info("mpegts: parse udp packet completed");
  385 + }
  386 +
  387 + return ret;
  388 +}
  389 +
  390 +SrsIngestSrsInput::SrsTsPiece* SrsIngestSrsInput::find_ts(string url)
  391 +{
  392 + std::vector<SrsTsPiece*>::iterator it;
  393 + for (it = pieces.begin(); it != pieces.end(); ++it) {
  394 + SrsTsPiece* tp = *it;
  395 + if (tp->url == url) {
  396 + return tp;
  397 + }
  398 + }
  399 + return NULL;
  400 +}
  401 +
  402 +void SrsIngestSrsInput::dirty_all_ts()
  403 +{
  404 + std::vector<SrsTsPiece*>::iterator it;
  405 + for (it = pieces.begin(); it != pieces.end(); ++it) {
  406 + SrsTsPiece* tp = *it;
  407 + tp->dirty = true;
  408 + }
  409 +}
  410 +
  411 +void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8)
  412 +{
  413 + int ret = ERROR_SUCCESS;
  414 +
  415 + for (int i = 0; i < (int)pieces.size(); i++) {
  416 + SrsTsPiece* tp = pieces.at(i);
  417 +
  418 + // when skipped, ignore.
  419 + if (tp->skip) {
  420 + continue;
  421 + }
  422 +
  423 + // for the fresh m3u8, skip except the last one.
  424 + if (fresh_m3u8 && i != (int)pieces.size() - 1) {
  425 + tp->skip = true;
  426 + continue;
  427 + }
  428 +
  429 + if ((ret = tp->fetch(in_hls->get_url())) != ERROR_SUCCESS) {
  430 + srs_warn("ignore ts %s for error. ret=%d", tp->url.c_str(), ret);
  431 + tp->skip = true;
  432 + continue;
  433 + }
  434 +
  435 + // only wait for a duration of last piece.
  436 + if (i == pieces.size() - 1) {
  437 + next_connect_time = srs_update_system_time_ms() + (int)tp->duration * 1000;
  438 + }
  439 + }
  440 +}
  441 +
  442 +
  443 +void SrsIngestSrsInput::remove_dirty()
  444 +{
  445 + std::vector<SrsTsPiece*>::iterator it;
  446 + for (it = pieces.begin(); it != pieces.end();) {
  447 + SrsTsPiece* tp = *it;
  448 +
  449 + if (tp->dirty) {
  450 + srs_trace("erase dirty ts, url=%s, duration=%.2f", tp->url.c_str(), tp->duration);
  451 + srs_freep(tp);
  452 + it = pieces.erase(it);
  453 + } else {
  454 + ++it;
  455 + }
  456 + }
  457 +}
  458 +
  459 +int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8)
  460 +{
  461 + int ret = ERROR_SUCCESS;
  462 +
  463 + if (skip || sent || !body.empty()) {
  464 + return ret;
  465 + }
  466 +
  467 + size_t pos = string::npos;
  468 +
  469 + SrsHttpClient client;
  470 +
  471 + std::string ts_url = url;
  472 + if (!srs_string_starts_with(ts_url, "http://")) {
  473 + std::string baseurl = m3u8;
  474 + if ((pos = m3u8.rfind("/")) != string::npos) {
  475 + baseurl = m3u8.substr(0, pos);
  476 + }
  477 + ts_url = baseurl + "/" + url;
  478 + }
  479 +
  480 + SrsHttpUri uri;
  481 + if ((ret = uri.initialize(ts_url)) != ERROR_SUCCESS) {
  482 + return ret;
  483 + }
  484 +
  485 + // initialize the fresh http client.
  486 + if ((ret = client.initialize(uri.get_host(), uri.get_port()) != ERROR_SUCCESS)) {
  487 + return ret;
  488 + }
  489 +
  490 + SrsHttpMessage* msg = NULL;
  491 + if ((ret = client.get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) {
  492 + srs_error("HTTP GET %s failed. ret=%d", uri.get_url(), ret);
  493 + return ret;
  494 + }
  495 +
  496 + srs_assert(msg);
  497 + SrsAutoFree(SrsHttpMessage, msg);
  498 +
  499 + if ((ret = msg->body_read_all(body)) != ERROR_SUCCESS) {
  500 + srs_error("read ts failed. ret=%d", ret);
  501 + return ret;
  502 + }
  503 +
  504 + srs_trace("fetch ts ok, duration=%.2f, url=%s, body=%dB", duration, url.c_str(), body.length());
  505 +
  506 + return ret;
  507 +}
  508 +
  509 +// the context to output to rtmp server
  510 +class SrsIngestSrsOutput : public ISrsTsHandler
  511 +{
  512 +private:
  513 + SrsHttpUri* out_rtmp;
  514 +private:
  515 + bool disconnected;
  516 + std::multimap<int64_t, SrsTsMessage*> queue;
  517 +private:
  518 + SrsRequest* req;
  519 + st_netfd_t stfd;
  520 + SrsStSocket* io;
  521 + SrsRtmpClient* client;
  522 + int stream_id;
  523 +private:
  524 + SrsRawH264Stream* avc;
  525 + std::string h264_sps;
  526 + bool h264_sps_changed;
  527 + std::string h264_pps;
  528 + bool h264_pps_changed;
  529 + bool h264_sps_pps_sent;
  530 +private:
  531 + SrsRawAacStream* aac;
  532 + std::string aac_specific_config;
  533 +public:
  534 + SrsIngestSrsOutput(SrsHttpUri* rtmp) {
  535 + out_rtmp = rtmp;
  536 + disconnected = false;
  537 +
  538 + req = NULL;
  539 + io = NULL;
  540 + client = NULL;
  541 + stfd = NULL;
  542 + stream_id = 0;
  543 +
  544 + avc = new SrsRawH264Stream();
  545 + aac = new SrsRawAacStream();
  546 + h264_sps_changed = false;
  547 + h264_pps_changed = false;
  548 + h264_sps_pps_sent = false;
  549 + }
  550 + virtual ~SrsIngestSrsOutput() {
  551 + close();
  552 +
  553 + srs_freep(avc);
  554 + srs_freep(aac);
  555 +
  556 + std::multimap<int64_t, SrsTsMessage*>::iterator it;
  557 + for (it = queue.begin(); it != queue.end(); ++it) {
  558 + SrsTsMessage* msg = it->second;
  559 + srs_freep(msg);
  560 + }
  561 + queue.clear();
  562 + }
  563 +// interface ISrsTsHandler
  564 +public:
  565 + virtual int on_ts_message(SrsTsMessage* msg);
  566 +private:
  567 + virtual int parse_message_queue();
  568 + virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs);
  569 + virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts);
  570 + virtual int write_h264_ipb_frame(std::string ibps, SrsCodecVideoAVCFrame frame_type, u_int32_t dts, u_int32_t pts);
  571 + virtual int on_ts_audio(SrsTsMessage* msg, SrsStream* avs);
  572 + virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts);
  573 +private:
  574 + virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
  575 +public:
  576 + /**
  577 + * connect to output rtmp server.
  578 + */
  579 + virtual int connect();
  580 + /**
  581 + * flush the message queue when all ts parsed.
  582 + */
  583 + virtual int flush_message_queue();
  584 +private:
  585 + virtual int connect_app(std::string ep_server, std::string ep_port);
  586 + // close the connected io and rtmp to ready to be re-connect.
  587 + virtual void close();
  588 +};
  589 +
  590 +int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
  591 +{
  592 + int ret = ERROR_SUCCESS;
  593 +
  594 + // about the bytes of msg, specified by elementary stream which indicates by PES_packet_data_byte and stream_id
  595 + // for example, when SrsTsStream of SrsTsChannel indicates stream_type is SrsTsStreamVideoMpeg4 and SrsTsStreamAudioMpeg4,
  596 + // the elementary stream can be mux in "2.11 Carriage of ISO/IEC 14496 data" in hls-mpeg-ts-iso13818-1.pdf, page 103
  597 + // @remark, the most popular stream_id is 0xe0 for h.264 over mpegts, which indicates the stream_id is video and
  598 + // stream_number is 0, where I guess the elementary is specified in annexb format(H.264-AVC-ISO_IEC_14496-10.pdf, page 211).
  599 + // because when audio stream_number is 0, the elementary is ADTS(aac-mp4a-format-ISO_IEC_14496-3+2001.pdf, page 75, 1.A.2.2 ADTS).
  600 +
  601 + // about the bytes of PES_packet_data_byte, defined in hls-mpeg-ts-iso13818-1.pdf, page 58
  602 + // PES_packet_data_byte ¨C PES_packet_data_bytes shall be contiguous bytes of data from the elementary stream
  603 + // indicated by the packet¡¯s stream_id or PID. When the elementary stream data conforms to ITU-T
  604 + // Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 13818-3, the PES_packet_data_bytes shall be byte aligned to the bytes of this
  605 + // Recommendation | International Standard. The byte-order of the elementary stream shall be preserved. The number of
  606 + // PES_packet_data_bytes, N, is specified by the PES_packet_length field. N shall be equal to the value indicated in the
  607 + // PES_packet_length minus the number of bytes between the last byte of the PES_packet_length field and the first
  608 + // PES_packet_data_byte.
  609 + //
  610 + // In the case of a private_stream_1, private_stream_2, ECM_stream, or EMM_stream, the contents of the
  611 + // PES_packet_data_byte field are user definable and will not be specified by ITU-T | ISO/IEC in the future.
  612 +
  613 + // about the bytes of stream_id, define in hls-mpeg-ts-iso13818-1.pdf, page 49
  614 + // stream_id ¨C In Program Streams, the stream_id specifies the type and number of the elementary stream as defined by the
  615 + // stream_id Table 2-18. In Transport Streams, the stream_id may be set to any valid value which correctly describes the
  616 + // elementary stream type as defined in Table 2-18. In Transport Streams, the elementary stream type is specified in the
  617 + // Program Specific Information as specified in 2.4.4.
  618 +
  619 + // about the stream_id table, define in Table 2-18 ¨C Stream_id assignments, hls-mpeg-ts-iso13818-1.pdf, page 52.
  620 + //
  621 + // 110x xxxx
  622 + // ISO/IEC 13818-3 or ISO/IEC 11172-3 or ISO/IEC 13818-7 or ISO/IEC
  623 + // 14496-3 audio stream number x xxxx
  624 + // ((sid >> 5) & 0x07) == SrsTsPESStreamIdAudio
  625 + //
  626 + // 1110 xxxx
  627 + // ITU-T Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 11172-2 or ISO/IEC
  628 + // 14496-2 video stream number xxxx
  629 + // ((stream_id >> 4) & 0x0f) == SrsTsPESStreamIdVideo
  630 +
  631 + srs_info("<- "SRS_CONSTS_LOG_STREAM_CASTER" mpegts: got %s stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)",
  632 + (msg->channel->apply == SrsTsPidApplyVideo)? "Video":"Audio", srs_ts_stream2string(msg->channel->stream).c_str(),
  633 + msg->dts, msg->pts, msg->payload->length(), msg->packet->payload_unit_start_indicator, msg->continuity_counter, msg->sid,
  634 + msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number());
  635 +
  636 + // when not audio/video, or not adts/annexb format, donot support.
  637 + if (msg->stream_number() != 0) {
  638 + ret = ERROR_STREAM_CASTER_TS_ES;
  639 + srs_error("mpegts: unsupported stream format, sid=%#x(%s-%d). ret=%d",
  640 + msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number(), ret);
  641 + return ret;
  642 + }
  643 +
  644 + // check supported codec
  645 + if (msg->channel->stream != SrsTsStreamVideoH264 && msg->channel->stream != SrsTsStreamAudioAAC) {
  646 + ret = ERROR_STREAM_CASTER_TS_CODEC;
  647 + srs_error("mpegts: unsupported stream codec=%d. ret=%d", msg->channel->stream, ret);
  648 + return ret;
  649 + }
  650 +
  651 + // we must use queue to cache the msg, then parse it if possible.
  652 + queue.insert(std::make_pair(msg->dts, msg->detach()));
  653 + if ((ret = parse_message_queue()) != ERROR_SUCCESS) {
  654 + // when peer closed, close the output and reconnect.
  655 + if (srs_is_client_gracefully_close(ret)) {
  656 + close();
  657 + }
  658 + return ret;
  659 + }
  660 +
  661 + return ret;
  662 +}
  663 +
  664 +int SrsIngestSrsOutput::parse_message_queue()
  665 +{
  666 + int ret = ERROR_SUCCESS;
  667 +
  668 + int nb_videos = 0;
  669 + int nb_audios = 0;
  670 + std::multimap<int64_t, SrsTsMessage*>::iterator it;
  671 + for (it = queue.begin(); it != queue.end(); ++it) {
  672 + SrsTsMessage* msg = it->second;
  673 +
  674 + // publish audio or video.
  675 + if (msg->channel->stream == SrsTsStreamVideoH264) {
  676 + nb_videos++;
  677 + } else {
  678 + nb_audios++;
  679 + }
  680 + }
  681 +
  682 + // always wait 2+ videos, to left one video in the queue.
  683 + // TODO: FIXME: support pure audio hls.
  684 + if (nb_videos <= 1) {
  685 + return ret;
  686 + }
  687 +
  688 + // parse messages util the last video.
  689 + while (nb_videos > 1 && queue.size() > 0) {
  690 + std::multimap<int64_t, SrsTsMessage*>::iterator it = queue.begin();
  691 +
  692 + SrsTsMessage* msg = it->second;
  693 + if (msg->channel->stream == SrsTsStreamVideoH264) {
  694 + nb_videos--;
  695 + }
  696 + queue.erase(it);
  697 +
  698 + // parse the stream.
  699 + SrsStream avs;
  700 + if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
  701 + srs_error("mpegts: initialize av stream failed. ret=%d", ret);
  702 + return ret;
  703 + }
  704 +
  705 + // publish audio or video.
  706 + if (msg->channel->stream == SrsTsStreamVideoH264) {
  707 + if ((ret = on_ts_video(msg, &avs)) != ERROR_SUCCESS) {
  708 + return ret;
  709 + }
  710 + }
  711 + if (msg->channel->stream == SrsTsStreamAudioAAC) {
  712 + if ((ret = on_ts_audio(msg, &avs)) != ERROR_SUCCESS) {
  713 + return ret;
  714 + }
  715 + }
  716 + }
  717 +
  718 + return ret;
  719 +}
  720 +
  721 +int SrsIngestSrsOutput::flush_message_queue()
  722 +{
  723 + int ret = ERROR_SUCCESS;
  724 +
  725 + // parse messages util the last video.
  726 + while (!queue.empty()) {
  727 + std::multimap<int64_t, SrsTsMessage*>::iterator it = queue.begin();
  728 +
  729 + SrsTsMessage* msg = it->second;
  730 + queue.erase(it);
  731 +
  732 + // parse the stream.
  733 + SrsStream avs;
  734 + if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
  735 + srs_error("mpegts: initialize av stream failed. ret=%d", ret);
  736 + return ret;
  737 + }
  738 +
  739 + // publish audio or video.
  740 + if (msg->channel->stream == SrsTsStreamVideoH264) {
  741 + if ((ret = on_ts_video(msg, &avs)) != ERROR_SUCCESS) {
  742 + return ret;
  743 + }
  744 + }
  745 + if (msg->channel->stream == SrsTsStreamAudioAAC) {
  746 + if ((ret = on_ts_audio(msg, &avs)) != ERROR_SUCCESS) {
  747 + return ret;
  748 + }
  749 + }
  750 + }
  751 +
  752 + return ret;
  753 +}
  754 +
  755 +int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
  756 +{
  757 + int ret = ERROR_SUCCESS;
  758 +
  759 + // ts tbn to flv tbn.
  760 + u_int32_t dts = (u_int32_t)(msg->dts / 90);
  761 + u_int32_t pts = (u_int32_t)(msg->dts / 90);
  762 +
  763 + std::string ibps;
  764 + SrsCodecVideoAVCFrame frame_type = SrsCodecVideoAVCFrameInterFrame;
  765 +
  766 + // send each frame.
  767 + while (!avs->empty()) {
  768 + char* frame = NULL;
  769 + int frame_size = 0;
  770 + if ((ret = avc->annexb_demux(avs, &frame, &frame_size)) != ERROR_SUCCESS) {
  771 + return ret;
  772 + }
  773 +
  774 + // 5bits, 7.3.1 NAL unit syntax,
  775 + // H.264-AVC-ISO_IEC_14496-10.pdf, page 44.
  776 + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
  777 + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f);
  778 +
  779 + // for IDR frame, the frame is keyframe.
  780 + if (nal_unit_type == SrsAvcNaluTypeIDR) {
  781 + frame_type = SrsCodecVideoAVCFrameKeyFrame;
  782 + }
  783 +
  784 + // ignore the nalu type aud(9)
  785 + if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter) {
  786 + continue;
  787 + }
  788 +
  789 + // for sps
  790 + if (avc->is_sps(frame, frame_size)) {
  791 + std::string sps;
  792 + if ((ret = avc->sps_demux(frame, frame_size, sps)) != ERROR_SUCCESS) {
  793 + return ret;
  794 + }
  795 +
  796 + if (h264_sps == sps) {
  797 + continue;
  798 + }
  799 + h264_sps_changed = true;
  800 + h264_sps = sps;
  801 + continue;
  802 + }
  803 +
  804 + // for pps
  805 + if (avc->is_pps(frame, frame_size)) {
  806 + std::string pps;
  807 + if ((ret = avc->pps_demux(frame, frame_size, pps)) != ERROR_SUCCESS) {
  808 + return ret;
  809 + }
  810 +
  811 + if (h264_pps == pps) {
  812 + continue;
  813 + }
  814 + h264_pps_changed = true;
  815 + h264_pps = pps;
  816 + continue;
  817 + }
  818 +
  819 + // ibp frame.
  820 + std::string ibp;
  821 + if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) {
  822 + return ret;
  823 + }
  824 + ibps.append(ibp);
  825 + }
  826 +
  827 + if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) {
  828 + return ret;
  829 + }
  830 +
  831 + if ((ret = write_h264_ipb_frame(ibps, frame_type, dts, pts)) != ERROR_SUCCESS) {
  832 + // drop the ts message.
  833 + if (ret == ERROR_H264_DROP_BEFORE_SPS_PPS) {
  834 + return ERROR_SUCCESS;
  835 + }
  836 + return ret;
  837 + }
  838 +
  839 + return ret;
  840 +}
  841 +
  842 +int SrsIngestSrsOutput::write_h264_sps_pps(u_int32_t dts, u_int32_t pts)
  843 +{
  844 + int ret = ERROR_SUCCESS;
  845 +
  846 + // when sps or pps changed, update the sequence header,
  847 + // for the pps maybe not changed while sps changed.
  848 + // so, we must check when each video ts message frame parsed.
  849 + if (h264_sps_pps_sent && !h264_sps_changed && !h264_pps_changed) {
  850 + return ret;
  851 + }
  852 +
  853 + // when not got sps/pps, wait.
  854 + if (h264_pps.empty() || h264_sps.empty()) {
  855 + return ret;
  856 + }
  857 +
  858 + // h264 raw to h264 packet.
  859 + std::string sh;
  860 + if ((ret = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != ERROR_SUCCESS) {
  861 + return ret;
  862 + }
  863 +
  864 + // h264 packet to flv packet.
  865 + int8_t frame_type = SrsCodecVideoAVCFrameKeyFrame;
  866 + int8_t avc_packet_type = SrsCodecVideoAVCTypeSequenceHeader;
  867 + char* flv = NULL;
  868 + int nb_flv = 0;
  869 + if ((ret = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) {
  870 + return ret;
  871 + }
  872 +
  873 + // the timestamp in rtmp message header is dts.
  874 + u_int32_t timestamp = dts;
  875 + if ((ret = rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv)) != ERROR_SUCCESS) {
  876 + return ret;
  877 + }
  878 +
  879 + // reset sps and pps.
  880 + h264_sps_changed = false;
  881 + h264_pps_changed = false;
  882 + h264_sps_pps_sent = true;
  883 + srs_trace("hls: h264 sps/pps sent, sps=%dB, pps=%dB", h264_sps.length(), h264_pps.length());
  884 +
  885 + return ret;
  886 +}
  887 +
  888 +int SrsIngestSrsOutput::write_h264_ipb_frame(string ibps, SrsCodecVideoAVCFrame frame_type, u_int32_t dts, u_int32_t pts)
  889 +{
  890 + int ret = ERROR_SUCCESS;
  891 +
  892 + // when sps or pps not sent, ignore the packet.
  893 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/203
  894 + if (!h264_sps_pps_sent) {
  895 + return ERROR_H264_DROP_BEFORE_SPS_PPS;
  896 + }
  897 +
  898 + int8_t avc_packet_type = SrsCodecVideoAVCTypeNALU;
  899 + char* flv = NULL;
  900 + int nb_flv = 0;
  901 + if ((ret = avc->mux_avc2flv(ibps, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) {
  902 + return ret;
  903 + }
  904 +
  905 + // the timestamp in rtmp message header is dts.
  906 + u_int32_t timestamp = dts;
  907 + return rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv);
  908 +}
  909 +
  910 +int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs)
  911 +{
  912 + int ret = ERROR_SUCCESS;
  913 +
  914 + // ts tbn to flv tbn.
  915 + u_int32_t dts = (u_int32_t)(msg->dts / 90);
  916 +
  917 + // got the next video to calc the delta duration for each audio.
  918 + u_int32_t duration = 0;
  919 + if (!queue.empty()) {
  920 + SrsTsMessage* nm = queue.begin()->second;
  921 + duration = (u_int32_t)(srs_max(0, nm->dts - msg->dts) / 90);
  922 + }
  923 + u_int32_t max_dts = dts + duration;
  924 +
  925 + // send each frame.
  926 + while (!avs->empty()) {
  927 + char* frame = NULL;
  928 + int frame_size = 0;
  929 + SrsRawAacStreamCodec codec;
  930 + if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) {
  931 + return ret;
  932 + }
  933 +
  934 + // ignore invalid frame,
  935 + // * atleast 1bytes for aac to decode the data.
  936 + if (frame_size <= 0) {
  937 + continue;
  938 + }
  939 + srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts);
  940 +
  941 + // generate sh.
  942 + if (aac_specific_config.empty()) {
  943 + std::string sh;
  944 + if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) {
  945 + return ret;
  946 + }
  947 + aac_specific_config = sh;
  948 +
  949 + codec.aac_packet_type = 0;
  950 +
  951 + if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != ERROR_SUCCESS) {
  952 + return ret;
  953 + }
  954 + }
  955 +
  956 + // audio raw data.
  957 + codec.aac_packet_type = 1;
  958 + if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) {
  959 + return ret;
  960 + }
  961 +
  962 + // calc the delta of dts, when previous frame output.
  963 + u_int32_t delta = duration / (msg->payload->length() / frame_size);
  964 + dts = (u_int32_t)(srs_min(max_dts, dts + delta));
  965 + }
  966 +
  967 + return ret;
  968 +}
  969 +
  970 +int SrsIngestSrsOutput::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts)
  971 +{
  972 + int ret = ERROR_SUCCESS;
  973 +
  974 + char* data = NULL;
  975 + int size = 0;
  976 + if ((ret = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != ERROR_SUCCESS) {
  977 + return ret;
  978 + }
  979 +
  980 + return rtmp_write_packet(SrsCodecFlvTagAudio, dts, data, size);
  981 +}
  982 +
  983 +int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size)
  984 +{
  985 + int ret = ERROR_SUCCESS;
  986 +
  987 + SrsSharedPtrMessage* msg = NULL;
  988 +
  989 + if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) {
  990 + srs_error("mpegts: create shared ptr msg failed. ret=%d", ret);
  991 + return ret;
  992 + }
  993 + srs_assert(msg);
  994 +
  995 + // send out encoded msg.
  996 + if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
  997 + return ret;
  998 + }
  999 +
  1000 + return ret;
  1001 +}
  1002 +
  1003 +int SrsIngestSrsOutput::connect()
  1004 +{
  1005 + int ret = ERROR_SUCCESS;
  1006 +
  1007 + // when ok, ignore.
  1008 + // TODO: FIXME: should reconnect when disconnected.
  1009 + if (io || client) {
  1010 + return ret;
  1011 + }
  1012 +
  1013 + srs_trace("connect output=%s", out_rtmp->get_url());
  1014 +
  1015 + // parse uri
  1016 + if (!req) {
  1017 + req = new SrsRequest();
  1018 +
  1019 + size_t pos = string::npos;
  1020 + string uri = req->tcUrl = out_rtmp->get_url();
  1021 +
  1022 + // tcUrl, stream
  1023 + if ((pos = uri.rfind("/")) != string::npos) {
  1024 + req->stream = uri.substr(pos + 1);
  1025 + req->tcUrl = uri = uri.substr(0, pos);
  1026 + }
  1027 +
  1028 + srs_discovery_tc_url(req->tcUrl,
  1029 + req->schema, req->host, req->vhost, req->app, req->port,
  1030 + req->param);
  1031 + }
  1032 +
  1033 + // connect host.
  1034 + if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) {
  1035 + srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret);
  1036 + return ret;
  1037 + }
  1038 + io = new SrsStSocket(stfd);
  1039 + client = new SrsRtmpClient(io);
  1040 +
  1041 + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
  1042 + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
  1043 +
  1044 + // connect to vhost/app
  1045 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  1046 + srs_error("mpegts: handshake with server failed. ret=%d", ret);
  1047 + return ret;
  1048 + }
  1049 + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) {
  1050 + srs_error("mpegts: connect with server failed. ret=%d", ret);
  1051 + return ret;
  1052 + }
  1053 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  1054 + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  1055 + return ret;
  1056 + }
  1057 +
  1058 + // publish.
  1059 + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
  1060 + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d",
  1061 + req->stream.c_str(), stream_id, ret);
  1062 + return ret;
  1063 + }
  1064 +
  1065 + return ret;
  1066 +}
  1067 +
  1068 +// TODO: FIXME: refine the connect_app.
  1069 +int SrsIngestSrsOutput::connect_app(string ep_server, string ep_port)
  1070 +{
  1071 + int ret = ERROR_SUCCESS;
  1072 +
  1073 + // args of request takes the srs info.
  1074 + if (req->args == NULL) {
  1075 + req->args = SrsAmf0Any::object();
  1076 + }
  1077 +
  1078 + // notify server the edge identity,
  1079 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/147
  1080 + SrsAmf0Object* data = req->args;
  1081 + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
  1082 + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
  1083 + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
  1084 + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
  1085 + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
  1086 + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
  1087 + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
  1088 + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
  1089 + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
  1090 + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
  1091 + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
  1092 + // for edge to directly get the id of client.
  1093 + data->set("srs_pid", SrsAmf0Any::number(getpid()));
  1094 + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
  1095 +
  1096 + // local ip of edge
  1097 + std::vector<std::string> ips = srs_get_local_ipv4_ips();
  1098 + assert(0 < (int)ips.size());
  1099 + std::string local_ip = ips[0];
  1100 + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
  1101 +
  1102 + // generate the tcUrl
  1103 + std::string param = "";
  1104 + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
  1105 +
  1106 + // upnode server identity will show in the connect_app of client.
  1107 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/160
  1108 + // the debug_srs_upnode is config in vhost and default to true.
  1109 + bool debug_srs_upnode = true;
  1110 + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
  1111 + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
  1112 + tc_url.c_str(), debug_srs_upnode, ret);
  1113 + return ret;
  1114 + }
  1115 +
  1116 + return ret;
  1117 +}
  1118 +
  1119 +void SrsIngestSrsOutput::close()
  1120 +{
  1121 + srs_trace("close output=%s", out_rtmp->get_url());
  1122 + h264_sps_pps_sent = false;
  1123 +
  1124 + srs_freep(client);
  1125 + srs_freep(io);
  1126 + srs_freep(req);
  1127 + srs_close_stfd(stfd);
  1128 +}
  1129 +
  1130 +// the context for ingest hls stream.
  1131 +class SrsIngestSrsContext
  1132 +{
  1133 +private:
  1134 + SrsIngestSrsInput* ic;
  1135 + SrsIngestSrsOutput* oc;
  1136 +public:
  1137 + SrsIngestSrsContext(SrsHttpUri* hls, SrsHttpUri* rtmp) {
  1138 + ic = new SrsIngestSrsInput(hls);
  1139 + oc = new SrsIngestSrsOutput(rtmp);
  1140 + }
  1141 + virtual ~SrsIngestSrsContext() {
  1142 + srs_freep(ic);
  1143 + srs_freep(oc);
  1144 + }
  1145 + virtual int proxy() {
  1146 + int ret = ERROR_SUCCESS;
  1147 +
  1148 + if ((ret = ic->connect()) != ERROR_SUCCESS) {
  1149 + srs_warn("connect oc failed. ret=%d", ret);
  1150 + return ret;
  1151 + }
  1152 +
  1153 + if ((ret = oc->connect()) != ERROR_SUCCESS) {
  1154 + srs_warn("connect ic failed. ret=%d", ret);
  1155 + return ret;
  1156 + }
  1157 +
  1158 + if ((ret = ic->parse(oc)) != ERROR_SUCCESS) {
  1159 + srs_warn("proxy ts to rtmp failed. ret=%d", ret);
  1160 + return ret;
  1161 + }
  1162 +
  1163 + if ((ret = oc->flush_message_queue()) != ERROR_SUCCESS) {
  1164 + srs_warn("flush oc message failed. ret=%d", ret);
  1165 + return ret;
  1166 + }
  1167 +
  1168 + return ret;
  1169 + }
  1170 +};
  1171 +
  1172 +int proxy_hls2rtmp(string hls, string rtmp)
  1173 +{
  1174 + int ret = ERROR_SUCCESS;
  1175 +
  1176 + // init st.
  1177 + if ((ret = srs_init_st()) != ERROR_SUCCESS) {
  1178 + srs_error("init st failed. ret=%d", ret);
  1179 + return ret;
  1180 + }
  1181 +
  1182 + SrsHttpUri hls_uri, rtmp_uri;
  1183 + if ((ret = hls_uri.initialize(hls)) != ERROR_SUCCESS) {
  1184 + srs_error("hls uri invalid. ret=%d", ret);
  1185 + return ret;
  1186 + }
  1187 + if ((ret = rtmp_uri.initialize(rtmp)) != ERROR_SUCCESS) {
  1188 + srs_error("rtmp uri invalid. ret=%d", ret);
  1189 + return ret;
  1190 + }
  1191 +
  1192 + SrsIngestSrsContext context(&hls_uri, &rtmp_uri);
  1193 + for (;;) {
  1194 + if ((ret = context.proxy()) == ERROR_SUCCESS) {
  1195 + continue;
  1196 + }
  1197 +
  1198 + srs_warn("proxy hls to rtmp failed. ret=%d", ret);
  1199 + st_usleep(SRS_INGEST_HLS_ERROR_RETRY_MS * 1000);
  1200 + }
  1201 +
  1202 + return ret;
  1203 +}
  1204 +