winlin

for #250, decode the ts packet header and adaptation field.

@@ -33,16 +33,21 @@ using namespace std; @@ -33,16 +33,21 @@ using namespace std;
33 #include <srs_kernel_log.hpp> 33 #include <srs_kernel_log.hpp>
34 #include <srs_app_config.hpp> 34 #include <srs_app_config.hpp>
35 #include <srs_kernel_ts.hpp> 35 #include <srs_kernel_ts.hpp>
  36 +#include <srs_kernel_stream.hpp>
  37 +#include <srs_kernel_ts.hpp>
  38 +#include <srs_core_autofree.hpp>
36 39
37 #ifdef SRS_AUTO_STREAM_CASTER 40 #ifdef SRS_AUTO_STREAM_CASTER
38 41
39 SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) 42 SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
40 { 43 {
  44 + stream = new SrsStream();
41 output = _srs_config->get_stream_caster_output(c); 45 output = _srs_config->get_stream_caster_output(c);
42 } 46 }
43 47
44 SrsMpegtsOverUdp::~SrsMpegtsOverUdp() 48 SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
45 { 49 {
  50 + srs_freep(stream);
46 } 51 }
47 52
48 int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) 53 int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
@@ -59,21 +64,35 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) @@ -59,21 +64,35 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
59 } 64 }
60 srs_info("udp: got %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf); 65 srs_info("udp: got %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf);
61 66
62 - // process each ts packet 67 + // use stream to parse ts packet.
63 for (int i = 0; i < nb_buf; i += SRS_TS_PACKET_SIZE) { 68 for (int i = 0; i < nb_buf; i += SRS_TS_PACKET_SIZE) {
64 - char* ts_packet = buf + i;  
65 - if ((ret = on_ts_packet(ts_packet)) != ERROR_SUCCESS) {  
66 - srs_warn("mpegts: ignore ts packet error. ret=%d", ret);  
67 - continue; 69 + if ((ret = stream->initialize(buf + i, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
  70 + return ret;
  71 + }
  72 +
  73 + // process each ts packet
  74 + if ((ret = on_ts_packet(stream)) != ERROR_SUCCESS) {
  75 + break;
68 } 76 }
  77 + srs_info("mpegts: parse ts packet completed");
69 } 78 }
  79 + srs_info("mpegts: parse udp packet completed");
70 80
71 return ret; 81 return ret;
72 } 82 }
73 83
74 -int SrsMpegtsOverUdp::on_ts_packet(char* ts_packet) 84 +int SrsMpegtsOverUdp::on_ts_packet(SrsStream* stream)
75 { 85 {
76 int ret = ERROR_SUCCESS; 86 int ret = ERROR_SUCCESS;
  87 +
  88 + SrsTsPacket* packet = new SrsTsPacket();
  89 + SrsAutoFree(SrsTsPacket, packet);
  90 +
  91 + if ((ret = packet->decode(stream)) != ERROR_SUCCESS) {
  92 + srs_error("mpegts: decode ts packet failed. ret=%d", ret);
  93 + return ret;
  94 + }
  95 +
77 return ret; 96 return ret;
78 } 97 }
79 98
@@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 class sockaddr_in; 33 class sockaddr_in;
34 #include <string> 34 #include <string>
35 35
  36 +class SrsStream;
36 class SrsConfDirective; 37 class SrsConfDirective;
37 38
38 #ifdef SRS_AUTO_STREAM_CASTER 39 #ifdef SRS_AUTO_STREAM_CASTER
@@ -43,6 +44,7 @@ class SrsConfDirective; @@ -43,6 +44,7 @@ class SrsConfDirective;
43 class SrsMpegtsOverUdp 44 class SrsMpegtsOverUdp
44 { 45 {
45 private: 46 private:
  47 + SrsStream* stream;
46 std::string output; 48 std::string output;
47 public: 49 public:
48 SrsMpegtsOverUdp(SrsConfDirective* c); 50 SrsMpegtsOverUdp(SrsConfDirective* c);
@@ -60,9 +62,9 @@ public: @@ -60,9 +62,9 @@ public:
60 virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf); 62 virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf);
61 private: 63 private:
62 /** 64 /**
63 - * when got a ts packet, in size TS_PACKET_SIZE. 65 + * the stream contains the ts packet to parse.
64 */ 66 */
65 - virtual int on_ts_packet(char* ts_packet); 67 + virtual int on_ts_packet(SrsStream* stream);
66 }; 68 };
67 69
68 #endif 70 #endif
@@ -219,6 +219,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -219,6 +219,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
219 #define ERROR_MP3_DECODE_ERROR 4009 219 #define ERROR_MP3_DECODE_ERROR 4009
220 #define ERROR_STREAM_CASTER_ENGINE 4010 220 #define ERROR_STREAM_CASTER_ENGINE 4010
221 #define ERROR_STREAM_CASTER_PORT 4011 221 #define ERROR_STREAM_CASTER_PORT 4011
  222 +#define ERROR_STREAM_CASTER_TS_HEADER 4012
  223 +#define ERROR_STREAM_CASTER_TS_SYNC_BYTE 4013
  224 +#define ERROR_STREAM_CASTER_TS_AF 4014
222 225
223 /** 226 /**
224 * whether the error code is an system control error. 227 * whether the error code is an system control error.
@@ -38,6 +38,7 @@ using namespace std; @@ -38,6 +38,7 @@ using namespace std;
38 #include <srs_kernel_avc.hpp> 38 #include <srs_kernel_avc.hpp>
39 #include <srs_kernel_buffer.hpp> 39 #include <srs_kernel_buffer.hpp>
40 #include <srs_kernel_utility.hpp> 40 #include <srs_kernel_utility.hpp>
  41 +#include <srs_kernel_stream.hpp>
41 42
42 // in ms, for HLS aac sync time. 43 // in ms, for HLS aac sync time.
43 #define SRS_CONF_DEFAULT_AAC_SYNC 100 44 #define SRS_CONF_DEFAULT_AAC_SYNC 100
@@ -418,8 +419,65 @@ SrsTsPacket::~SrsTsPacket() @@ -418,8 +419,65 @@ SrsTsPacket::~SrsTsPacket()
418 srs_freep(adaptation_field); 419 srs_freep(adaptation_field);
419 } 420 }
420 421
421 -SrsTsAdaptationField::SrsTsAdaptationField() 422 +int SrsTsPacket::decode(SrsStream* stream)
422 { 423 {
  424 + int ret = ERROR_SUCCESS;
  425 +
  426 + int pos = stream->pos();
  427 +
  428 + // 4B ts packet header.
  429 + if (!stream->require(4)) {
  430 + ret = ERROR_STREAM_CASTER_TS_HEADER;
  431 + srs_error("ts: demux header failed. ret=%d", ret);
  432 + return ret;
  433 + }
  434 +
  435 + sync_byte = stream->read_1bytes();
  436 + if (sync_byte != 0x47) {
  437 + ret = ERROR_STREAM_CASTER_TS_SYNC_BYTE;
  438 + srs_error("ts: sync_bytes must be 0x47, actual=%#x. ret=%d", sync_byte, ret);
  439 + return ret;
  440 + }
  441 +
  442 + int16_t pidv = stream->read_2bytes();
  443 + transport_error_indicator = (pidv >> 15) & 0x01;
  444 + payload_unit_start_indicator = (pidv >> 14) & 0x01;
  445 + transport_priority = (pidv >> 13) & 0x01;
  446 + pid = (SrsTsPid)(pidv & 0x1FFF);
  447 +
  448 + int8_t ccv = stream->read_1bytes();
  449 + transport_scrambling_control = (SrsTsScrambled)((ccv >> 6) & 0x03);
  450 + adaption_field_control = (SrsTsAdaptationFieldType)((ccv >> 4) & 0x03);
  451 + continuity_counter = (SrsTsPid)(ccv & 0x0F);
  452 +
  453 + // TODO: FIXME: create pids map when got new pid.
  454 +
  455 + srs_info("ts: header sync=%#x error=%d unit_start=%d priotiry=%d pid=%d scrambling=%d adaption=%d counter=%d",
  456 + sync_byte, transport_error_indicator, payload_unit_start_indicator, transport_priority, pid,
  457 + transport_scrambling_control, adaption_field_control, continuity_counter);
  458 +
  459 + // optional: adaptation field
  460 + if (adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly || adaption_field_control == SrsTsAdaptationFieldTypeBoth) {
  461 + srs_freep(adaptation_field);
  462 + adaptation_field = new SrsTsAdaptationField(this);
  463 +
  464 + if ((ret = adaptation_field->decode(stream)) != ERROR_SUCCESS) {
  465 + srs_error("ts: demux af faield. ret=%d", ret);
  466 + return ret;
  467 + }
  468 + srs_verbose("ts: demux af ok.");
  469 + }
  470 +
  471 + // calc the user defined data size for payload.
  472 + int nb_payload = SRS_TS_PACKET_SIZE - (stream->pos() - pos);
  473 +
  474 + return ret;
  475 +}
  476 +
  477 +SrsTsAdaptationField::SrsTsAdaptationField(SrsTsPacket* pkt)
  478 +{
  479 + packet = pkt;
  480 +
423 adaption_field_length = 0; 481 adaption_field_length = 0;
424 discontinuity_indicator = 0; 482 discontinuity_indicator = 0;
425 random_access_indicator = 0; 483 random_access_indicator = 0;
@@ -456,6 +514,203 @@ SrsTsAdaptationField::SrsTsAdaptationField() @@ -456,6 +514,203 @@ SrsTsAdaptationField::SrsTsAdaptationField()
456 514
457 SrsTsAdaptationField::~SrsTsAdaptationField() 515 SrsTsAdaptationField::~SrsTsAdaptationField()
458 { 516 {
  517 + srs_freep(transport_private_data);
  518 +}
  519 +
  520 +int SrsTsAdaptationField::decode(SrsStream* stream)
  521 +{
  522 + int ret = ERROR_SUCCESS;
  523 +
  524 + if (!stream->require(2)) {
  525 + ret = ERROR_STREAM_CASTER_TS_AF;
  526 + srs_error("ts: demux af failed. ret=%d", ret);
  527 + return ret;
  528 + }
  529 + adaption_field_length = stream->read_1bytes();
  530 +
  531 + // When the adaptation_field_control value is '11', the value of the adaptation_field_length shall
  532 + // be in the range 0 to 182.
  533 + if (packet->adaption_field_control == SrsTsAdaptationFieldTypeBoth && adaption_field_length > 182) {
  534 + ret = ERROR_STREAM_CASTER_TS_AF;
  535 + srs_error("ts: demux af length failed, must in [0, 182], actual=%d. ret=%d", adaption_field_length, ret);
  536 + return ret;
  537 + }
  538 + // When the adaptation_field_control value is '10', the value of the adaptation_field_length shall
  539 + // be 183.
  540 + if (packet->adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly && adaption_field_length != 183) {
  541 + ret = ERROR_STREAM_CASTER_TS_AF;
  542 + srs_error("ts: demux af length failed, must be 183, actual=%d. ret=%d", adaption_field_length, ret);
  543 + return ret;
  544 + }
  545 +
  546 + // no adaptation field.
  547 + if (adaption_field_length == 0) {
  548 + srs_info("ts: demux af empty.");
  549 + return ret;
  550 + }
  551 +
  552 + // the adaptation field start at here.
  553 + int pos_af = stream->pos();
  554 + int8_t tmpv = stream->read_1bytes();
  555 +
  556 + discontinuity_indicator = (tmpv >> 7) & 0x01;
  557 + random_access_indicator = (tmpv >> 6) & 0x01;
  558 + elementary_stream_priority_indicator = (tmpv >> 5) & 0x01;
  559 + PCR_flag = (tmpv >> 4) & 0x01;
  560 + OPCR_flag = (tmpv >> 3) & 0x01;
  561 + splicing_point_flag = (tmpv >> 2) & 0x01;
  562 + transport_private_data_flag = (tmpv >> 1) & 0x01;
  563 + adaptation_field_extension_flag = (tmpv >> 0) & 0x01;
  564 +
  565 + if (PCR_flag) {
  566 + if (!stream->require(6)) {
  567 + ret = ERROR_STREAM_CASTER_TS_AF;
  568 + srs_error("ts: demux af PCR_flag failed. ret=%d", ret);
  569 + return ret;
  570 + }
  571 +
  572 + char* pp = NULL;
  573 + char* p = stream->data();
  574 + stream->skip(6);
  575 +
  576 + pp = (char*)&program_clock_reference_base;
  577 + pp[5] = *p++;
  578 + pp[4] = *p++;
  579 + pp[3] = *p++;
  580 + pp[2] = *p++;
  581 + pp[1] = *p++;
  582 + pp[0] = *p++;
  583 +
  584 + // @remark, use pcr base and ignore the extension
  585 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370
  586 + program_clock_reference_extension = program_clock_reference_base & 0x1ff;
  587 + program_clock_reference_base = (program_clock_reference_base >> 15) & 0x1ffffffffLL;
  588 + }
  589 +
  590 + if (OPCR_flag) {
  591 + if (!stream->require(6)) {
  592 + ret = ERROR_STREAM_CASTER_TS_AF;
  593 + srs_error("ts: demux af OPCR_flag failed. ret=%d", ret);
  594 + return ret;
  595 + }
  596 +
  597 + char* pp = NULL;
  598 + char* p = stream->data();
  599 + stream->skip(6);
  600 +
  601 + pp = (char*)&original_program_clock_reference_base;
  602 + pp[5] = *p++;
  603 + pp[4] = *p++;
  604 + pp[3] = *p++;
  605 + pp[2] = *p++;
  606 + pp[1] = *p++;
  607 + pp[0] = *p++;
  608 +
  609 + // @remark, use pcr base and ignore the extension
  610 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370
  611 + original_program_clock_reference_extension = program_clock_reference_base & 0x1ff;
  612 + original_program_clock_reference_base = (program_clock_reference_base >> 15) & 0x1ffffffffLL;
  613 + }
  614 +
  615 + if (splicing_point_flag) {
  616 + if (!stream->require(1)) {
  617 + ret = ERROR_STREAM_CASTER_TS_AF;
  618 + srs_error("ts: demux af splicing_point_flag failed. ret=%d", ret);
  619 + return ret;
  620 + }
  621 + splice_countdown = stream->read_1bytes();
  622 + }
  623 +
  624 + if (transport_private_data_flag) {
  625 + if (!stream->require(1)) {
  626 + ret = ERROR_STREAM_CASTER_TS_AF;
  627 + srs_error("ts: demux af transport_private_data_flag failed. ret=%d", ret);
  628 + return ret;
  629 + }
  630 + transport_private_data_length = (u_int8_t)stream->read_1bytes();
  631 +
  632 + if (!stream->require(transport_private_data_length)) {
  633 + ret = ERROR_STREAM_CASTER_TS_AF;
  634 + srs_error("ts: demux af transport_private_data_flag failed. ret=%d", ret);
  635 + return ret;
  636 + }
  637 + srs_freep(transport_private_data);
  638 + transport_private_data = new char[transport_private_data_length];
  639 + stream->read_bytes(transport_private_data, transport_private_data_length);
  640 + }
  641 +
  642 + if (adaptation_field_extension_flag) {
  643 + int pos_af_ext = stream->pos();
  644 +
  645 + if (!stream->require(2)) {
  646 + ret = ERROR_STREAM_CASTER_TS_AF;
  647 + srs_error("ts: demux af adaptation_field_extension_flag failed. ret=%d", ret);
  648 + return ret;
  649 + }
  650 + adaptation_field_extension_length = (u_int8_t)stream->read_1bytes();
  651 + ltw_flag = stream->read_1bytes();
  652 +
  653 + piecewise_rate_flag = (ltw_flag >> 6) & 0x01;
  654 + seamless_splice_flag = (ltw_flag >> 5) & 0x01;
  655 + ltw_flag = (ltw_flag >> 7) & 0x01;
  656 +
  657 + if (ltw_flag) {
  658 + if (!stream->require(2)) {
  659 + ret = ERROR_STREAM_CASTER_TS_AF;
  660 + srs_error("ts: demux af ltw_flag failed. ret=%d", ret);
  661 + return ret;
  662 + }
  663 + ltw_offset = stream->read_2bytes();
  664 +
  665 + ltw_valid_flag = (ltw_offset >> 15) &0x01;
  666 + ltw_offset &= 0x7FFF;
  667 + }
  668 +
  669 + if (piecewise_rate_flag) {
  670 + if (!stream->require(3)) {
  671 + ret = ERROR_STREAM_CASTER_TS_AF;
  672 + srs_error("ts: demux af piecewise_rate_flag failed. ret=%d", ret);
  673 + return ret;
  674 + }
  675 + piecewise_rate = stream->read_3bytes();
  676 +
  677 + piecewise_rate &= 0x3FFFFF;
  678 + }
  679 +
  680 + if (seamless_splice_flag) {
  681 + if (!stream->require(5)) {
  682 + ret = ERROR_STREAM_CASTER_TS_AF;
  683 + srs_error("ts: demux af seamless_splice_flag failed. ret=%d", ret);
  684 + return ret;
  685 + }
  686 + marker_bit0 = stream->read_1bytes();
  687 + DTS_next_AU1 = stream->read_2bytes();
  688 + DTS_next_AU2 = stream->read_2bytes();
  689 +
  690 + splice_type = (marker_bit0 >> 4) & 0x0F;
  691 + DTS_next_AU0 = (marker_bit0 >> 1) & 0x07;
  692 + marker_bit0 &= 0x01;
  693 +
  694 + marker_bit1 = DTS_next_AU1 & 0x01;
  695 + DTS_next_AU1 = (DTS_next_AU1 >> 1) & 0x7FFF;
  696 +
  697 + marker_bit2 = DTS_next_AU2 & 0x01;
  698 + DTS_next_AU2 = (DTS_next_AU2 >> 1) & 0x7FFF;
  699 + }
  700 +
  701 + nb_af_ext_reserved = adaptation_field_extension_length - (stream->pos() - pos_af_ext);
  702 + stream->skip(nb_af_ext_reserved);
  703 + }
  704 +
  705 + nb_af_reserved = adaption_field_length - (stream->pos() - pos_af);
  706 + stream->skip(nb_af_reserved);
  707 +
  708 + srs_info("ts: af parsed, discontinuity=%d random=%d priority=%d PCR=%d OPCR=%d slicing=%d private=%d extension=%d/%d pcr=%"PRId64"/%d opcr=%"PRId64"/%d",
  709 + discontinuity_indicator, random_access_indicator, elementary_stream_priority_indicator, PCR_flag, OPCR_flag, splicing_point_flag,
  710 + transport_private_data_flag, adaptation_field_extension_flag, adaptation_field_extension_length, program_clock_reference_base,
  711 + program_clock_reference_extension, original_program_clock_reference_base, original_program_clock_reference_extension);
  712 +
  713 + return ret;
459 } 714 }
460 715
461 SrsTSMuxer::SrsTSMuxer(SrsFileWriter* w) 716 SrsTSMuxer::SrsTSMuxer(SrsFileWriter* w)
@@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 33
34 #include <srs_kernel_codec.hpp> 34 #include <srs_kernel_codec.hpp>
35 35
  36 +class SrsStream;
36 class SrsTsCache; 37 class SrsTsCache;
37 class SrsTSMuxer; 38 class SrsTSMuxer;
38 class SrsFileWriter; 39 class SrsFileWriter;
@@ -114,7 +115,7 @@ enum SrsTsAdaptationFieldType @@ -114,7 +115,7 @@ enum SrsTsAdaptationFieldType
114 */ 115 */
115 class SrsTsPacket 116 class SrsTsPacket
116 { 117 {
117 -private: 118 +public:
118 // 1B 119 // 1B
119 /** 120 /**
120 * The sync_byte is a fixed 8-bit field whose value is '0100 0111' (0x47). Sync_byte emulation in the choice of 121 * The sync_byte is a fixed 8-bit field whose value is '0100 0111' (0x47). Sync_byte emulation in the choice of
@@ -206,6 +207,11 @@ public: @@ -206,6 +207,11 @@ public:
206 SrsTsPacket(); 207 SrsTsPacket();
207 virtual ~SrsTsPacket(); 208 virtual ~SrsTsPacket();
208 public: 209 public:
  210 + /**
  211 + * the stream contains only one ts packet.
  212 + * @remark we will consume all bytes in stream.
  213 + */
  214 + virtual int decode(SrsStream* stream);
209 }; 215 };
210 216
211 /** 217 /**
@@ -215,7 +221,7 @@ public: @@ -215,7 +221,7 @@ public:
215 */ 221 */
216 class SrsTsAdaptationField 222 class SrsTsAdaptationField
217 { 223 {
218 -private: 224 +public:
219 // 1B 225 // 1B
220 /** 226 /**
221 * The adaptation_field_length is an 8-bit field specifying the number of bytes in the 227 * The adaptation_field_length is an 8-bit field specifying the number of bytes in the
@@ -500,10 +506,13 @@ private: @@ -500,10 +506,13 @@ private:
500 * decoder. 506 * decoder.
501 */ 507 */
502 int nb_af_reserved; 508 int nb_af_reserved;
  509 +private:
  510 + SrsTsPacket* packet;
503 public: 511 public:
504 - SrsTsAdaptationField(); 512 + SrsTsAdaptationField(SrsTsPacket* pkt);
505 virtual ~SrsTsAdaptationField(); 513 virtual ~SrsTsAdaptationField();
506 public: 514 public:
  515 + virtual int decode(SrsStream* stream);
507 }; 516 };
508 517
509 /** 518 /**