winlin

refine hls ingester, quit when error.

@@ -48,9 +48,6 @@ using namespace std; @@ -48,9 +48,6 @@ using namespace std;
48 #include <srs_rtmp_amf0.hpp> 48 #include <srs_rtmp_amf0.hpp>
49 #include <srs_raw_avc.hpp> 49 #include <srs_raw_avc.hpp>
50 50
51 -// the retry timeout in ms.  
52 -#define SRS_INGEST_HLS_ERROR_RETRY_MS 3000  
53 -  
54 // pre-declare 51 // pre-declare
55 int proxy_hls2rtmp(std::string hls, std::string rtmp); 52 int proxy_hls2rtmp(std::string hls, std::string rtmp);
56 53
@@ -217,7 +214,7 @@ private: @@ -217,7 +214,7 @@ private:
217 /** 214 /**
218 * fetch all ts body. 215 * fetch all ts body.
219 */ 216 */
220 - virtual void fetch_all_ts(bool fresh_m3u8); 217 + virtual int fetch_all_ts(bool fresh_m3u8);
221 /** 218 /**
222 * remove all ts which is dirty. 219 * remove all ts which is dirty.
223 */ 220 */
@@ -245,7 +242,10 @@ int SrsIngestSrsInput::connect() @@ -245,7 +242,10 @@ int SrsIngestSrsInput::connect()
245 } 242 }
246 243
247 // fetch all ts. 244 // fetch all ts.
248 - fetch_all_ts(fresh_m3u8); 245 + if ((ret = fetch_all_ts(fresh_m3u8)) != ERROR_SUCCESS) {
  246 + srs_error("fetch all ts failed. ret=%d", ret);
  247 + return ret;
  248 + }
249 249
250 // remove all dirty ts. 250 // remove all dirty ts.
251 remove_dirty(); 251 remove_dirty();
@@ -304,14 +304,8 @@ int SrsIngestSrsInput::parseTs(ISrsTsHandler* handler, char* body, int nb_body) @@ -304,14 +304,8 @@ int SrsIngestSrsInput::parseTs(ISrsTsHandler* handler, char* body, int nb_body)
304 304
305 // process each ts packet 305 // process each ts packet
306 if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) { 306 if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) {
307 - // when peer closed, must interrupt parse and reconnect.  
308 - if (srs_is_client_gracefully_close(ret)) {  
309 - srs_warn("interrupt parse for peer closed. ret=%d", ret);  
310 - return ret;  
311 - }  
312 -  
313 - srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret);  
314 - continue; 307 + srs_error("mpegts: ignore parse ts packet failed. ret=%d", ret);
  308 + return ret;
315 } 309 }
316 srs_info("mpegts: parse ts packet completed"); 310 srs_info("mpegts: parse ts packet completed");
317 } 311 }
@@ -536,7 +530,7 @@ void SrsIngestSrsInput::dirty_all_ts() @@ -536,7 +530,7 @@ void SrsIngestSrsInput::dirty_all_ts()
536 } 530 }
537 } 531 }
538 532
539 -void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8) 533 +int SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8)
540 { 534 {
541 int ret = ERROR_SUCCESS; 535 int ret = ERROR_SUCCESS;
542 536
@@ -555,9 +549,9 @@ void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8) @@ -555,9 +549,9 @@ void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8)
555 } 549 }
556 550
557 if ((ret = tp->fetch(in_hls->get_url())) != ERROR_SUCCESS) { 551 if ((ret = tp->fetch(in_hls->get_url())) != ERROR_SUCCESS) {
558 - srs_warn("ignore ts %s for error. ret=%d", tp->url.c_str(), ret); 552 + srs_error("fetch ts %s for error. ret=%d", tp->url.c_str(), ret);
559 tp->skip = true; 553 tp->skip = true;
560 - continue; 554 + return ret;
561 } 555 }
562 556
563 // only wait for a duration of last piece. 557 // only wait for a duration of last piece.
@@ -565,6 +559,8 @@ void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8) @@ -565,6 +559,8 @@ void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8)
565 next_connect_time = srs_update_system_time_ms() + (int)tp->duration * 1000; 559 next_connect_time = srs_update_system_time_ms() + (int)tp->duration * 1000;
566 } 560 }
567 } 561 }
  562 +
  563 + return ret;
568 } 564 }
569 565
570 566
@@ -779,10 +775,6 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg) @@ -779,10 +775,6 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
779 // we must use queue to cache the msg, then parse it if possible. 775 // we must use queue to cache the msg, then parse it if possible.
780 queue.insert(std::make_pair(msg->dts, msg->detach())); 776 queue.insert(std::make_pair(msg->dts, msg->detach()));
781 if ((ret = parse_message_queue()) != ERROR_SUCCESS) { 777 if ((ret = parse_message_queue()) != ERROR_SUCCESS) {
782 - // when peer closed, close the output and reconnect.  
783 - if (srs_is_client_gracefully_close(ret)) {  
784 - close();  
785 - }  
786 return ret; 778 return ret;
787 } 779 }
788 780
@@ -1204,6 +1196,7 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* @@ -1204,6 +1196,7 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char*
1204 1196
1205 // send out encoded msg. 1197 // send out encoded msg.
1206 if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { 1198 if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
  1199 + srs_error("send RTMP type=%d, dts=%d, size=%d failed. ret=%d", type, timestamp, size, ret);
1207 return ret; 1200 return ret;
1208 } 1201 }
1209 1202
@@ -1355,22 +1348,22 @@ public: @@ -1355,22 +1348,22 @@ public:
1355 int ret = ERROR_SUCCESS; 1348 int ret = ERROR_SUCCESS;
1356 1349
1357 if ((ret = ic->connect()) != ERROR_SUCCESS) { 1350 if ((ret = ic->connect()) != ERROR_SUCCESS) {
1358 - srs_warn("connect oc failed. ret=%d", ret); 1351 + srs_error("connect oc failed. ret=%d", ret);
1359 return ret; 1352 return ret;
1360 } 1353 }
1361 1354
1362 if ((ret = oc->connect()) != ERROR_SUCCESS) { 1355 if ((ret = oc->connect()) != ERROR_SUCCESS) {
1363 - srs_warn("connect ic failed. ret=%d", ret); 1356 + srs_error("connect ic failed. ret=%d", ret);
1364 return ret; 1357 return ret;
1365 } 1358 }
1366 1359
1367 if ((ret = ic->parse(oc, oc)) != ERROR_SUCCESS) { 1360 if ((ret = ic->parse(oc, oc)) != ERROR_SUCCESS) {
1368 - srs_warn("proxy ts to rtmp failed. ret=%d", ret); 1361 + srs_error("proxy ts to rtmp failed. ret=%d", ret);
1369 return ret; 1362 return ret;
1370 } 1363 }
1371 1364
1372 if ((ret = oc->flush_message_queue()) != ERROR_SUCCESS) { 1365 if ((ret = oc->flush_message_queue()) != ERROR_SUCCESS) {
1373 - srs_warn("flush oc message failed. ret=%d", ret); 1366 + srs_error("flush oc message failed. ret=%d", ret);
1374 return ret; 1367 return ret;
1375 } 1368 }
1376 1369
@@ -1401,11 +1394,9 @@ int proxy_hls2rtmp(string hls, string rtmp) @@ -1401,11 +1394,9 @@ int proxy_hls2rtmp(string hls, string rtmp)
1401 SrsIngestSrsContext context(&hls_uri, &rtmp_uri); 1394 SrsIngestSrsContext context(&hls_uri, &rtmp_uri);
1402 for (;;) { 1395 for (;;) {
1403 if ((ret = context.proxy()) == ERROR_SUCCESS) { 1396 if ((ret = context.proxy()) == ERROR_SUCCESS) {
1404 - continue; 1397 + srs_error("proxy hls to rtmp failed. ret=%d", ret);
  1398 + return ret;
1405 } 1399 }
1406 -  
1407 - srs_warn("proxy hls to rtmp failed. ret=%d", ret);  
1408 - st_usleep(SRS_INGEST_HLS_ERROR_RETRY_MS * 1000);  
1409 } 1400 }
1410 1401
1411 return ret; 1402 return ret;