winlin

fix the ts parse bug, should never complete message when PES packet length is 0.

@@ -563,11 +563,11 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file) @@ -563,11 +563,11 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
563 563
564 // #EXTM3U\n 564 // #EXTM3U\n
565 // #EXT-X-VERSION:3\n 565 // #EXT-X-VERSION:3\n
566 - // #EXT-X-ALLOW-CACHE:NO\n 566 + // #EXT-X-ALLOW-CACHE:YES\n
567 std::stringstream ss; 567 std::stringstream ss;
568 ss << "#EXTM3U" << SRS_CONSTS_LF 568 ss << "#EXTM3U" << SRS_CONSTS_LF
569 << "#EXT-X-VERSION:3" << SRS_CONSTS_LF 569 << "#EXT-X-VERSION:3" << SRS_CONSTS_LF
570 - << "#EXT-X-ALLOW-CACHE:NO" << SRS_CONSTS_LF; 570 + << "#EXT-X-ALLOW-CACHE:YES" << SRS_CONSTS_LF;
571 srs_verbose("write m3u8 header success."); 571 srs_verbose("write m3u8 header success.");
572 572
573 // #EXT-X-MEDIA-SEQUENCE:4294967295\n 573 // #EXT-X-MEDIA-SEQUENCE:4294967295\n
@@ -1265,10 +1265,15 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) @@ -1265,10 +1265,15 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
1265 msg = new SrsTsMessage(channel, packet); 1265 msg = new SrsTsMessage(channel, packet);
1266 channel->msg = msg; 1266 channel->msg = msg;
1267 } 1267 }
  1268 +
  1269 + // we must cache the fresh state of msg,
  1270 + // for the PES_packet_length is 0, the first payload_unit_start_indicator always 1,
  1271 + // so should check for the fresh and not completed it.
  1272 + bool is_fresh_msg = msg->fresh();
1268 1273
1269 // check when fresh, the payload_unit_start_indicator 1274 // check when fresh, the payload_unit_start_indicator
1270 // should be 1 for the fresh msg. 1275 // should be 1 for the fresh msg.
1271 - if (msg->fresh() && !packet->payload_unit_start_indicator) { 1276 + if (is_fresh_msg && !packet->payload_unit_start_indicator) {
1272 ret = ERROR_STREAM_CASTER_TS_PSE; 1277 ret = ERROR_STREAM_CASTER_TS_PSE;
1273 srs_error("ts: PES fresh packet length=%d, us=%d, cc=%d. ret=%d", 1278 srs_error("ts: PES fresh packet length=%d, us=%d, cc=%d. ret=%d",
1274 msg->PES_packet_length, packet->payload_unit_start_indicator, packet->continuity_counter, 1279 msg->PES_packet_length, packet->payload_unit_start_indicator, packet->continuity_counter,
@@ -1278,7 +1283,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) @@ -1278,7 +1283,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
1278 1283
1279 // check when not fresh and PES_packet_length>0, 1284 // check when not fresh and PES_packet_length>0,
1280 // the payload_unit_start_indicator should never be 1 when not completed. 1285 // the payload_unit_start_indicator should never be 1 when not completed.
1281 - if (!msg->fresh() && msg->PES_packet_length > 0 1286 + if (!is_fresh_msg && msg->PES_packet_length > 0
1282 && !msg->completed(packet->payload_unit_start_indicator) 1287 && !msg->completed(packet->payload_unit_start_indicator)
1283 && packet->payload_unit_start_indicator 1288 && packet->payload_unit_start_indicator
1284 ) { 1289 ) {
@@ -1295,7 +1300,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) @@ -1295,7 +1300,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
1295 } 1300 }
1296 1301
1297 // check the continuity counter 1302 // check the continuity counter
1298 - if (!msg->fresh()) { 1303 + if (!is_fresh_msg) {
1299 // late-incoming or duplicated continuity, drop message. 1304 // late-incoming or duplicated continuity, drop message.
1300 // @remark check overflow, the counter plus 1 should greater when invalid. 1305 // @remark check overflow, the counter plus 1 should greater when invalid.
1301 if (msg->continuity_counter >= packet->continuity_counter 1306 if (msg->continuity_counter >= packet->continuity_counter
@@ -1322,7 +1327,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) @@ -1322,7 +1327,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
1322 msg->continuity_counter = packet->continuity_counter; 1327 msg->continuity_counter = packet->continuity_counter;
1323 1328
1324 // for the PES_packet_length(0), reap when completed. 1329 // for the PES_packet_length(0), reap when completed.
1325 - if (!msg->fresh() && msg->completed(packet->payload_unit_start_indicator)) { 1330 + if (!is_fresh_msg && msg->completed(packet->payload_unit_start_indicator)) {
1326 // reap previous PES packet. 1331 // reap previous PES packet.
1327 *ppmsg = msg; 1332 *ppmsg = msg;
1328 channel->msg = NULL; 1333 channel->msg = NULL;
@@ -1358,7 +1363,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) @@ -1358,7 +1363,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
1358 packet_start_code_prefix &= 0xFFFFFF; 1363 packet_start_code_prefix &= 0xFFFFFF;
1359 if (packet_start_code_prefix != 0x01) { 1364 if (packet_start_code_prefix != 0x01) {
1360 ret = ERROR_STREAM_CASTER_TS_PSE; 1365 ret = ERROR_STREAM_CASTER_TS_PSE;
1361 - srs_error("ts: demux PSE start code failed, expect=0x01, actual=%#x. ret=%d", packet_start_code_prefix, ret); 1366 + srs_error("ts: demux PES start code failed, expect=0x01, actual=%#x. ret=%d", packet_start_code_prefix, ret);
1362 return ret; 1367 return ret;
1363 } 1368 }
1364 int pos_packet = stream->pos(); 1369 int pos_packet = stream->pos();
@@ -1380,7 +1385,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) @@ -1380,7 +1385,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
1380 // 3B flags. 1385 // 3B flags.
1381 if (!stream->require(3)) { 1386 if (!stream->require(3)) {
1382 ret = ERROR_STREAM_CASTER_TS_PSE; 1387 ret = ERROR_STREAM_CASTER_TS_PSE;
1383 - srs_error("ts: demux PSE flags failed. ret=%d", ret); 1388 + srs_error("ts: demux PES flags failed. ret=%d", ret);
1384 return ret; 1389 return ret;
1385 } 1390 }
1386 // 1B 1391 // 1B
@@ -1419,7 +1424,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) @@ -1419,7 +1424,7 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
1419 nb_required += PES_extension_flag? 1:0; 1424 nb_required += PES_extension_flag? 1:0;
1420 if (!stream->require(nb_required)) { 1425 if (!stream->require(nb_required)) {
1421 ret = ERROR_STREAM_CASTER_TS_PSE; 1426 ret = ERROR_STREAM_CASTER_TS_PSE;
1422 - srs_error("ts: demux PSE payload failed. ret=%d", ret); 1427 + srs_error("ts: demux PES payload failed. ret=%d", ret);
1423 return ret; 1428 return ret;
1424 } 1429 }
1425 1430
@@ -1638,6 +1643,13 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) @@ -1638,6 +1643,13 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
1638 srs_warn("ts: drop the pes packet %dB for stream_id=%#x", nb_drop, stream_id); 1643 srs_warn("ts: drop the pes packet %dB for stream_id=%#x", nb_drop, stream_id);
1639 } 1644 }
1640 } 1645 }
  1646 +
  1647 + // when fresh and the PES_packet_length is 0,
  1648 + // the payload_unit_start_indicator always be 1,
  1649 + // the message should never EOF for the first packet.
  1650 + if (is_fresh_msg && msg->PES_packet_length == 0) {
  1651 + return ret;
  1652 + }
1641 1653
1642 // check msg, reap when completed. 1654 // check msg, reap when completed.
1643 if (msg->completed(packet->payload_unit_start_indicator)) { 1655 if (msg->completed(packet->payload_unit_start_indicator)) {