winlin

support ingest hls live stream to RTMP.

@@ -562,6 +562,7 @@ Supported operating systems and hardware: @@ -562,6 +562,7 @@ Supported operating systems and hardware:
562 562
563 ### SRS 2.0 history 563 ### SRS 2.0 history
564 564
  565 +* v2.0, 2015-04-20, support ingest hls live stream to RTMP.
565 * v2.0, 2015-04-15, for [#383](https://github.com/winlinvip/simple-rtmp-server/issues/383), support mix_correct algorithm. 2.0.161. 566 * v2.0, 2015-04-15, for [#383](https://github.com/winlinvip/simple-rtmp-server/issues/383), support mix_correct algorithm. 2.0.161.
566 * v2.0, 2015-04-13, for [#381](https://github.com/winlinvip/simple-rtmp-server/issues/381), support reap hls/ts by gop or not. 2.0.160. 567 * v2.0, 2015-04-13, for [#381](https://github.com/winlinvip/simple-rtmp-server/issues/381), support reap hls/ts by gop or not. 2.0.160.
567 * v2.0, 2015-04-10, enhanced on_hls_notify, support HTTP GET when reap ts. 568 * v2.0, 2015-04-10, enhanced on_hls_notify, support HTTP GET when reap ts.
@@ -57,8 +57,6 @@ echo -e " | ${SrsGprofSummaryColor}rm -f gmon.out; ./objs/srs -c conf/co @@ -57,8 +57,6 @@ echo -e " | ${SrsGprofSummaryColor}rm -f gmon.out; ./objs/srs -c conf/co
57 echo -e " | ${SrsGprofSummaryColor}killall -2 srs # or CTRL+C to stop gprof\${BLACK}" 57 echo -e " | ${SrsGprofSummaryColor}killall -2 srs # or CTRL+C to stop gprof\${BLACK}"
58 echo -e " | ${SrsGprofSummaryColor}gprof -b ./objs/srs gmon.out > gprof.srs.log && rm -f gmon.out # gprof report to gprof.srs.log\${BLACK}" 58 echo -e " | ${SrsGprofSummaryColor}gprof -b ./objs/srs gmon.out > gprof.srs.log && rm -f gmon.out # gprof report to gprof.srs.log\${BLACK}"
59 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" 59 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
60 -echo -e " |${SrsResearchSummaryColor}research: ./objs/research, api server, players, ts info, librtmp.\${BLACK}"  
61 -echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"  
62 echo -e " |${SrsUtestSummaryColor}utest: ./objs/srs_utest, the utest for srs\${BLACK}" 60 echo -e " |${SrsUtestSummaryColor}utest: ./objs/srs_utest, the utest for srs\${BLACK}"
63 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" 61 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
64 echo -e " |${SrsLibrtmpSummaryColor}librtmp @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v1_CN_SrsLibrtmp\${BLACK}" 62 echo -e " |${SrsLibrtmpSummaryColor}librtmp @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v1_CN_SrsLibrtmp\${BLACK}"
@@ -71,6 +69,12 @@ echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/ @@ -71,6 +69,12 @@ echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/
71 echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_detect_rtmp\${BLACK}" 69 echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_detect_rtmp\${BLACK}"
72 echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_bandwidth_check\${BLACK}" 70 echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_bandwidth_check\${BLACK}"
73 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" 71 echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
  72 +echo -e " |${SrsResearchSummaryColor}research: ./objs/research, api server, players, ts info, librtmp.\${BLACK}"
  73 +echo -e " | ${SrsResearchSummaryColor} @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}"
  74 +echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
  75 +echo -e " |\${GREEN}tools: important tool, others @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}"
  76 +echo -e " | \${GREEN}./objs/srs_ingest_hls -i http://ossrs.net/live/livestream.m3u8 -y rtmp://127.0.0.1/live/livestream\${BLACK}"
  77 +echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}"
74 echo -e " |\${GREEN}server: ./objs/srs -c conf/srs.conf, start the srs server\${BLACK}" 78 echo -e " |\${GREEN}server: ./objs/srs -c conf/srs.conf, start the srs server\${BLACK}"
75 echo -e " | ${SrsHlsSummaryColor}hls @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DeliveryHLS\${BLACK}" 79 echo -e " | ${SrsHlsSummaryColor}hls @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DeliveryHLS\${BLACK}"
76 echo -e " | ${SrsHlsSummaryColor}hls: generate m3u8 and ts from rtmp stream\${BLACK}" 80 echo -e " | ${SrsHlsSummaryColor}hls: generate m3u8 and ts from rtmp stream\${BLACK}"
@@ -121,4 +125,4 @@ echo -e "\${BLACK}Examples for srs-librtmp at:\${BLACK}" @@ -121,4 +125,4 @@ echo -e "\${BLACK}Examples for srs-librtmp at:\${BLACK}"
121 echo -e "\${GREEN} objs/research/librtmp\${BLACK}" 125 echo -e "\${GREEN} objs/research/librtmp\${BLACK}"
122 echo -e "\${GREEN} Examples: https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}" 126 echo -e "\${GREEN} Examples: https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}"
123 END 127 END
124 -fi  
  128 +fi
@@ -100,7 +100,7 @@ AR = ar @@ -100,7 +100,7 @@ AR = ar
100 LINK = g++ 100 LINK = g++
101 CXXFLAGS = ${CXXFLAGS} 101 CXXFLAGS = ${CXXFLAGS}
102 102
103 -.PHONY: default srs librtmp 103 +.PHONY: default srs srs_ingest_hls librtmp
104 104
105 default: 105 default:
106 106
@@ -200,7 +200,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then @@ -200,7 +200,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
200 MODULE_ID="MAIN" 200 MODULE_ID="MAIN"
201 MODULE_DEPENDS=("CORE" "KERNEL" "RTMP" "APP") 201 MODULE_DEPENDS=("CORE" "KERNEL" "RTMP" "APP")
202 ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibHttpParserRoot}) 202 ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibHttpParserRoot})
203 - MODULE_FILES=("srs_main_server") 203 + MODULE_FILES=("srs_main_server" "srs_main_ingest_hls")
204 # add each modules for main 204 # add each modules for main
205 for SRS_MODULE in ${SRS_MODULES[*]}; do 205 for SRS_MODULE in ${SRS_MODULES[*]}; do
206 . $SRS_MODULE/config 206 . $SRS_MODULE/config
@@ -217,7 +217,7 @@ fi @@ -217,7 +217,7 @@ fi
217 # disable all app when export librtmp 217 # disable all app when export librtmp
218 if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then 218 if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
219 # all main entrances 219 # all main entrances
220 - MAIN_ENTRANCES=("srs_main_server") 220 + MAIN_ENTRANCES=("srs_main_server" "srs_main_ingest_hls")
221 # add each modules for main 221 # add each modules for main
222 for SRS_MODULE in ${SRS_MODULES[*]}; do 222 for SRS_MODULE in ${SRS_MODULES[*]}; do
223 . $SRS_MODULE/config 223 . $SRS_MODULE/config
@@ -232,6 +232,9 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then @@ -232,6 +232,9 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
232 # 232 #
233 # srs: srs(simple rtmp server) over st(state-threads) 233 # srs: srs(simple rtmp server) over st(state-threads)
234 BUILD_KEY="srs" APP_MAIN="srs_main_server" APP_NAME="srs" . auto/apps.sh 234 BUILD_KEY="srs" APP_MAIN="srs_main_server" APP_NAME="srs" . auto/apps.sh
  235 + #
  236 + # srs_ingest_hls: to ingest hls stream to srs.
  237 + BUILD_KEY="srs_ingest_hls" APP_MAIN="srs_main_ingest_hls" APP_NAME="srs_ingest_hls" . auto/apps.sh
235 # add each modules for application 238 # add each modules for application
236 for SRS_MODULE in ${SRS_MODULES[*]}; do 239 for SRS_MODULE in ${SRS_MODULES[*]}; do
237 . $SRS_MODULE/config 240 . $SRS_MODULE/config
@@ -272,7 +275,7 @@ mv ${SRS_WORKDIR}/${SRS_MAKEFILE} ${SRS_WORKDIR}/${SRS_MAKEFILE}.bk @@ -272,7 +275,7 @@ mv ${SRS_WORKDIR}/${SRS_MAKEFILE} ${SRS_WORKDIR}/${SRS_MAKEFILE}.bk
272 275
273 # generate phony header 276 # generate phony header
274 cat << END > ${SRS_WORKDIR}/${SRS_MAKEFILE} 277 cat << END > ${SRS_WORKDIR}/${SRS_MAKEFILE}
275 -.PHONY: default _default install install-api help clean server librtmp utest _prepare_dir $__mphonys 278 +.PHONY: default _default install install-api help clean server srs_ingest_hls librtmp utest _prepare_dir $__mphonys
276 279
277 # install prefix. 280 # install prefix.
278 SRS_PREFIX=${SRS_PREFIX} 281 SRS_PREFIX=${SRS_PREFIX}
@@ -300,14 +303,15 @@ fi @@ -300,14 +303,15 @@ fi
300 # the server, librtmp and utest 303 # the server, librtmp and utest
301 # where the bellow will check and disable some entry by only echo. 304 # where the bellow will check and disable some entry by only echo.
302 cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE} 305 cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE}
303 -_default: server librtmp utest $__mdefaults 306 +_default: server srs_ingest_hls librtmp utest $__mdefaults
304 @bash objs/_srs_build_summary.sh 307 @bash objs/_srs_build_summary.sh
305 308
306 help: 309 help:
307 - @echo "Usage: make <help>|<clean>|<server>|<librtmp>|<utest>|<install>|<install-api>|<uninstall>" 310 + @echo "Usage: make <help>|<clean>|<server>|<srs_ingest_hls>|<librtmp>|<utest>|<install>|<install-api>|<uninstall>"
308 @echo " help display this help menu" 311 @echo " help display this help menu"
309 @echo " clean cleanup project" 312 @echo " clean cleanup project"
310 @echo " server build the srs(simple rtmp server) over st(state-threads)" 313 @echo " server build the srs(simple rtmp server) over st(state-threads)"
  314 + @echo " srs_ingest_hls build the hls ingest tool of srs."
311 @echo " librtmp build the client publish/play library, and samples" 315 @echo " librtmp build the client publish/play library, and samples"
312 @echo " utest build the utest for srs" 316 @echo " utest build the utest for srs"
313 @echo " install install srs to the prefix path" 317 @echo " install install srs to the prefix path"
@@ -332,6 +336,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT != NO ]; then @@ -332,6 +336,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT != NO ]; then
332 cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE} 336 cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE}
333 server: _prepare_dir 337 server: _prepare_dir
334 @echo "donot build the srs(simple rtmp server) for srs-librtmp" 338 @echo "donot build the srs(simple rtmp server) for srs-librtmp"
  339 +srs_ingest_hls: _prepare_dir
  340 + @echo "donot build the srs_ingest_hls for srs-librtmp"
335 341
336 END 342 END
337 else 343 else
@@ -339,6 +345,9 @@ else @@ -339,6 +345,9 @@ else
339 server: _prepare_dir 345 server: _prepare_dir
340 @echo "build the srs(simple rtmp server) over st(state-threads)" 346 @echo "build the srs(simple rtmp server) over st(state-threads)"
341 \$(MAKE) -f ${SRS_OBJS_DIR}/${SRS_MAKEFILE} srs 347 \$(MAKE) -f ${SRS_OBJS_DIR}/${SRS_MAKEFILE} srs
  348 +srs_ingest_hls: _prepare_dir
  349 + @echo "build the srs_ingest_hls for srs"
  350 + \$(MAKE) -f ${SRS_OBJS_DIR}/${SRS_MAKEFILE} srs_ingest_hls
342 351
343 END 352 END
344 fi 353 fi
1 file 1 file
2 main readonly separator, 2 main readonly separator,
3 ../../src/main/srs_main_server.cpp, 3 ../../src/main/srs_main_server.cpp,
  4 + ../../src/main/srs_main_ingest_hls.cpp,
4 auto readonly separator, 5 auto readonly separator,
5 ../../objs/srs_auto_headers.hpp, 6 ../../objs/srs_auto_headers.hpp,
6 libs readonly separator, 7 libs readonly separator,
@@ -105,6 +105,7 @@ @@ -105,6 +105,7 @@
105 3CC52DDD1ACE4023006FEB01 /* srs_utest_reload.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD41ACE4023006FEB01 /* srs_utest_reload.cpp */; }; 105 3CC52DDD1ACE4023006FEB01 /* srs_utest_reload.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD41ACE4023006FEB01 /* srs_utest_reload.cpp */; };
106 3CC52DDE1ACE4023006FEB01 /* srs_utest.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD61ACE4023006FEB01 /* srs_utest.cpp */; }; 106 3CC52DDE1ACE4023006FEB01 /* srs_utest.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD61ACE4023006FEB01 /* srs_utest.cpp */; };
107 3CD88B3F1ACA9C58000359E0 /* srs_app_async_call.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */; }; 107 3CD88B3F1ACA9C58000359E0 /* srs_app_async_call.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */; };
  108 + 3CE6CD311AE4AFB800706E07 /* srs_main_ingest_hls.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */; };
108 /* End PBXBuildFile section */ 109 /* End PBXBuildFile section */
109 110
110 /* Begin PBXCopyFilesBuildPhase section */ 111 /* Begin PBXCopyFilesBuildPhase section */
@@ -361,6 +362,7 @@ @@ -361,6 +362,7 @@
361 3CC52DD71ACE4023006FEB01 /* srs_utest.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_utest.hpp; path = ../../src/utest/srs_utest.hpp; sourceTree = "<group>"; }; 362 3CC52DD71ACE4023006FEB01 /* srs_utest.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_utest.hpp; path = ../../src/utest/srs_utest.hpp; sourceTree = "<group>"; };
362 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_async_call.cpp; path = ../../../src/app/srs_app_async_call.cpp; sourceTree = "<group>"; }; 363 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_async_call.cpp; path = ../../../src/app/srs_app_async_call.cpp; sourceTree = "<group>"; };
363 3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_async_call.hpp; path = ../../../src/app/srs_app_async_call.hpp; sourceTree = "<group>"; }; 364 3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_async_call.hpp; path = ../../../src/app/srs_app_async_call.hpp; sourceTree = "<group>"; };
  365 + 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_main_ingest_hls.cpp; path = ../../../src/main/srs_main_ingest_hls.cpp; sourceTree = "<group>"; };
364 /* End PBXFileReference section */ 366 /* End PBXFileReference section */
365 367
366 /* Begin PBXFrameworksBuildPhase section */ 368 /* Begin PBXFrameworksBuildPhase section */
@@ -442,6 +444,7 @@ @@ -442,6 +444,7 @@
442 3C1232041AAE80CB00CE8F6C /* main */ = { 444 3C1232041AAE80CB00CE8F6C /* main */ = {
443 isa = PBXGroup; 445 isa = PBXGroup;
444 children = ( 446 children = (
  447 + 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */,
445 3C1232051AAE812C00CE8F6C /* srs_main_server.cpp */, 448 3C1232051AAE812C00CE8F6C /* srs_main_server.cpp */,
446 ); 449 );
447 name = main; 450 name = main;
@@ -904,6 +907,7 @@ @@ -904,6 +907,7 @@
904 3C1232A71AAE81D900CE8F6C /* srs_app_listener.cpp in Sources */, 907 3C1232A71AAE81D900CE8F6C /* srs_app_listener.cpp in Sources */,
905 3C1232261AAE814D00CE8F6C /* srs_kernel_flv.cpp in Sources */, 908 3C1232261AAE814D00CE8F6C /* srs_kernel_flv.cpp in Sources */,
906 3C663F1A1AB0155100286D8B /* srs_rtmp_dump.c in Sources */, 909 3C663F1A1AB0155100286D8B /* srs_rtmp_dump.c in Sources */,
  910 + 3CE6CD311AE4AFB800706E07 /* srs_main_ingest_hls.cpp in Sources */,
907 3C1232241AAE814D00CE8F6C /* srs_kernel_error.cpp in Sources */, 911 3C1232241AAE814D00CE8F6C /* srs_kernel_error.cpp in Sources */,
908 3C1232441AAE81A400CE8F6C /* srs_rtmp_handshake.cpp in Sources */, 912 3C1232441AAE81A400CE8F6C /* srs_rtmp_handshake.cpp in Sources */,
909 3C1232291AAE814D00CE8F6C /* srs_kernel_stream.cpp in Sources */, 913 3C1232291AAE814D00CE8F6C /* srs_kernel_stream.cpp in Sources */,
@@ -274,7 +274,7 @@ bool SrsFastLog::generate_header(bool error, const char* tag, int context_id, co @@ -274,7 +274,7 @@ bool SrsFastLog::generate_header(bool error, const char* tag, int context_id, co
274 274
275 // to calendar time 275 // to calendar time
276 struct tm* tm; 276 struct tm* tm;
277 - if (_srs_config->get_utc_time()) { 277 + if (_srs_config && _srs_config->get_utc_time()) {
278 if ((tm = gmtime(&tv.tv_sec)) == NULL) { 278 if ((tm = gmtime(&tv.tv_sec)) == NULL) {
279 return false; 279 return false;
280 } 280 }
  1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013-2015 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core.hpp>
  25 +
  26 +#include <string>
  27 +#include <vector>
  28 +using namespace std;
  29 +
  30 +#include <srs_kernel_error.hpp>
  31 +#include <srs_app_server.hpp>
  32 +#include <srs_app_config.hpp>
  33 +#include <srs_app_log.hpp>
  34 +#include <srs_kernel_utility.hpp>
  35 +#include <srs_rtmp_sdk.hpp>
  36 +#include <srs_kernel_buffer.hpp>
  37 +#include <srs_kernel_stream.hpp>
  38 +#include <srs_kernel_ts.hpp>
  39 +#include <srs_app_http_client.hpp>
  40 +#include <srs_app_http.hpp>
  41 +#include <srs_core_autofree.hpp>
  42 +#include <srs_app_st.hpp>
  43 +#include <srs_rtmp_utility.hpp>
  44 +#include <srs_app_st_socket.hpp>
  45 +#include <srs_app_utility.hpp>
  46 +#include <srs_rtmp_amf0.hpp>
  47 +#include <srs_raw_avc.hpp>
  48 +
  49 +// the retry timeout in ms.
  50 +#define SRS_INGEST_HLS_ERROR_RETRY_MS 3000
  51 +
  52 +// pre-declare
  53 +int proxy_hls2rtmp(std::string hls, std::string rtmp);
  54 +
  55 +// for the main objects(server, config, log, context),
  56 +// never subscribe handler in constructor,
  57 +// instead, subscribe handler in initialize method.
  58 +// kernel module.
  59 +ISrsLog* _srs_log = new SrsFastLog();
  60 +ISrsThreadContext* _srs_context = new ISrsThreadContext();
  61 +// app module.
  62 +SrsConfig* _srs_config = NULL;
  63 +SrsServer* _srs_server = NULL;
  64 +
  65 +/**
  66 +* main entrance.
  67 +*/
  68 +int main(int argc, char** argv)
  69 +{
  70 + // TODO: support both little and big endian.
  71 + srs_assert(srs_is_little_endian());
  72 +
  73 + // directly failed when compile limited.
  74 +#if !defined(SRS_AUTO_HTTP_PARSER)
  75 + srs_error("depends on http-parser.");
  76 + exit(-1);
  77 +#endif
  78 +
  79 +#if defined(SRS_AUTO_GPERF_MP) || defined(SRS_AUTO_GPERF_MP) \
  80 +|| defined(SRS_AUTO_GPERF_MC) || defined(SRS_AUTO_GPERF_MP)
  81 + srs_error("donot support gmc/gmp/gcp/gprof");
  82 + exit(-1);
  83 +#endif
  84 +
  85 + srs_trace("srs_ingest_hls base on %s, to ingest hls live to srs", RTMP_SIG_SRS_SERVER);
  86 +
  87 + // parse user options.
  88 + std::string in_hls_url, out_rtmp_url;
  89 + for (int opt = 0; opt < argc; opt++) {
  90 + srs_trace("argv[%d]=%s", opt, argv[opt]);
  91 + }
  92 +
  93 + // fill the options for mac
  94 + for (int opt = 0; opt < argc - 1; opt++) {
  95 + // ignore all options except -i and -y.
  96 + char* p = argv[opt];
  97 +
  98 + // only accept -x
  99 + if (p[0] != '-' || p[1] == 0 || p[2] != 0) {
  100 + continue;
  101 + }
  102 +
  103 + // parse according the option name.
  104 + switch (p[1]) {
  105 + case 'i': in_hls_url = argv[opt + 1]; break;
  106 + case 'y': out_rtmp_url = argv[opt + 1]; break;
  107 + default: break;
  108 + }
  109 + }
  110 +
  111 + if (in_hls_url.empty() || out_rtmp_url.empty()) {
  112 + printf("ingest hls live stream and publish to RTMP server\n"
  113 + "Usage: %s <-i in_hls_url> <-y out_rtmp_url>\n"
  114 + " in_hls_url input hls url, ingest from this m3u8.\n"
  115 + " out_rtmp_url output rtmp url, publish to this url.\n"
  116 + "For example:\n"
  117 + " %s -i http://127.0.0.1:8080/live/livestream.m3u8 -y rtmp://127.0.0.1/live/ingest_hls\n"
  118 + " %s -i http://ossrs.net/live/livestream.m3u8 -y rtmp://127.0.0.1/live/ingest_hls\n",
  119 + argv[0], argv[0], argv[0]);
  120 + exit(-1);
  121 + }
  122 +
  123 + srs_trace("input: %s", in_hls_url.c_str());
  124 + srs_trace("output: %s", out_rtmp_url.c_str());
  125 +
  126 + return proxy_hls2rtmp(in_hls_url, out_rtmp_url);
  127 +}
  128 +
  129 +// the context to ingest hls stream.
  130 +class SrsIngestSrsInput
  131 +{
  132 +private:
  133 + struct SrsTsPiece {
  134 + double duration;
  135 + std::string url;
  136 + std::string body;
  137 +
  138 + // should skip this ts?
  139 + bool skip;
  140 + // already sent to rtmp server?
  141 + bool sent;
  142 + // whether ts piece is dirty, remove if not update.
  143 + bool dirty;
  144 +
  145 + SrsTsPiece() {
  146 + skip = false;
  147 + sent = false;
  148 + dirty = false;
  149 + }
  150 +
  151 + int fetch(std::string m3u8, SrsHttpClient* client);
  152 + };
  153 +private:
  154 + SrsHttpUri* in_hls;
  155 + std::vector<SrsTsPiece*> pieces;
  156 + int64_t next_connect_time;
  157 +private:
  158 + SrsStream* stream;
  159 + SrsTsContext* context;
  160 +public:
  161 + SrsIngestSrsInput(SrsHttpUri* hls) {
  162 + in_hls = hls;
  163 + next_connect_time = 0;
  164 +
  165 + stream = new SrsStream();
  166 + context = new SrsTsContext();
  167 + }
  168 + virtual ~SrsIngestSrsInput() {
  169 + srs_freep(stream);
  170 + srs_freep(context);
  171 +
  172 + std::vector<SrsTsPiece*>::iterator it;
  173 + for (it = pieces.begin(); it != pieces.end(); ++it) {
  174 + SrsTsPiece* tp = *it;
  175 + srs_freep(tp);
  176 + }
  177 + pieces.clear();
  178 + }
  179 + /**
  180 + * parse the input hls live m3u8 index.
  181 + */
  182 + virtual int connect();
  183 + /**
  184 + * parse the ts and use hanler to process the message.
  185 + */
  186 + virtual int parse(ISrsTsHandler* handler);
  187 +private:
  188 + /**
  189 + * find the ts piece by its url.
  190 + */
  191 + virtual SrsTsPiece* find_ts(string url);
  192 + /**
  193 + * set all ts to dirty.
  194 + */
  195 + virtual void dirty_all_ts();
  196 + /**
  197 + * fetch all ts body.
  198 + */
  199 + virtual void fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client);
  200 + /**
  201 + * remove all ts which is dirty.
  202 + */
  203 + virtual void remove_dirty();
  204 +};
  205 +
  206 +int SrsIngestSrsInput::connect()
  207 +{
  208 + int ret = ERROR_SUCCESS;
  209 +
  210 + int64_t now = srs_update_system_time_ms();
  211 + if (now < next_connect_time) {
  212 + st_usleep((next_connect_time - now) * 1000);
  213 + }
  214 +
  215 + SrsHttpClient client;
  216 + srs_trace("parse input hls %s", in_hls->get_url());
  217 +
  218 + if ((ret = client.initialize(in_hls->get_host(), in_hls->get_port())) != ERROR_SUCCESS) {
  219 + srs_error("connect to server failed. ret=%d", ret);
  220 + return ret;
  221 + }
  222 +
  223 + SrsHttpMessage* msg = NULL;
  224 + if ((ret = client.get(in_hls->get_path(), "", &msg)) != ERROR_SUCCESS) {
  225 + srs_error("HTTP GET %s failed. ret=%d", in_hls->get_url(), ret);
  226 + return ret;
  227 + }
  228 +
  229 + srs_assert(msg);
  230 + SrsAutoFree(SrsHttpMessage, msg);
  231 +
  232 + std::string body;
  233 + if ((ret = msg->body_read_all(body)) != ERROR_SUCCESS) {
  234 + srs_error("read m3u8 failed. ret=%d", ret);
  235 + return ret;
  236 + }
  237 +
  238 + if (body.empty()) {
  239 + srs_warn("ignore empty m3u8");
  240 + return ret;
  241 + }
  242 +
  243 + // set all ts to dirty.
  244 + dirty_all_ts();
  245 +
  246 + std::string ptl;
  247 + double td = 0.0;
  248 + double duration = 0.0;
  249 + bool fresh_m3u8 = pieces.empty();
  250 + while (!body.empty()) {
  251 + size_t pos = string::npos;
  252 +
  253 + std::string line;
  254 + if ((pos = body.find("\n")) != string::npos) {
  255 + line = body.substr(0, pos);
  256 + body = body.substr(pos + 1);
  257 + } else {
  258 + line = body;
  259 + body = "";
  260 + }
  261 +
  262 + line = srs_string_replace(line, "\r", "");
  263 + line = srs_string_replace(line, " ", "");
  264 +
  265 + // #EXT-X-VERSION:3
  266 + // the version must be 3.0
  267 + if (srs_string_starts_with(line, "#EXT-X-VERSION:")) {
  268 + if (!srs_string_ends_with(line, ":3")) {
  269 + srs_warn("m3u8 3.0 required, actual is %s", line.c_str());
  270 + }
  271 + continue;
  272 + }
  273 +
  274 + // #EXT-X-PLAYLIST-TYPE:VOD
  275 + // the playlist type, vod or nothing.
  276 + if (srs_string_starts_with(line, "#EXT-X-PLAYLIST-TYPE:")) {
  277 + ptl = line;
  278 + continue;
  279 + }
  280 +
  281 + // #EXT-X-TARGETDURATION:12
  282 + // the target duration is required.
  283 + if (srs_string_starts_with(line, "#EXT-X-TARGETDURATION:")) {
  284 + td = ::atof(line.substr(string("#EXT-X-TARGETDURATION:").length()).c_str());
  285 + }
  286 +
  287 + // #EXT-X-ENDLIST
  288 + // parse completed.
  289 + if (line == "#EXT-X-ENDLIST") {
  290 + break;
  291 + }
  292 +
  293 + // #EXTINF:11.401,
  294 + // livestream-5.ts
  295 + // parse each ts entry, expect current line is inf.
  296 + if (!srs_string_starts_with(line, "#EXTINF:")) {
  297 + continue;
  298 + }
  299 +
  300 + // expect next line is url.
  301 + std::string ts_url;
  302 + if ((pos = body.find("\n")) != string::npos) {
  303 + ts_url = body.substr(0, pos);
  304 + body = body.substr(pos + 1);
  305 + } else {
  306 + srs_warn("ts entry unexpected eof, inf=%s", line.c_str());
  307 + break;
  308 + }
  309 +
  310 + // parse the ts duration.
  311 + line = line.substr(string("#EXTINF:").length());
  312 + if ((pos = line.find(",")) != string::npos) {
  313 + line = line.substr(0, pos);
  314 + }
  315 +
  316 + double ts_duration = ::atof(line.c_str());
  317 + duration += ts_duration;
  318 +
  319 + SrsTsPiece* tp = find_ts(ts_url);
  320 + if (!tp) {
  321 + tp = new SrsTsPiece();
  322 + tp->url = ts_url;
  323 + tp->duration = ts_duration;
  324 + pieces.push_back(tp);
  325 + } else {
  326 + tp->dirty = false;
  327 + }
  328 + }
  329 +
  330 + // fetch all ts.
  331 + fetch_all_ts(fresh_m3u8, &client);
  332 +
  333 + // remove all dirty ts.
  334 + remove_dirty();
  335 +
  336 + srs_trace("fetch m3u8 ok, td=%.2f, duration=%.2f, pieces=%d", td, duration, pieces.size());
  337 +
  338 + return ret;
  339 +}
  340 +
  341 +int SrsIngestSrsInput::parse(ISrsTsHandler* handler)
  342 +{
  343 + int ret = ERROR_SUCCESS;
  344 +
  345 + for (int i = 0; i < (int)pieces.size(); i++) {
  346 + SrsTsPiece* tp = pieces.at(i);
  347 + tp->sent = true;
  348 +
  349 + if (tp->body.empty()) {
  350 + continue;
  351 + }
  352 +
  353 + // use stream to parse ts packet.
  354 + int nb_packet = (int)tp->body.length() / SRS_TS_PACKET_SIZE;
  355 + for (int i = 0; i < nb_packet; i++) {
  356 + char* p = (char*)tp->body.data() + (i * SRS_TS_PACKET_SIZE);
  357 + if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
  358 + return ret;
  359 + }
  360 +
  361 + // process each ts packet
  362 + if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) {
  363 + srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret);
  364 + continue;
  365 + }
  366 + srs_info("mpegts: parse ts packet completed");
  367 + }
  368 + srs_info("mpegts: parse udp packet completed");
  369 + }
  370 +
  371 + return ret;
  372 +}
  373 +
  374 +SrsIngestSrsInput::SrsTsPiece* SrsIngestSrsInput::find_ts(string url)
  375 +{
  376 + std::vector<SrsTsPiece*>::iterator it;
  377 + for (it = pieces.begin(); it != pieces.end(); ++it) {
  378 + SrsTsPiece* tp = *it;
  379 + if (tp->url == url) {
  380 + return tp;
  381 + }
  382 + }
  383 + return NULL;
  384 +}
  385 +
  386 +void SrsIngestSrsInput::dirty_all_ts()
  387 +{
  388 + std::vector<SrsTsPiece*>::iterator it;
  389 + for (it = pieces.begin(); it != pieces.end(); ++it) {
  390 + SrsTsPiece* tp = *it;
  391 + tp->dirty = true;
  392 + }
  393 +}
  394 +
  395 +void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client)
  396 +{
  397 + int ret = ERROR_SUCCESS;
  398 +
  399 + for (int i = 0; i < (int)pieces.size(); i++) {
  400 + SrsTsPiece* tp = pieces.at(i);
  401 +
  402 + // when skipped, ignore.
  403 + if (tp->skip) {
  404 + continue;
  405 + }
  406 +
  407 + // for the fresh m3u8, skip except the last one.
  408 + if (fresh_m3u8 && i != (int)pieces.size() - 1) {
  409 + tp->skip = true;
  410 + continue;
  411 + }
  412 +
  413 + if ((ret = tp->fetch(in_hls->get_url(), client)) != ERROR_SUCCESS) {
  414 + srs_warn("ignore ts %s for error. ret=%d", tp->url.c_str(), ret);
  415 + tp->skip = true;
  416 + continue;
  417 + }
  418 +
  419 + // set the next connect time.
  420 + if (next_connect_time <= 0) {
  421 + next_connect_time = srs_update_system_time_ms();
  422 + }
  423 + next_connect_time += (int)tp->duration * 1000;
  424 + }
  425 +}
  426 +
  427 +
  428 +void SrsIngestSrsInput::remove_dirty()
  429 +{
  430 + std::vector<SrsTsPiece*>::iterator it;
  431 + for (it = pieces.begin(); it != pieces.end();) {
  432 + SrsTsPiece* tp = *it;
  433 +
  434 + if (tp->dirty) {
  435 + srs_freep(tp);
  436 + it = pieces.erase(it);
  437 + } else {
  438 + ++it;
  439 + }
  440 + }
  441 +}
  442 +
  443 +int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client)
  444 +{
  445 + int ret = ERROR_SUCCESS;
  446 +
  447 + if (skip || sent || !body.empty()) {
  448 + return ret;
  449 + }
  450 +
  451 + size_t pos = string::npos;
  452 +
  453 + bool use_abs_client = false;
  454 + SrsHttpClient abs_client;
  455 +
  456 + std::string ts_url = url;
  457 + if (!srs_string_starts_with(ts_url, "http://")) {
  458 + std::string baseurl = m3u8;
  459 + if ((pos = m3u8.rfind("/")) != string::npos) {
  460 + baseurl = m3u8.substr(0, pos);
  461 + }
  462 + ts_url = baseurl + "/" + url;
  463 +
  464 + // use fresh client for absolute url.
  465 + client = &abs_client;
  466 + use_abs_client = true;
  467 + }
  468 +
  469 + SrsHttpUri uri;
  470 + if ((ret = uri.initialize(ts_url)) != ERROR_SUCCESS) {
  471 + return ret;
  472 + }
  473 +
  474 + // initialize the fresh http client.
  475 + if (use_abs_client && (ret = client->initialize(uri.get_host(), uri.get_port()) != ERROR_SUCCESS)) {
  476 + return ret;
  477 + }
  478 +
  479 + SrsHttpMessage* msg = NULL;
  480 + if ((ret = client->get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) {
  481 + srs_error("HTTP GET %s failed. ret=%d", uri.get_url(), ret);
  482 + return ret;
  483 + }
  484 +
  485 + srs_assert(msg);
  486 + SrsAutoFree(SrsHttpMessage, msg);
  487 +
  488 + if ((ret = msg->body_read_all(body)) != ERROR_SUCCESS) {
  489 + srs_error("read ts failed. ret=%d", ret);
  490 + return ret;
  491 + }
  492 +
  493 + srs_trace("fetch ts ok, duration=%.2f, url=%s, body=%dB", duration, url.c_str(), body.length());
  494 +
  495 + return ret;
  496 +}
  497 +
  498 +// the context to output to rtmp server
  499 +class SrsIngestSrsOutput : public ISrsTsHandler
  500 +{
  501 +private:
  502 + SrsHttpUri* out_rtmp;
  503 +private:
  504 + SrsRequest* req;
  505 + st_netfd_t stfd;
  506 + SrsStSocket* io;
  507 + SrsRtmpClient* client;
  508 + int stream_id;
  509 +private:
  510 + SrsRawH264Stream* avc;
  511 + std::string h264_sps;
  512 + bool h264_sps_changed;
  513 + std::string h264_pps;
  514 + bool h264_pps_changed;
  515 + bool h264_sps_pps_sent;
  516 +private:
  517 + SrsRawAacStream* aac;
  518 + std::string aac_specific_config;
  519 +public:
  520 + SrsIngestSrsOutput(SrsHttpUri* rtmp) {
  521 + out_rtmp = rtmp;
  522 +
  523 + req = NULL;
  524 + io = NULL;
  525 + client = NULL;
  526 + stfd = NULL;
  527 + stream_id = 0;
  528 +
  529 + avc = new SrsRawH264Stream();
  530 + aac = new SrsRawAacStream();
  531 + h264_sps_changed = false;
  532 + h264_pps_changed = false;
  533 + h264_sps_pps_sent = false;
  534 + }
  535 + virtual ~SrsIngestSrsOutput() {
  536 + close();
  537 +
  538 + srs_freep(avc);
  539 + srs_freep(aac);
  540 + }
  541 +// interface ISrsTsHandler
  542 +public:
  543 + virtual int on_ts_message(SrsTsMessage* msg);
  544 +private:
  545 + virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs);
  546 + virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts);
  547 + virtual int write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts);
  548 + virtual int on_ts_audio(SrsTsMessage* msg, SrsStream* avs);
  549 + virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts);
  550 +private:
  551 + virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
  552 +public:
  553 + /**
  554 + * connect to output rtmp server.
  555 + */
  556 + virtual int connect();
  557 +private:
  558 + virtual int connect_app(std::string ep_server, std::string ep_port);
  559 + // close the connected io and rtmp to ready to be re-connect.
  560 + virtual void close();
  561 +};
  562 +
  563 +int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
  564 +{
  565 + int ret = ERROR_SUCCESS;
  566 +
  567 + // about the bytes of msg, specified by elementary stream which indicates by PES_packet_data_byte and stream_id
  568 + // for example, when SrsTsStream of SrsTsChannel indicates stream_type is SrsTsStreamVideoMpeg4 and SrsTsStreamAudioMpeg4,
  569 + // the elementary stream can be mux in "2.11 Carriage of ISO/IEC 14496 data" in hls-mpeg-ts-iso13818-1.pdf, page 103
  570 + // @remark, the most popular stream_id is 0xe0 for h.264 over mpegts, which indicates the stream_id is video and
  571 + // stream_number is 0, where I guess the elementary is specified in annexb format(H.264-AVC-ISO_IEC_14496-10.pdf, page 211).
  572 + // because when audio stream_number is 0, the elementary is ADTS(aac-mp4a-format-ISO_IEC_14496-3+2001.pdf, page 75, 1.A.2.2 ADTS).
  573 +
  574 + // about the bytes of PES_packet_data_byte, defined in hls-mpeg-ts-iso13818-1.pdf, page 58
  575 + // PES_packet_data_byte ¨C PES_packet_data_bytes shall be contiguous bytes of data from the elementary stream
  576 + // indicated by the packet¡¯s stream_id or PID. When the elementary stream data conforms to ITU-T
  577 + // Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 13818-3, the PES_packet_data_bytes shall be byte aligned to the bytes of this
  578 + // Recommendation | International Standard. The byte-order of the elementary stream shall be preserved. The number of
  579 + // PES_packet_data_bytes, N, is specified by the PES_packet_length field. N shall be equal to the value indicated in the
  580 + // PES_packet_length minus the number of bytes between the last byte of the PES_packet_length field and the first
  581 + // PES_packet_data_byte.
  582 + //
  583 + // In the case of a private_stream_1, private_stream_2, ECM_stream, or EMM_stream, the contents of the
  584 + // PES_packet_data_byte field are user definable and will not be specified by ITU-T | ISO/IEC in the future.
  585 +
  586 + // about the bytes of stream_id, define in hls-mpeg-ts-iso13818-1.pdf, page 49
  587 + // stream_id ¨C In Program Streams, the stream_id specifies the type and number of the elementary stream as defined by the
  588 + // stream_id Table 2-18. In Transport Streams, the stream_id may be set to any valid value which correctly describes the
  589 + // elementary stream type as defined in Table 2-18. In Transport Streams, the elementary stream type is specified in the
  590 + // Program Specific Information as specified in 2.4.4.
  591 +
  592 + // about the stream_id table, define in Table 2-18 ¨C Stream_id assignments, hls-mpeg-ts-iso13818-1.pdf, page 52.
  593 + //
  594 + // 110x xxxx
  595 + // ISO/IEC 13818-3 or ISO/IEC 11172-3 or ISO/IEC 13818-7 or ISO/IEC
  596 + // 14496-3 audio stream number x xxxx
  597 + // ((sid >> 5) & 0x07) == SrsTsPESStreamIdAudio
  598 + //
  599 + // 1110 xxxx
  600 + // ITU-T Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 11172-2 or ISO/IEC
  601 + // 14496-2 video stream number xxxx
  602 + // ((stream_id >> 4) & 0x0f) == SrsTsPESStreamIdVideo
  603 +
  604 + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" mpegts: got %s stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)",
  605 + (msg->channel->apply == SrsTsPidApplyVideo)? "Video":"Audio", srs_ts_stream2string(msg->channel->stream).c_str(),
  606 + msg->dts, msg->pts, msg->payload->length(), msg->packet->payload_unit_start_indicator, msg->continuity_counter, msg->sid,
  607 + msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number());
  608 +
  609 + // when not audio/video, or not adts/annexb format, donot support.
  610 + if (msg->stream_number() != 0) {
  611 + ret = ERROR_STREAM_CASTER_TS_ES;
  612 + srs_error("mpegts: unsupported stream format, sid=%#x(%s-%d). ret=%d",
  613 + msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number(), ret);
  614 + return ret;
  615 + }
  616 +
  617 + // check supported codec
  618 + if (msg->channel->stream != SrsTsStreamVideoH264 && msg->channel->stream != SrsTsStreamAudioAAC) {
  619 + ret = ERROR_STREAM_CASTER_TS_CODEC;
  620 + srs_error("mpegts: unsupported stream codec=%d. ret=%d", msg->channel->stream, ret);
  621 + return ret;
  622 + }
  623 +
  624 + // parse the stream.
  625 + SrsStream avs;
  626 + if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
  627 + srs_error("mpegts: initialize av stream failed. ret=%d", ret);
  628 + return ret;
  629 + }
  630 +
  631 + // publish audio or video.
  632 + if (msg->channel->stream == SrsTsStreamVideoH264) {
  633 + return on_ts_video(msg, &avs);
  634 + }
  635 + if (msg->channel->stream == SrsTsStreamAudioAAC) {
  636 + return on_ts_audio(msg, &avs);
  637 + }
  638 +
  639 + // TODO: FIXME: implements it.
  640 + return ret;
  641 +}
  642 +
  643 +int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
  644 +{
  645 + int ret = ERROR_SUCCESS;
  646 +
  647 + // ensure rtmp connected.
  648 + if ((ret = connect()) != ERROR_SUCCESS) {
  649 + return ret;
  650 + }
  651 +
  652 + // ts tbn to flv tbn.
  653 + u_int32_t dts = (u_int32_t)(msg->dts / 90);
  654 + u_int32_t pts = (u_int32_t)(msg->dts / 90);
  655 +
  656 + // the whole ts pes video packet must be a flv frame packet.
  657 + char* ibpframe = avs->data() + avs->pos();
  658 + int ibpframe_size = avs->size() - avs->pos();
  659 +
  660 + // send each frame.
  661 + while (!avs->empty()) {
  662 + char* frame = NULL;
  663 + int frame_size = 0;
  664 + if ((ret = avc->annexb_demux(avs, &frame, &frame_size)) != ERROR_SUCCESS) {
  665 + return ret;
  666 + }
  667 +
  668 + // ignore invalid frame,
  669 + // * atleast 1bytes for SPS to decode the type
  670 + // * ignore the auth bytes '09f0'
  671 + if (frame_size <= 2) {
  672 + continue;
  673 + }
  674 +
  675 + // for sps
  676 + if (avc->is_sps(frame, frame_size)) {
  677 + std::string sps;
  678 + if ((ret = avc->sps_demux(frame, frame_size, sps)) != ERROR_SUCCESS) {
  679 + return ret;
  680 + }
  681 +
  682 + if (h264_sps == sps) {
  683 + continue;
  684 + }
  685 + h264_sps_changed = true;
  686 + h264_sps = sps;
  687 +
  688 + if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) {
  689 + return ret;
  690 + }
  691 + continue;
  692 + }
  693 +
  694 + // for pps
  695 + if (avc->is_pps(frame, frame_size)) {
  696 + std::string pps;
  697 + if ((ret = avc->pps_demux(frame, frame_size, pps)) != ERROR_SUCCESS) {
  698 + return ret;
  699 + }
  700 +
  701 + if (h264_pps == pps) {
  702 + continue;
  703 + }
  704 + h264_pps_changed = true;
  705 + h264_pps = pps;
  706 +
  707 + if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) {
  708 + return ret;
  709 + }
  710 + continue;
  711 + }
  712 +
  713 + break;
  714 + }
  715 +
  716 + // ibp frame.
  717 + srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", ibpframe_size, dts);
  718 + return write_h264_ipb_frame(ibpframe, ibpframe_size, dts, pts);
  719 +}
  720 +
  721 +int SrsIngestSrsOutput::write_h264_sps_pps(u_int32_t dts, u_int32_t pts)
  722 +{
  723 + int ret = ERROR_SUCCESS;
  724 +
  725 + // only send when both sps and pps changed.
  726 + if (!h264_sps_changed || !h264_pps_changed) {
  727 + return ret;
  728 + }
  729 +
  730 + // h264 raw to h264 packet.
  731 + std::string sh;
  732 + if ((ret = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != ERROR_SUCCESS) {
  733 + return ret;
  734 + }
  735 +
  736 + // h264 packet to flv packet.
  737 + int8_t frame_type = SrsCodecVideoAVCFrameKeyFrame;
  738 + int8_t avc_packet_type = SrsCodecVideoAVCTypeSequenceHeader;
  739 + char* flv = NULL;
  740 + int nb_flv = 0;
  741 + if ((ret = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) {
  742 + return ret;
  743 + }
  744 +
  745 + // the timestamp in rtmp message header is dts.
  746 + u_int32_t timestamp = dts;
  747 + if ((ret = rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv)) != ERROR_SUCCESS) {
  748 + return ret;
  749 + }
  750 +
  751 + // reset sps and pps.
  752 + h264_sps_changed = false;
  753 + h264_pps_changed = false;
  754 + h264_sps_pps_sent = true;
  755 +
  756 + return ret;
  757 +}
  758 +
  759 +int SrsIngestSrsOutput::write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts)
  760 +{
  761 + int ret = ERROR_SUCCESS;
  762 +
  763 + // when sps or pps not sent, ignore the packet.
  764 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/203
  765 + if (!h264_sps_pps_sent) {
  766 + return ERROR_H264_DROP_BEFORE_SPS_PPS;
  767 + }
  768 +
  769 + // 5bits, 7.3.1 NAL unit syntax,
  770 + // H.264-AVC-ISO_IEC_14496-10.pdf, page 44.
  771 + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
  772 + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f);
  773 +
  774 + // for IDR frame, the frame is keyframe.
  775 + SrsCodecVideoAVCFrame frame_type = SrsCodecVideoAVCFrameInterFrame;
  776 + if (nal_unit_type == SrsAvcNaluTypeIDR) {
  777 + frame_type = SrsCodecVideoAVCFrameKeyFrame;
  778 + }
  779 +
  780 + std::string ibp;
  781 + if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) {
  782 + return ret;
  783 + }
  784 +
  785 + int8_t avc_packet_type = SrsCodecVideoAVCTypeNALU;
  786 + char* flv = NULL;
  787 + int nb_flv = 0;
  788 + if ((ret = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) {
  789 + return ret;
  790 + }
  791 +
  792 + // the timestamp in rtmp message header is dts.
  793 + u_int32_t timestamp = dts;
  794 + return rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv);
  795 +}
  796 +
  797 +int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs)
  798 +{
  799 + int ret = ERROR_SUCCESS;
  800 +
  801 + // ensure rtmp connected.
  802 + if ((ret = connect()) != ERROR_SUCCESS) {
  803 + return ret;
  804 + }
  805 +
  806 + // ts tbn to flv tbn.
  807 + u_int32_t dts = (u_int32_t)(msg->dts / 90);
  808 +
  809 + // send each frame.
  810 + while (!avs->empty()) {
  811 + char* frame = NULL;
  812 + int frame_size = 0;
  813 + SrsRawAacStreamCodec codec;
  814 + if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) {
  815 + return ret;
  816 + }
  817 +
  818 + // ignore invalid frame,
  819 + // * atleast 1bytes for aac to decode the data.
  820 + if (frame_size <= 0) {
  821 + continue;
  822 + }
  823 + srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts);
  824 +
  825 + // generate sh.
  826 + if (aac_specific_config.empty()) {
  827 + std::string sh;
  828 + if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) {
  829 + return ret;
  830 + }
  831 + aac_specific_config = sh;
  832 +
  833 + codec.aac_packet_type = 0;
  834 +
  835 + if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != ERROR_SUCCESS) {
  836 + return ret;
  837 + }
  838 + }
  839 +
  840 + // audio raw data.
  841 + codec.aac_packet_type = 1;
  842 + if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) {
  843 + return ret;
  844 + }
  845 + }
  846 +
  847 + return ret;
  848 +}
  849 +
  850 +int SrsIngestSrsOutput::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts)
  851 +{
  852 + int ret = ERROR_SUCCESS;
  853 +
  854 + char* data = NULL;
  855 + int size = 0;
  856 + if ((ret = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != ERROR_SUCCESS) {
  857 + return ret;
  858 + }
  859 +
  860 + return rtmp_write_packet(SrsCodecFlvTagAudio, dts, data, size);
  861 +}
  862 +
  863 +int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size)
  864 +{
  865 + int ret = ERROR_SUCCESS;
  866 +
  867 + SrsSharedPtrMessage* msg = NULL;
  868 +
  869 + if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) {
  870 + srs_error("mpegts: create shared ptr msg failed. ret=%d", ret);
  871 + return ret;
  872 + }
  873 + srs_assert(msg);
  874 +
  875 + // send out encoded msg.
  876 + if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
  877 + return ret;
  878 + }
  879 +
  880 + return ret;
  881 +}
  882 +
  883 +int SrsIngestSrsOutput::connect()
  884 +{
  885 + int ret = ERROR_SUCCESS;
  886 +
  887 + // when ok, ignore.
  888 + // TODO: FIXME: should reconnect when disconnected.
  889 + if (io || client) {
  890 + return ret;
  891 + }
  892 +
  893 + // parse uri
  894 + if (!req) {
  895 + req = new SrsRequest();
  896 +
  897 + size_t pos = string::npos;
  898 + string uri = req->tcUrl = out_rtmp->get_url();
  899 +
  900 + // tcUrl, stream
  901 + if ((pos = uri.rfind("/")) != string::npos) {
  902 + req->stream = uri.substr(pos + 1);
  903 + req->tcUrl = uri = uri.substr(0, pos);
  904 + }
  905 +
  906 + srs_discovery_tc_url(req->tcUrl,
  907 + req->schema, req->host, req->vhost, req->app, req->port,
  908 + req->param);
  909 + }
  910 +
  911 + // connect host.
  912 + if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) {
  913 + srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret);
  914 + return ret;
  915 + }
  916 + io = new SrsStSocket(stfd);
  917 + client = new SrsRtmpClient(io);
  918 +
  919 + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
  920 + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
  921 +
  922 + // connect to vhost/app
  923 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  924 + srs_error("mpegts: handshake with server failed. ret=%d", ret);
  925 + return ret;
  926 + }
  927 + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) {
  928 + srs_error("mpegts: connect with server failed. ret=%d", ret);
  929 + return ret;
  930 + }
  931 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  932 + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  933 + return ret;
  934 + }
  935 +
  936 + // publish.
  937 + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
  938 + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d",
  939 + req->stream.c_str(), stream_id, ret);
  940 + return ret;
  941 + }
  942 +
  943 + return ret;
  944 +}
  945 +
  946 +// TODO: FIXME: refine the connect_app.
  947 +int SrsIngestSrsOutput::connect_app(string ep_server, string ep_port)
  948 +{
  949 + int ret = ERROR_SUCCESS;
  950 +
  951 + // args of request takes the srs info.
  952 + if (req->args == NULL) {
  953 + req->args = SrsAmf0Any::object();
  954 + }
  955 +
  956 + // notify server the edge identity,
  957 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/147
  958 + SrsAmf0Object* data = req->args;
  959 + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
  960 + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
  961 + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
  962 + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
  963 + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
  964 + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
  965 + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
  966 + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
  967 + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
  968 + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
  969 + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
  970 + // for edge to directly get the id of client.
  971 + data->set("srs_pid", SrsAmf0Any::number(getpid()));
  972 + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
  973 +
  974 + // local ip of edge
  975 + std::vector<std::string> ips = srs_get_local_ipv4_ips();
  976 + assert(0 < (int)ips.size());
  977 + std::string local_ip = ips[0];
  978 + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
  979 +
  980 + // generate the tcUrl
  981 + std::string param = "";
  982 + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
  983 +
  984 + // upnode server identity will show in the connect_app of client.
  985 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/160
  986 + // the debug_srs_upnode is config in vhost and default to true.
  987 + bool debug_srs_upnode = true;
  988 + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
  989 + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
  990 + tc_url.c_str(), debug_srs_upnode, ret);
  991 + return ret;
  992 + }
  993 +
  994 + return ret;
  995 +}
  996 +
  997 +void SrsIngestSrsOutput::close()
  998 +{
  999 + srs_freep(client);
  1000 + srs_freep(io);
  1001 + srs_freep(req);
  1002 + srs_close_stfd(stfd);
  1003 +}
  1004 +
  1005 +// the context for ingest hls stream.
  1006 +class SrsIngestSrsContext
  1007 +{
  1008 +private:
  1009 + SrsIngestSrsInput* ic;
  1010 + SrsIngestSrsOutput* oc;
  1011 +public:
  1012 + SrsIngestSrsContext(SrsHttpUri* hls, SrsHttpUri* rtmp) {
  1013 + ic = new SrsIngestSrsInput(hls);
  1014 + oc = new SrsIngestSrsOutput(rtmp);
  1015 + }
  1016 + virtual ~SrsIngestSrsContext() {
  1017 + srs_freep(ic);
  1018 + srs_freep(oc);
  1019 + }
  1020 + virtual int proxy() {
  1021 + int ret = ERROR_SUCCESS;
  1022 +
  1023 + if ((ret = ic->connect()) != ERROR_SUCCESS) {
  1024 + srs_warn("connect oc failed. ret=%d", ret);
  1025 + return ret;
  1026 + }
  1027 +
  1028 + if ((ret = oc->connect()) != ERROR_SUCCESS) {
  1029 + srs_warn("connect ic failed. ret=%d", ret);
  1030 + return ret;
  1031 + }
  1032 +
  1033 + if ((ret = ic->parse(oc)) != ERROR_SUCCESS) {
  1034 + srs_warn("proxy ts to rtmp failed. ret=%d", ret);
  1035 + return ret;
  1036 + }
  1037 +
  1038 + return ret;
  1039 + }
  1040 +};
  1041 +
  1042 +int proxy_hls2rtmp(string hls, string rtmp)
  1043 +{
  1044 + int ret = ERROR_SUCCESS;
  1045 +
  1046 + // init st.
  1047 + if ((ret = srs_init_st()) != ERROR_SUCCESS) {
  1048 + srs_error("init st failed. ret=%d", ret);
  1049 + return ret;
  1050 + }
  1051 +
  1052 + SrsHttpUri hls_uri, rtmp_uri;
  1053 + if ((ret = hls_uri.initialize(hls)) != ERROR_SUCCESS) {
  1054 + srs_error("hls uri invalid. ret=%d", ret);
  1055 + return ret;
  1056 + }
  1057 + if ((ret = rtmp_uri.initialize(rtmp)) != ERROR_SUCCESS) {
  1058 + srs_error("rtmp uri invalid. ret=%d", ret);
  1059 + return ret;
  1060 + }
  1061 +
  1062 + SrsIngestSrsContext context(&hls_uri, &rtmp_uri);
  1063 + for (;;) {
  1064 + if ((ret = context.proxy()) == ERROR_SUCCESS) {
  1065 + continue;
  1066 + }
  1067 +
  1068 + srs_warn("proxy hls to rtmp failed. ret=%d", ret);
  1069 + st_usleep(SRS_INGEST_HLS_ERROR_RETRY_MS * 1000);
  1070 + }
  1071 +
  1072 + return ret;
  1073 +}
  1074 +