winlin

support ingest hls with m3u8 in m3u8.

@@ -254,6 +254,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -254,6 +254,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
254 #define ERROR_HTTP_RESPONSE_EOF 4025 254 #define ERROR_HTTP_RESPONSE_EOF 4025
255 #define ERROR_HTTP_INVALID_CHUNK_HEADER 4026 255 #define ERROR_HTTP_INVALID_CHUNK_HEADER 4026
256 #define ERROR_AVC_NALU_UEV 4027 256 #define ERROR_AVC_NALU_UEV 4027
  257 +#define ERROR_AAC_BYTES_INVALID 4028
257 258
258 /////////////////////////////////////////////////////// 259 ///////////////////////////////////////////////////////
259 // user-define error. 260 // user-define error.
@@ -278,6 +278,11 @@ bool srs_string_starts_with(string str, string flag) @@ -278,6 +278,11 @@ bool srs_string_starts_with(string str, string flag)
278 return str.find(flag) == 0; 278 return str.find(flag) == 0;
279 } 279 }
280 280
  281 +bool srs_string_contains(string str, string flag)
  282 +{
  283 + return str.find(flag) != string::npos;
  284 +}
  285 +
281 int srs_do_create_dir_recursively(string dir) 286 int srs_do_create_dir_recursively(string dir)
282 { 287 {
283 int ret = ERROR_SUCCESS; 288 int ret = ERROR_SUCCESS;
@@ -356,6 +361,22 @@ string srs_path_dirname(string path) @@ -356,6 +361,22 @@ string srs_path_dirname(string path)
356 return dirname; 361 return dirname;
357 } 362 }
358 363
  364 +string srs_path_basename(string path)
  365 +{
  366 + std::string dirname = path;
  367 + size_t pos = string::npos;
  368 +
  369 + if ((pos = dirname.rfind("/")) != string::npos) {
  370 + // the basename("/") is "/"
  371 + if (dirname.length() == 1) {
  372 + return dirname;
  373 + }
  374 + dirname = dirname.substr(pos + 1);
  375 + }
  376 +
  377 + return dirname;
  378 +}
  379 +
359 bool srs_avc_startswith_annexb(SrsStream* stream, int* pnb_start_code) 380 bool srs_avc_startswith_annexb(SrsStream* stream, int* pnb_start_code)
360 { 381 {
361 char* bytes = stream->data() + stream->pos(); 382 char* bytes = stream->data() + stream->pos();
@@ -67,6 +67,8 @@ extern std::string srs_string_remove(std::string str, std::string remove_chars); @@ -67,6 +67,8 @@ extern std::string srs_string_remove(std::string str, std::string remove_chars);
67 extern bool srs_string_ends_with(std::string str, std::string flag); 67 extern bool srs_string_ends_with(std::string str, std::string flag);
68 // whether string starts with 68 // whether string starts with
69 extern bool srs_string_starts_with(std::string str, std::string flag); 69 extern bool srs_string_starts_with(std::string str, std::string flag);
  70 +// whether string contains with
  71 +extern bool srs_string_contains(std::string str, std::string flag);
70 72
71 // create dir recursively 73 // create dir recursively
72 extern int srs_create_dir_recursively(std::string dir); 74 extern int srs_create_dir_recursively(std::string dir);
@@ -75,6 +77,8 @@ extern int srs_create_dir_recursively(std::string dir); @@ -75,6 +77,8 @@ extern int srs_create_dir_recursively(std::string dir);
75 extern bool srs_path_exists(std::string path); 77 extern bool srs_path_exists(std::string path);
76 // get the dirname of path 78 // get the dirname of path
77 extern std::string srs_path_dirname(std::string path); 79 extern std::string srs_path_dirname(std::string path);
  80 +// get the basename of path
  81 +extern std::string srs_path_basename(std::string path);
78 82
79 /** 83 /**
80 * whether stream starts with the avc NALU in "AnnexB" 84 * whether stream starts with the avc NALU in "AnnexB"
@@ -128,6 +128,16 @@ int main(int argc, char** argv) @@ -128,6 +128,16 @@ int main(int argc, char** argv)
128 return proxy_hls2rtmp(in_hls_url, out_rtmp_url); 128 return proxy_hls2rtmp(in_hls_url, out_rtmp_url);
129 } 129 }
130 130
  131 +class ISrsAacHandler
  132 +{
  133 +public:
  134 + /**
  135 + * handle the aac frame, which in ADTS format(starts with FFFx).
  136 + * @param duration the duration in seconds of frames.
  137 +*/
  138 +virtual int on_aac_frame(char* frame, int frame_size, double duration) = 0;
  139 +};
  140 +
131 // the context to ingest hls stream. 141 // the context to ingest hls stream.
132 class SrsIngestSrsInput 142 class SrsIngestSrsInput
133 { 143 {
@@ -185,9 +195,18 @@ public: @@ -185,9 +195,18 @@ public:
185 /** 195 /**
186 * parse the ts and use hanler to process the message. 196 * parse the ts and use hanler to process the message.
187 */ 197 */
188 - virtual int parse(ISrsTsHandler* handler); 198 + virtual int parse(ISrsTsHandler* ts, ISrsAacHandler* aac);
189 private: 199 private:
190 /** 200 /**
  201 + * parse the ts pieces body.
  202 + */
  203 + virtual int parseAac(ISrsAacHandler* handler, char* body, int nb_body, double duration);
  204 + virtual int parseTs(ISrsTsHandler* handler, char* body, int nb_body);
  205 + /**
  206 + * parse the m3u8 specified by url.
  207 + */
  208 + virtual int parseM3u8(SrsHttpUri* url, double& td, double& duration);
  209 + /**
191 * find the ts piece by its url. 210 * find the ts piece by its url.
192 */ 211 */
193 virtual SrsTsPiece* find_ts(string url); 212 virtual SrsTsPiece* find_ts(string url);
@@ -215,17 +234,164 @@ int SrsIngestSrsInput::connect() @@ -215,17 +234,164 @@ int SrsIngestSrsInput::connect()
215 st_usleep((next_connect_time - now) * 1000); 234 st_usleep((next_connect_time - now) * 1000);
216 } 235 }
217 236
  237 + // set all ts to dirty.
  238 + dirty_all_ts();
  239 +
  240 + bool fresh_m3u8 = pieces.empty();
  241 + double td = 0.0;
  242 + double duration = 0.0;
  243 + if ((ret = parseM3u8(in_hls, td, duration)) != ERROR_SUCCESS) {
  244 + return ret;
  245 + }
  246 +
  247 + // fetch all ts.
  248 + fetch_all_ts(fresh_m3u8);
  249 +
  250 + // remove all dirty ts.
  251 + remove_dirty();
  252 +
  253 + srs_trace("fetch m3u8 ok, td=%.2f, duration=%.2f, pieces=%d", td, duration, pieces.size());
  254 +
  255 + return ret;
  256 +}
  257 +
  258 +int SrsIngestSrsInput::parse(ISrsTsHandler* ts, ISrsAacHandler* aac)
  259 +{
  260 + int ret = ERROR_SUCCESS;
  261 +
  262 + for (int i = 0; i < (int)pieces.size(); i++) {
  263 + SrsTsPiece* tp = pieces.at(i);
  264 +
  265 + // sent only once.
  266 + if (tp->sent) {
  267 + continue;
  268 + }
  269 + tp->sent = true;
  270 +
  271 + if (tp->body.empty()) {
  272 + continue;
  273 + }
  274 +
  275 + srs_trace("proxy the ts to rtmp, ts=%s, duration=%.2f", tp->url.c_str(), tp->duration);
  276 +
  277 + if (srs_string_ends_with(tp->url, ".ts")) {
  278 + if ((ret = parseTs(ts, (char*)tp->body.data(), (int)tp->body.length())) != ERROR_SUCCESS) {
  279 + return ret;
  280 + }
  281 + } else if (srs_string_ends_with(tp->url, ".aac")) {
  282 + if ((ret = parseAac(aac, (char*)tp->body.data(), (int)tp->body.length(), tp->duration)) != ERROR_SUCCESS) {
  283 + return ret;
  284 + }
  285 + } else {
  286 + srs_warn("ignore unkown piece %s", tp->url.c_str());
  287 + }
  288 + }
  289 +
  290 + return ret;
  291 +}
  292 +
  293 +int SrsIngestSrsInput::parseTs(ISrsTsHandler* handler, char* body, int nb_body)
  294 +{
  295 + int ret = ERROR_SUCCESS;
  296 +
  297 + // use stream to parse ts packet.
  298 + int nb_packet = (int)nb_body / SRS_TS_PACKET_SIZE;
  299 + for (int i = 0; i < nb_packet; i++) {
  300 + char* p = (char*)body + (i * SRS_TS_PACKET_SIZE);
  301 + if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
  302 + return ret;
  303 + }
  304 +
  305 + // process each ts packet
  306 + if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) {
  307 + // when peer closed, must interrupt parse and reconnect.
  308 + if (srs_is_client_gracefully_close(ret)) {
  309 + srs_warn("interrupt parse for peer closed. ret=%d", ret);
  310 + return ret;
  311 + }
  312 +
  313 + srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret);
  314 + continue;
  315 + }
  316 + srs_info("mpegts: parse ts packet completed");
  317 + }
  318 + srs_info("mpegts: parse udp packet completed");
  319 +
  320 + return ret;
  321 +}
  322 +
  323 +int SrsIngestSrsInput::parseAac(ISrsAacHandler* handler, char* body, int nb_body, double duration)
  324 +{
  325 + int ret = ERROR_SUCCESS;
  326 +
  327 + if ((ret = stream->initialize(body, nb_body)) != ERROR_SUCCESS) {
  328 + return ret;
  329 + }
  330 +
  331 + // atleast 2bytes.
  332 + if (!stream->require(3)) {
  333 + ret = ERROR_AAC_BYTES_INVALID;
  334 + srs_error("invalid aac, atleast 3bytes. ret=%d", ret);
  335 + return ret;
  336 + }
  337 +
  338 + u_int8_t id0 = (u_int8_t)body[0];
  339 + u_int8_t id1 = (u_int8_t)body[1];
  340 + u_int8_t id2 = (u_int8_t)body[2];
  341 +
  342 + // skip ID3.
  343 + if (id0 == 0x49 && id1 == 0x44 && id2 == 0x33) {
  344 + /*char id3[] = {
  345 + (char)0x49, (char)0x44, (char)0x33, // ID3
  346 + (char)0x03, (char)0x00, // version
  347 + (char)0x00, // flags
  348 + (char)0x00, (char)0x00, (char)0x00, (char)0x0a, // size
  349 +
  350 + (char)0x00, (char)0x00, (char)0x00, (char)0x00, // FrameID
  351 + (char)0x00, (char)0x00, (char)0x00, (char)0x00, // FrameSize
  352 + (char)0x00, (char)0x00 // Flags
  353 + };*/
  354 + // atleast 10 bytes.
  355 + if (!stream->require(10)) {
  356 + ret = ERROR_AAC_BYTES_INVALID;
  357 + srs_error("invalid aac ID3, atleast 10bytes. ret=%d", ret);
  358 + return ret;
  359 + }
  360 +
  361 + // ignore ID3 + version + flag.
  362 + stream->skip(6);
  363 + // read the size of ID3.
  364 + u_int32_t nb_id3 = stream->read_4bytes();
  365 +
  366 + // read body of ID3
  367 + if (!stream->require(nb_id3)) {
  368 + ret = ERROR_AAC_BYTES_INVALID;
  369 + srs_error("invalid aac ID3 body, required %dbytes. ret=%d", nb_id3, ret);
  370 + return ret;
  371 + }
  372 + stream->skip(nb_id3);
  373 + }
  374 +
  375 + char* frame = body + stream->pos();
  376 + int frame_size = nb_body - stream->pos();
  377 + return handler->on_aac_frame(frame, frame_size, duration);
  378 +}
  379 +
  380 +int SrsIngestSrsInput::parseM3u8(SrsHttpUri* url, double& td, double& duration)
  381 +{
  382 + int ret = ERROR_SUCCESS;
  383 +
218 SrsHttpClient client; 384 SrsHttpClient client;
219 - srs_trace("parse input hls %s", in_hls->get_url()); 385 + srs_trace("parse input hls %s", url->get_url());
220 386
221 - if ((ret = client.initialize(in_hls->get_host(), in_hls->get_port())) != ERROR_SUCCESS) { 387 + if ((ret = client.initialize(url->get_host(), url->get_port())) != ERROR_SUCCESS) {
222 srs_error("connect to server failed. ret=%d", ret); 388 srs_error("connect to server failed. ret=%d", ret);
223 return ret; 389 return ret;
224 } 390 }
225 391
226 SrsHttpMessage* msg = NULL; 392 SrsHttpMessage* msg = NULL;
227 - if ((ret = client.get(in_hls->get_path(), "", &msg)) != ERROR_SUCCESS) {  
228 - srs_error("HTTP GET %s failed. ret=%d", in_hls->get_url(), ret); 393 + if ((ret = client.get(url->get_path(), "", &msg)) != ERROR_SUCCESS) {
  394 + srs_error("HTTP GET %s failed. ret=%d", url->get_url(), ret);
229 return ret; 395 return ret;
230 } 396 }
231 397
@@ -243,13 +409,7 @@ int SrsIngestSrsInput::connect() @@ -243,13 +409,7 @@ int SrsIngestSrsInput::connect()
243 return ret; 409 return ret;
244 } 410 }
245 411
246 - // set all ts to dirty.  
247 - dirty_all_ts();  
248 -  
249 std::string ptl; 412 std::string ptl;
250 - double td = 0.0;  
251 - double duration = 0.0;  
252 - bool fresh_m3u8 = pieces.empty();  
253 while (!body.empty()) { 413 while (!body.empty()) {
254 size_t pos = string::npos; 414 size_t pos = string::npos;
255 415
@@ -293,6 +453,28 @@ int SrsIngestSrsInput::connect() @@ -293,6 +453,28 @@ int SrsIngestSrsInput::connect()
293 break; 453 break;
294 } 454 }
295 455
  456 + // #EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=73207,CODECS="mp4a.40.2"
  457 + if (srs_string_starts_with(line, "#EXT-X-STREAM-INF:")) {
  458 + if ((pos = body.find("\n")) == string::npos) {
  459 + srs_warn("m3u8 entry unexpected eof, inf=%s", line.c_str());
  460 + break;
  461 + }
  462 +
  463 + std::string m3u8_url = body.substr(0, pos);
  464 + body = body.substr(pos + 1);
  465 +
  466 + if (!srs_string_starts_with(m3u8_url, "http://")) {
  467 + m3u8_url = srs_path_dirname(url->get_url()) + "/" + m3u8_url;
  468 + }
  469 + srs_trace("parse sub m3u8, url=%s", m3u8_url.c_str());
  470 +
  471 + if ((ret = url->initialize(m3u8_url)) != ERROR_SUCCESS) {
  472 + return ret;
  473 + }
  474 +
  475 + return parseM3u8(url, td, duration);
  476 + }
  477 +
296 // #EXTINF:11.401, 478 // #EXTINF:11.401,
297 // livestream-5.ts 479 // livestream-5.ts
298 // parse each ts entry, expect current line is inf. 480 // parse each ts entry, expect current line is inf.
@@ -330,60 +512,6 @@ int SrsIngestSrsInput::connect() @@ -330,60 +512,6 @@ int SrsIngestSrsInput::connect()
330 } 512 }
331 } 513 }
332 514
333 - // fetch all ts.  
334 - fetch_all_ts(fresh_m3u8);  
335 -  
336 - // remove all dirty ts.  
337 - remove_dirty();  
338 -  
339 - srs_trace("fetch m3u8 ok, td=%.2f, duration=%.2f, pieces=%d", td, duration, pieces.size());  
340 -  
341 - return ret;  
342 -}  
343 -  
344 -int SrsIngestSrsInput::parse(ISrsTsHandler* handler)  
345 -{  
346 - int ret = ERROR_SUCCESS;  
347 -  
348 - for (int i = 0; i < (int)pieces.size(); i++) {  
349 - SrsTsPiece* tp = pieces.at(i);  
350 -  
351 - // sent only once.  
352 - if (tp->sent) {  
353 - continue;  
354 - }  
355 - tp->sent = true;  
356 -  
357 - if (tp->body.empty()) {  
358 - continue;  
359 - }  
360 -  
361 - srs_trace("proxy the ts to rtmp, ts=%s, duration=%.2f", tp->url.c_str(), tp->duration);  
362 -  
363 - // use stream to parse ts packet.  
364 - int nb_packet = (int)tp->body.length() / SRS_TS_PACKET_SIZE;  
365 - for (int i = 0; i < nb_packet; i++) {  
366 - char* p = (char*)tp->body.data() + (i * SRS_TS_PACKET_SIZE);  
367 - if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {  
368 - return ret;  
369 - }  
370 -  
371 - // process each ts packet  
372 - if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) {  
373 - // when peer closed, must interrupt parse and reconnect.  
374 - if (srs_is_client_gracefully_close(ret)) {  
375 - srs_warn("interrupt parse for peer closed. ret=%d", ret);  
376 - return ret;  
377 - }  
378 -  
379 - srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret);  
380 - continue;  
381 - }  
382 - srs_info("mpegts: parse ts packet completed");  
383 - }  
384 - srs_info("mpegts: parse udp packet completed");  
385 - }  
386 -  
387 return ret; 515 return ret;
388 } 516 }
389 517
@@ -464,17 +592,11 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8) @@ -464,17 +592,11 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8)
464 return ret; 592 return ret;
465 } 593 }
466 594
467 - size_t pos = string::npos;  
468 -  
469 SrsHttpClient client; 595 SrsHttpClient client;
470 596
471 std::string ts_url = url; 597 std::string ts_url = url;
472 if (!srs_string_starts_with(ts_url, "http://")) { 598 if (!srs_string_starts_with(ts_url, "http://")) {
473 - std::string baseurl = m3u8;  
474 - if ((pos = m3u8.rfind("/")) != string::npos) {  
475 - baseurl = m3u8.substr(0, pos);  
476 - }  
477 - ts_url = baseurl + "/" + url; 599 + ts_url = srs_path_dirname(m3u8) + "/" + url;
478 } 600 }
479 601
480 SrsHttpUri uri; 602 SrsHttpUri uri;
@@ -507,13 +629,14 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8) @@ -507,13 +629,14 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8)
507 } 629 }
508 630
509 // the context to output to rtmp server 631 // the context to output to rtmp server
510 -class SrsIngestSrsOutput : public ISrsTsHandler 632 +class SrsIngestSrsOutput : virtual public ISrsTsHandler, virtual public ISrsAacHandler
511 { 633 {
512 private: 634 private:
513 SrsHttpUri* out_rtmp; 635 SrsHttpUri* out_rtmp;
514 private: 636 private:
515 bool disconnected; 637 bool disconnected;
516 std::multimap<int64_t, SrsTsMessage*> queue; 638 std::multimap<int64_t, SrsTsMessage*> queue;
  639 + int64_t raw_aac_dts;
517 private: 640 private:
518 SrsRequest* req; 641 SrsRequest* req;
519 st_netfd_t stfd; 642 st_netfd_t stfd;
@@ -534,6 +657,7 @@ public: @@ -534,6 +657,7 @@ public:
534 SrsIngestSrsOutput(SrsHttpUri* rtmp) { 657 SrsIngestSrsOutput(SrsHttpUri* rtmp) {
535 out_rtmp = rtmp; 658 out_rtmp = rtmp;
536 disconnected = false; 659 disconnected = false;
  660 + raw_aac_dts = 0;
537 661
538 req = NULL; 662 req = NULL;
539 io = NULL; 663 io = NULL;
@@ -563,7 +687,11 @@ public: @@ -563,7 +687,11 @@ public:
563 // interface ISrsTsHandler 687 // interface ISrsTsHandler
564 public: 688 public:
565 virtual int on_ts_message(SrsTsMessage* msg); 689 virtual int on_ts_message(SrsTsMessage* msg);
  690 +// interface IAacHandler
  691 +public:
  692 + virtual int on_aac_frame(char* frame, int frame_size, double duration);
566 private: 693 private:
  694 + virtual int do_on_aac_frame(SrsStream* avs, double duration);
567 virtual int parse_message_queue(); 695 virtual int parse_message_queue();
568 virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs); 696 virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs);
569 virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts); 697 virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts);
@@ -661,6 +789,76 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg) @@ -661,6 +789,76 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
661 return ret; 789 return ret;
662 } 790 }
663 791
  792 +int SrsIngestSrsOutput::on_aac_frame(char* frame, int frame_size, double duration)
  793 +{
  794 + int ret = ERROR_SUCCESS;
  795 +
  796 + srs_trace("handle aac frames, size=%dB, duration=%.2f, dts=%"PRId64, frame_size, duration, raw_aac_dts);
  797 +
  798 + SrsStream stream;
  799 + if ((ret = stream.initialize(frame, frame_size)) != ERROR_SUCCESS) {
  800 + return ret;
  801 + }
  802 +
  803 + return do_on_aac_frame(&stream, duration);
  804 +}
  805 +
  806 +int SrsIngestSrsOutput::do_on_aac_frame(SrsStream* avs, double duration)
  807 +{
  808 + int ret = ERROR_SUCCESS;
  809 +
  810 + // ts tbn to flv tbn.
  811 + u_int32_t dts = (u_int32_t)raw_aac_dts;
  812 + raw_aac_dts += (int64_t)(duration * 1000);
  813 +
  814 + // got the next msg to calc the delta duration for each audio.
  815 + u_int32_t max_dts = dts + (u_int32_t)(duration * 1000);
  816 +
  817 + // send each frame.
  818 + while (!avs->empty()) {
  819 + char* frame = NULL;
  820 + int frame_size = 0;
  821 + SrsRawAacStreamCodec codec;
  822 + if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) {
  823 + return ret;
  824 + }
  825 +
  826 + // ignore invalid frame,
  827 + // * atleast 1bytes for aac to decode the data.
  828 + if (frame_size <= 0) {
  829 + continue;
  830 + }
  831 + srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts);
  832 +
  833 + // generate sh.
  834 + if (aac_specific_config.empty()) {
  835 + std::string sh;
  836 + if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) {
  837 + return ret;
  838 + }
  839 + aac_specific_config = sh;
  840 +
  841 + codec.aac_packet_type = 0;
  842 +
  843 + if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != ERROR_SUCCESS) {
  844 + return ret;
  845 + }
  846 + }
  847 +
  848 + // audio raw data.
  849 + codec.aac_packet_type = 1;
  850 + if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) {
  851 + return ret;
  852 + }
  853 +
  854 + // calc the delta of dts, when previous frame output.
  855 + u_int32_t delta = (duration * 1000) / (avs->size() / frame_size);
  856 + dts = (u_int32_t)(srs_min(max_dts, dts + delta));
  857 + }
  858 +
  859 + return ret;
  860 +}
  861 +
664 int SrsIngestSrsOutput::parse_message_queue() 862 int SrsIngestSrsOutput::parse_message_queue()
665 { 863 {
666 int ret = ERROR_SUCCESS; 864 int ret = ERROR_SUCCESS;
@@ -914,7 +1112,7 @@ int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs) @@ -914,7 +1112,7 @@ int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs)
914 // ts tbn to flv tbn. 1112 // ts tbn to flv tbn.
915 u_int32_t dts = (u_int32_t)(msg->dts / 90); 1113 u_int32_t dts = (u_int32_t)(msg->dts / 90);
916 1114
917 - // got the next video to calc the delta duration for each audio. 1115 + // got the next msg to calc the delta duration for each audio.
918 u_int32_t duration = 0; 1116 u_int32_t duration = 0;
919 if (!queue.empty()) { 1117 if (!queue.empty()) {
920 SrsTsMessage* nm = queue.begin()->second; 1118 SrsTsMessage* nm = queue.begin()->second;
@@ -992,6 +1190,8 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* @@ -992,6 +1190,8 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char*
992 } 1190 }
993 srs_assert(msg); 1191 srs_assert(msg);
994 1192
  1193 + srs_info("RTMP type=%d, dts=%d, size=%d", type, timestamp, size);
  1194 +
995 // send out encoded msg. 1195 // send out encoded msg.
996 if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { 1196 if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
997 return ret; 1197 return ret;
@@ -1016,13 +1216,12 @@ int SrsIngestSrsOutput::connect() @@ -1016,13 +1216,12 @@ int SrsIngestSrsOutput::connect()
1016 if (!req) { 1216 if (!req) {
1017 req = new SrsRequest(); 1217 req = new SrsRequest();
1018 1218
1019 - size_t pos = string::npos;  
1020 string uri = req->tcUrl = out_rtmp->get_url(); 1219 string uri = req->tcUrl = out_rtmp->get_url();
1021 1220
1022 // tcUrl, stream 1221 // tcUrl, stream
1023 - if ((pos = uri.rfind("/")) != string::npos) {  
1024 - req->stream = uri.substr(pos + 1);  
1025 - req->tcUrl = uri = uri.substr(0, pos); 1222 + if (srs_string_contains(uri, "/")) {
  1223 + req->stream = srs_path_basename(uri);
  1224 + req->tcUrl = uri = srs_path_dirname(uri);
1026 } 1225 }
1027 1226
1028 srs_discovery_tc_url(req->tcUrl, 1227 srs_discovery_tc_url(req->tcUrl,
@@ -1155,7 +1354,7 @@ public: @@ -1155,7 +1354,7 @@ public:
1155 return ret; 1354 return ret;
1156 } 1355 }
1157 1356
1158 - if ((ret = ic->parse(oc)) != ERROR_SUCCESS) { 1357 + if ((ret = ic->parse(oc, oc)) != ERROR_SUCCESS) {
1159 srs_warn("proxy ts to rtmp failed. ret=%d", ret); 1358 srs_warn("proxy ts to rtmp failed. ret=%d", ret);
1160 return ret; 1359 return ret;
1161 } 1360 }
@@ -362,9 +362,12 @@ int SrsRawAacStream::adts_demux(SrsStream* stream, char** pframe, int* pnb_frame @@ -362,9 +362,12 @@ int SrsRawAacStream::adts_demux(SrsStream* stream, char** pframe, int* pnb_frame
362 * and set to ‘0’ if the audio data are MPEG-4. See also ISO/IEC 11172-3, subclause 2.4.2.3. 362 * and set to ‘0’ if the audio data are MPEG-4. See also ISO/IEC 11172-3, subclause 2.4.2.3.
363 */ 363 */
364 if (id != 0x01) { 364 if (id != 0x01) {
365 - ret = ERROR_ADTS_ID_NOT_AAC;  
366 - srs_warn("adts: id must be 1(aac), actual 0(mp4a). ret=%d", ret);  
367 - return ret; 365 + srs_info("adts: id must be 1(aac), actual 0(mp4a). ret=%d", ret);
  366 +
  367 + // well, some system always use 0, but actually is aac format.
  368 + // for example, houjian vod ts always set the aac id to 0, actually 1.
  369 + // we just ignore it, and alwyas use 1(aac) to demux.
  370 + id = 0x01;
368 } 371 }
369 372
370 int16_t sfiv = stream->read_2bytes(); 373 int16_t sfiv = stream->read_2bytes();