winlin

rewrite the ts remux of hls. 2.0.117

... ... @@ -313,7 +313,7 @@ int SrsHlsMuxer::update_acodec(SrsCodecAudio ac)
return current->muxer->update_acodec(ac);
}
int SrsHlsMuxer::flush_audio(SrsMpegtsFrame* af, SrsSimpleBuffer* ab)
int SrsHlsMuxer::flush_audio(SrsTsCache* cache)
{
int ret = ERROR_SUCCESS;
... ... @@ -323,24 +323,24 @@ int SrsHlsMuxer::flush_audio(SrsMpegtsFrame* af, SrsSimpleBuffer* ab)
return ret;
}
if (ab->length() <= 0) {
if (!cache->audio || cache->audio->payload->length() <= 0) {
return ret;
}
// update the duration of segment.
current->update_duration(af->pts);
current->update_duration(cache->audio->pts);
if ((ret = current->muxer->write_audio(af, ab)) != ERROR_SUCCESS) {
if ((ret = current->muxer->write_audio(cache->audio)) != ERROR_SUCCESS) {
return ret;
}
// write success, clear and free the buffer
ab->erase(ab->length());
// write success, clear and free the msg
srs_freep(cache->audio);
return ret;
}
int SrsHlsMuxer::flush_video(SrsMpegtsFrame* /*af*/, SrsSimpleBuffer* /*ab*/, SrsMpegtsFrame* vf, SrsSimpleBuffer* vb)
int SrsHlsMuxer::flush_video(SrsTsCache* cache)
{
int ret = ERROR_SUCCESS;
... ... @@ -350,17 +350,21 @@ int SrsHlsMuxer::flush_video(SrsMpegtsFrame* /*af*/, SrsSimpleBuffer* /*ab*/, Sr
return ret;
}
if (!cache->video || cache->video->payload->length() <= 0) {
return ret;
}
srs_assert(current);
// update the duration of segment.
current->update_duration(vf->dts);
current->update_duration(cache->video->dts);
if ((ret = current->muxer->write_video(vf, vb)) != ERROR_SUCCESS) {
if ((ret = current->muxer->write_video(cache->video)) != ERROR_SUCCESS) {
return ret;
}
// write success, clear and free the buffer
vb->erase(vb->length());
// write success, clear and free the msg
srs_freep(cache->video);
return ret;
}
... ... @@ -649,7 +653,7 @@ int SrsHlsCache::on_unpublish(SrsHlsMuxer* muxer)
{
int ret = ERROR_SUCCESS;
if ((ret = muxer->flush_audio(cache->af, cache->ab)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_audio(cache)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer flush audio failed. ret=%d", ret);
return ret;
}
... ... @@ -682,8 +686,8 @@ int SrsHlsCache::write_audio(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t
}
// flush if buffer exceed max size.
if (cache->ab->length() > SRS_AUTO_HLS_AUDIO_CACHE_SIZE) {
if ((ret = muxer->flush_audio(cache->af, cache->ab)) != ERROR_SUCCESS) {
if (cache->audio->payload->length() > SRS_AUTO_HLS_AUDIO_CACHE_SIZE) {
if ((ret = muxer->flush_audio(cache)) != ERROR_SUCCESS) {
return ret;
}
}
... ... @@ -692,8 +696,8 @@ int SrsHlsCache::write_audio(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t
// in ms, audio delay to flush the audios.
int64_t audio_delay = SRS_CONF_DEFAULT_AAC_DELAY;
// flush if audio delay exceed
if (pts - cache->audio_buffer_start_pts > audio_delay * 90) {
if ((ret = muxer->flush_audio(cache->af, cache->ab)) != ERROR_SUCCESS) {
if (pts - cache->audio->start_pts > audio_delay * 90) {
if ((ret = muxer->flush_audio(cache)) != ERROR_SUCCESS) {
return ret;
}
}
... ... @@ -707,7 +711,7 @@ int SrsHlsCache::write_audio(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t
// we use absolutely overflow of segment to make jwplayer/ffplay happy
// @see https://github.com/winlinvip/simple-rtmp-server/issues/151#issuecomment-71155184
if (muxer->is_segment_absolutely_overflow()) {
if ((ret = reap_segment("audio", muxer, cache->af->pts)) != ERROR_SUCCESS) {
if ((ret = reap_segment("audio", muxer, cache->audio->pts)) != ERROR_SUCCESS) {
return ret;
}
}
... ... @@ -728,14 +732,14 @@ int SrsHlsCache::write_video(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t
// 1. base on gop.
// 2. some gops duration overflow.
if (sample->frame_type == SrsCodecVideoAVCFrameKeyFrame && muxer->is_segment_overflow()) {
if ((ret = reap_segment("video", muxer, cache->vf->dts)) != ERROR_SUCCESS) {
if ((ret = reap_segment("video", muxer, cache->video->dts)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
// flush video when got one
if ((ret = muxer->flush_video(cache->af, cache->ab, cache->vf, cache->vb)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_video(cache)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer flush video failed. ret=%d", ret);
return ret;
}
... ... @@ -761,7 +765,7 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme
// TODO: fresh segment begin with audio or video?
// segment open, flush video first.
if ((ret = muxer->flush_video(cache->af, cache->ab, cache->vf, cache->vb)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_video(cache)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer flush video failed. ret=%d", ret);
return ret;
}
... ... @@ -769,7 +773,7 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme
// segment open, flush the audio.
// @see: ngx_rtmp_hls_open_fragment
/* start fragment with audio to make iPhone happy */
if ((ret = muxer->flush_audio(cache->af, cache->ab)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_audio(cache)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer flush audio failed. ret=%d", ret);
return ret;
}
... ...
... ... @@ -42,7 +42,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsSharedPtrMessage;
class SrsCodecSample;
class SrsMpegtsFrame;
class SrsAmf0Object;
class SrsRtmpJitter;
class SrsTSMuxer;
... ... @@ -55,6 +54,7 @@ class SrsSimpleBuffer;
class SrsTsAacJitter;
class SrsTsCache;
class SrsHlsSegment;
class SrsTsCache;
/**
* the handler for hls event.
... ... @@ -224,8 +224,8 @@ public:
virtual bool is_segment_absolutely_overflow();
public:
virtual int update_acodec(SrsCodecAudio ac);
virtual int flush_audio(SrsMpegtsFrame* af, SrsSimpleBuffer* ab);
virtual int flush_video(SrsMpegtsFrame* af, SrsSimpleBuffer* ab, SrsMpegtsFrame* vf, SrsSimpleBuffer* vb);
virtual int flush_audio(SrsTsCache* cache);
virtual int flush_video(SrsTsCache* cache);
/**
* close segment(ts).
* @param log_desc the description for log.
... ...
... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 116
#define VERSION_REVISION 117
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
... ...
... ... @@ -50,14 +50,11 @@ using namespace std;
#define _SRS_AAC_SAMPLE_SIZE 1024
// the mpegts header specifed the video/audio pid.
#define TS_VIDEO_PID 256
#define TS_AUDIO_PID 257
// ts aac stream id.
#define TS_AUDIO_AAC 0xc0
#define TS_AUDIO_MP3 0x04
// ts avc stream id.
#define TS_VIDEO_AVC 0xe0
#define TS_PMT_NUMBER 1
#define TS_PMT_PID 0x100
#define TS_VIDEO_AVC_PID 0x101
#define TS_AUDIO_AAC_PID 0x102
#define TS_AUDIO_MP3_PID 0x103
/**
* the public data, event HLS disable, others can use it.
... ... @@ -420,6 +417,7 @@ SrsTsChannel::SrsTsChannel()
apply = SrsTsPidApplyReserved;
stream = SrsTsStreamReserved;
msg = NULL;
continuity_counter = 0;
}
SrsTsChannel::~SrsTsChannel()
... ... @@ -437,6 +435,9 @@ SrsTsMessage::SrsTsMessage(SrsTsChannel* c, SrsTsPacket* p)
continuity_counter = 0;
PES_packet_length = 0;
payload = new SrsSimpleBuffer();
start_pts = 0;
write_pcr = false;
}
SrsTsMessage::~SrsTsMessage()
... ... @@ -489,12 +490,12 @@ bool SrsTsMessage::fresh()
bool SrsTsMessage::is_audio()
{
return ((sid >> 5) & 0x07) == SrsTsPESStreamIdAudio;
return ((sid >> 5) & 0x07) == SrsTsPESStreamIdAudioChecker;
}
bool SrsTsMessage::is_video()
{
return ((sid >> 4) & 0x0f) == SrsTsPESStreamIdVideo;
return ((sid >> 4) & 0x0f) == SrsTsPESStreamIdVideoChecker;
}
int SrsTsMessage::stream_number()
... ... @@ -585,31 +586,17 @@ int SrsTsContext::decode(SrsStream* stream, ISrsTsHandler* handler)
return ret;
}
int SrsTsContext::encode(SrsFileWriter* writer, SrsMpegtsFrame* frame, SrsSimpleBuffer* payload, SrsCodecVideo vc, SrsCodecAudio ac)
{
int ret = ERROR_SUCCESS;
// when any codec changed, write PAT/PMT table.
if (vcodec != vc || acodec != ac) {
vcodec = vc;
acodec = ac;
if ((ret = encode_pat_pmt(writer, vc, ac)) != ERROR_SUCCESS) {
return ret;
}
}
// encode the media frame to PES packets over TS.
return encode_pes(writer, frame, payload);
}
int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, SrsCodecVideo vc, SrsCodecAudio ac)
int SrsTsContext::encode(SrsFileWriter* writer, SrsTsMessage* msg, SrsCodecVideo vc, SrsCodecAudio ac)
{
int ret = ERROR_SUCCESS;
SrsTsStream vs = SrsTsStreamReserved;
SrsTsStream as = SrsTsStreamReserved;
SrsTsStream vs, as;
int16_t video_pid, audio_pid;
switch (vc) {
case SrsCodecVideoAVC: vs = SrsTsStreamVideoH264; break;
case SrsCodecVideoAVC:
vs = SrsTsStreamVideoH264;
video_pid = TS_VIDEO_AVC_PID;
break;
case SrsCodecVideoReserved:
case SrsCodecVideoReserved1:
case SrsCodecVideoReserved2:
... ... @@ -618,11 +605,18 @@ int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, SrsCodecVideo vc, SrsCod
case SrsCodecVideoOn2VP6:
case SrsCodecVideoOn2VP6WithAlphaChannel:
case SrsCodecVideoScreenVideoVersion2:
vs = SrsTsStreamReserved;
break;
}
switch (ac) {
case SrsCodecAudioAAC: as = SrsTsStreamAudioAAC; break;
case SrsCodecAudioMP3: as = SrsTsStreamAudioMp3; break;
case SrsCodecAudioAAC:
as = SrsTsStreamAudioAAC;
audio_pid = TS_AUDIO_AAC_PID;
break;
case SrsCodecAudioMP3:
as = SrsTsStreamAudioMp3;
audio_pid = TS_AUDIO_MP3_PID;
break;
case SrsCodecAudioReserved1:
case SrsCodecAudioLinearPCMPlatformEndian:
case SrsCodecAudioADPCM:
... ... @@ -636,11 +630,33 @@ int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, SrsCodecVideo vc, SrsCod
case SrsCodecAudioSpeex:
case SrsCodecAudioReservedMP3_8kHz:
case SrsCodecAudioReservedDeviceSpecificSound:
as = SrsTsStreamReserved;
break;
}
// when any codec changed, write PAT/PMT table.
if (vcodec != vc || acodec != ac) {
vcodec = vc;
acodec = ac;
if ((ret = encode_pat_pmt(writer, video_pid, vs, audio_pid, as)) != ERROR_SUCCESS) {
return ret;
}
}
// encode the media frame to PES packets over TS.
if (msg->is_audio()) {
return encode_pes(writer, msg, audio_pid, as);
} else {
return encode_pes(writer, msg, video_pid, vs);
}
}
int16_t pmt_number = 1;
int16_t pmt_pid = 0x100;
int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, int16_t vpid, SrsTsStream vs, int16_t apid, SrsTsStream as)
{
int ret = ERROR_SUCCESS;
int16_t pmt_number = TS_PMT_NUMBER;
int16_t pmt_pid = TS_PMT_PID;
if (true) {
SrsTsPacket* pkt = SrsTsPacket::create_pat(this, pmt_number, pmt_pid);
SrsAutoFree(SrsTsPacket, pkt);
... ... @@ -666,10 +682,8 @@ int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, SrsCodecVideo vc, SrsCod
return ret;
}
}
int16_t video_pid = 0x101;
int16_t audio_pid = 0x102;
if (true) {
SrsTsPacket* pkt = SrsTsPacket::create_pmt(this, pmt_number, pmt_pid, video_pid, vs, audio_pid, as);
SrsTsPacket* pkt = SrsTsPacket::create_pmt(this, pmt_number, pmt_pid, vpid, vs, apid, as);
SrsAutoFree(SrsTsPacket, pkt);
char* buf = new char[SRS_TS_PACKET_SIZE];
... ... @@ -697,9 +711,76 @@ int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, SrsCodecVideo vc, SrsCod
return ret;
}
int SrsTsContext::encode_pes(SrsFileWriter* writer, SrsMpegtsFrame* frame, SrsSimpleBuffer* payload)
int SrsTsContext::encode_pes(SrsFileWriter* writer, SrsTsMessage* msg, int16_t pid, SrsTsStream sid)
{
int ret = ERROR_SUCCESS;
if (msg->payload->length() == 0) {
return ret;
}
SrsTsChannel* channel = get(pid);
srs_assert(channel);
char* start = msg->payload->bytes();
char* end = start + msg->payload->length();
char* p = start;
while (p < end) {
SrsTsPacket* pkt = NULL;
if (p == start) {
pkt = SrsTsPacket::create_pes_first(this,
pid, msg->sid, channel->continuity_counter++, msg->discontinuity,
msg->write_pcr? msg->dts:-1, msg->dts, msg->pts, msg->payload->length()
);
} else {
pkt = SrsTsPacket::create_pes_continue(this,
pid, msg->sid, channel->continuity_counter++
);
}
SrsAutoFree(SrsTsPacket, pkt);
char* buf = new char[SRS_TS_PACKET_SIZE];
SrsAutoFree(char, buf);
// set the left bytes with 0xFF.
int nb_buf = pkt->size();
srs_assert(nb_buf < SRS_TS_PACKET_SIZE);
int left = srs_min(end - p, SRS_TS_PACKET_SIZE - nb_buf);
int nb_stuffings = SRS_TS_PACKET_SIZE - nb_buf - left;
if (nb_stuffings > 0) {
// set all bytes to stuffings.
memset(buf, 0xFF, SRS_TS_PACKET_SIZE);
// padding with stuffings.
pkt->padding(nb_stuffings);
// size changed, recalc it.
nb_buf = pkt->size();
srs_assert(nb_buf < SRS_TS_PACKET_SIZE);
left = srs_min(end - p, SRS_TS_PACKET_SIZE - nb_buf);
nb_stuffings = SRS_TS_PACKET_SIZE - nb_buf - left;
srs_assert(nb_stuffings == 0);
}
memcpy(buf + nb_buf, p, left);
p += left;
SrsStream stream;
if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = pkt->encode(&stream)) != ERROR_SUCCESS) {
srs_error("ts encode ts packet failed. ret=%d", ret);
return ret;
}
if ((ret = writer->write(buf, SRS_TS_PACKET_SIZE, NULL)) != ERROR_SUCCESS) {
srs_error("ts write ts packet failed. ret=%d", ret);
return ret;
}
}
return ret;
}
... ... @@ -867,6 +948,34 @@ int SrsTsPacket::encode(SrsStream* stream)
return ret;
}
void SrsTsPacket::padding(int nb_stuffings)
{
if (!adaptation_field) {
SrsTsAdaptationField* af = new SrsTsAdaptationField(this);
adaptation_field = af;
af->adaption_field_length = 0; // calc in size.
af->discontinuity_indicator = 0;
af->random_access_indicator = 0;
af->elementary_stream_priority_indicator = 0;
af->PCR_flag = 0;
af->OPCR_flag = 0;
af->splicing_point_flag = 0;
af->transport_private_data_flag = 0;
af->adaptation_field_extension_flag = 0;
// consume the af size if possible.
nb_stuffings = srs_max(0, nb_stuffings - af->size());
}
adaptation_field->nb_af_reserved = nb_stuffings;
// set payload with af.
if (adaption_field_control == SrsTsAdaptationFieldTypePayloadOnly) {
adaption_field_control = SrsTsAdaptationFieldTypeBoth;
}
}
SrsTsPacket* SrsTsPacket::create_pat(SrsTsContext* context, int16_t pmt_number, int16_t pmt_pid)
{
SrsTsPacket* pkt = new SrsTsPacket(context);
... ... @@ -906,6 +1015,7 @@ SrsTsPacket* SrsTsPacket::create_pmt(SrsTsContext* context, int16_t pmt_number,
pkt->pid = (SrsTsPid)pmt_pid;
pkt->transport_scrambling_control = SrsTsScrambledDisabled;
pkt->adaption_field_control = SrsTsAdaptationFieldTypePayloadOnly;
// TODO: FIXME: maybe should continuous in channel.
pkt->continuity_counter = 0;
pkt->adaptation_field = NULL;
SrsTsPayloadPMT* pmt = new SrsTsPayloadPMT(pkt);
... ... @@ -928,6 +1038,80 @@ SrsTsPacket* SrsTsPacket::create_pmt(SrsTsContext* context, int16_t pmt_number,
return pkt;
}
SrsTsPacket* SrsTsPacket::create_pes_first(SrsTsContext* context,
int16_t pid, SrsTsPESStreamId sid, u_int8_t continuity_counter, bool discontinuity,
int64_t pcr, int64_t dts, int64_t pts, int size
) {
SrsTsPacket* pkt = new SrsTsPacket(context);
pkt->sync_byte = 0x47;
pkt->transport_error_indicator = 0;
pkt->payload_unit_start_indicator = 1;
pkt->transport_priority = 0;
pkt->pid = (SrsTsPid)pid;
pkt->transport_scrambling_control = SrsTsScrambledDisabled;
pkt->adaption_field_control = SrsTsAdaptationFieldTypePayloadOnly;
pkt->continuity_counter = continuity_counter;
pkt->adaptation_field = NULL;
SrsTsPayloadPES* pes = new SrsTsPayloadPES(pkt);
pkt->payload = pes;
if (pcr >= 0) {
SrsTsAdaptationField* af = new SrsTsAdaptationField(pkt);
pkt->adaptation_field = af;
pkt->adaption_field_control = SrsTsAdaptationFieldTypeBoth;
af->adaption_field_length = 0; // calc in size.
af->discontinuity_indicator = discontinuity;
af->random_access_indicator = 0;
af->elementary_stream_priority_indicator = 0;
af->PCR_flag = 1;
af->OPCR_flag = 0;
af->splicing_point_flag = 0;
af->transport_private_data_flag = 0;
af->adaptation_field_extension_flag = 0;
af->program_clock_reference_base = pcr;
af->program_clock_reference_extension = 0;
}
pes->packet_start_code_prefix = 0x01;
pes->stream_id = (u_int8_t)sid;
pes->PES_packet_length = (size > 0xFFFF)? 0:size;
pes->PES_scrambling_control = 0;
pes->PES_priority = 0;
pes->data_alignment_indicator = 0;
pes->copyright = 0;
pes->original_or_copy = 0;
pes->PTS_DTS_flags = (dts == pts)? 0x02:0x03;
pes->ESCR_flag = 0;
pes->ES_rate_flag = 0;
pes->DSM_trick_mode_flag = 0;
pes->additional_copy_info_flag = 0;
pes->PES_CRC_flag = 0;
pes->PES_extension_flag = 0;
pes->PES_header_data_length = 0; // calc in size.
pes->pts = pts;
pes->dts = dts;
return pkt;
}
SrsTsPacket* SrsTsPacket::create_pes_continue(SrsTsContext* context,
int16_t pid, SrsTsPESStreamId sid, u_int8_t continuity_counter
) {
SrsTsPacket* pkt = new SrsTsPacket(context);
pkt->sync_byte = 0x47;
pkt->transport_error_indicator = 0;
pkt->payload_unit_start_indicator = 0;
pkt->transport_priority = 0;
pkt->pid = (SrsTsPid)pid;
pkt->transport_scrambling_control = SrsTsScrambledDisabled;
pkt->adaption_field_control = SrsTsAdaptationFieldTypePayloadOnly;
pkt->continuity_counter = continuity_counter;
pkt->adaptation_field = NULL;
pkt->payload = NULL;
return pkt;
}
SrsTsAdaptationField::SrsTsAdaptationField(SrsTsPacket* pkt)
{
packet = pkt;
... ... @@ -964,6 +1148,10 @@ SrsTsAdaptationField::SrsTsAdaptationField(SrsTsPacket* pkt)
marker_bit2 = 0;
nb_af_ext_reserved = 0;
nb_af_reserved = 0;
const1_value0 = 0x3F;
const1_value1 = 0x1F;
const1_value2 = 0x3F;
}
SrsTsAdaptationField::~SrsTsAdaptationField()
... ... @@ -1014,7 +1202,7 @@ int SrsTsAdaptationField::decode(SrsStream* stream)
OPCR_flag = (tmpv >> 3) & 0x01;
splicing_point_flag = (tmpv >> 2) & 0x01;
transport_private_data_flag = (tmpv >> 1) & 0x01;
adaptation_field_extension_flag = (tmpv >> 0) & 0x01;
adaptation_field_extension_flag = tmpv & 0x01;
if (PCR_flag) {
if (!stream->require(6)) {
... ... @@ -1027,7 +1215,8 @@ int SrsTsAdaptationField::decode(SrsStream* stream)
char* p = stream->data() + stream->pos();
stream->skip(6);
pp = (char*)&program_clock_reference_base;
int64_t pcrv = 0;
pp = (char*)&pcrv;
pp[5] = *p++;
pp[4] = *p++;
pp[3] = *p++;
... ... @@ -1037,8 +1226,9 @@ int SrsTsAdaptationField::decode(SrsStream* stream)
// @remark, use pcr base and ignore the extension
// @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370
program_clock_reference_extension = program_clock_reference_base & 0x1ff;
program_clock_reference_base = (program_clock_reference_base >> 15) & 0x1ffffffffLL;
program_clock_reference_extension = pcrv & 0x1ff;
const1_value0 = (pcrv >> 9) & 0x3F;
program_clock_reference_base = (pcrv >> 15) & 0x1ffffffffLL;
}
if (OPCR_flag) {
... ... @@ -1051,8 +1241,9 @@ int SrsTsAdaptationField::decode(SrsStream* stream)
char* pp = NULL;
char* p = stream->data() + stream->pos();
stream->skip(6);
pp = (char*)&original_program_clock_reference_base;
int64_t opcrv = 0;
pp = (char*)&opcrv;
pp[5] = *p++;
pp[4] = *p++;
pp[3] = *p++;
... ... @@ -1062,8 +1253,9 @@ int SrsTsAdaptationField::decode(SrsStream* stream)
// @remark, use pcr base and ignore the extension
// @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370
original_program_clock_reference_extension = program_clock_reference_base & 0x1ff;
original_program_clock_reference_base = (program_clock_reference_base >> 15) & 0x1ffffffffLL;
original_program_clock_reference_extension = opcrv & 0x1ff;
const1_value2 = (opcrv >> 9) & 0x3F;
original_program_clock_reference_base = (opcrv >> 15) & 0x1ffffffffLL;
}
if (splicing_point_flag) {
... ... @@ -1104,11 +1296,12 @@ int SrsTsAdaptationField::decode(SrsStream* stream)
return ret;
}
adaptation_field_extension_length = (u_int8_t)stream->read_1bytes();
ltw_flag = stream->read_1bytes();
int8_t ltwfv = stream->read_1bytes();
piecewise_rate_flag = (ltw_flag >> 6) & 0x01;
seamless_splice_flag = (ltw_flag >> 5) & 0x01;
ltw_flag = (ltw_flag >> 7) & 0x01;
piecewise_rate_flag = (ltwfv >> 6) & 0x01;
seamless_splice_flag = (ltwfv >> 5) & 0x01;
ltw_flag = (ltwfv >> 7) & 0x01;
const1_value1 = ltwfv & 0x1F;
if (ltw_flag) {
if (!stream->require(2)) {
... ... @@ -1181,12 +1374,172 @@ int SrsTsAdaptationField::size()
sz += nb_af_ext_reserved;
sz += nb_af_reserved;
adaption_field_length = sz - 1;
return sz;
}
int SrsTsAdaptationField::encode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if (!stream->require(2)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(adaption_field_length);
// When the adaptation_field_control value is '11', the value of the adaptation_field_length shall
// be in the range 0 to 182.
if (packet->adaption_field_control == SrsTsAdaptationFieldTypeBoth && adaption_field_length > 182) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af length failed, must in [0, 182], actual=%d. ret=%d", adaption_field_length, ret);
return ret;
}
// When the adaptation_field_control value is '10', the value of the adaptation_field_length shall
// be 183.
if (packet->adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly && adaption_field_length != 183) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af length failed, must be 183, actual=%d. ret=%d", adaption_field_length, ret);
return ret;
}
// no adaptation field.
if (adaption_field_length == 0) {
srs_info("ts: mux af empty.");
return ret;
}
int8_t tmpv = adaptation_field_extension_flag & 0x01;
tmpv |= (discontinuity_indicator << 7) & 0x80;
tmpv |= (random_access_indicator << 6) & 0x40;
tmpv |= (elementary_stream_priority_indicator << 5) & 0x20;
tmpv |= (PCR_flag << 4) & 0x10;
tmpv |= (OPCR_flag << 3) & 0x08;
tmpv |= (splicing_point_flag << 2) & 0x04;
tmpv |= (transport_private_data_flag << 1) & 0x02;
stream->write_1bytes(tmpv);
if (PCR_flag) {
if (!stream->require(6)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af PCR_flag failed. ret=%d", ret);
return ret;
}
char* pp = NULL;
char* p = stream->data() + stream->pos();
stream->skip(6);
// @remark, use pcr base and ignore the extension
// @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370
int64_t pcrv = program_clock_reference_extension & 0x1ff;
pcrv |= (const1_value0 << 9) & 0x7E00;
pcrv |= (program_clock_reference_base << 15) & 0x1FFFFFFFF000000;
pp = (char*)&pcrv;
*p++ = pp[5];
*p++ = pp[4];
*p++ = pp[3];
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
}
if (OPCR_flag) {
if (!stream->require(6)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: demux af OPCR_flag failed. ret=%d", ret);
return ret;
}
stream->skip(6);
srs_warn("ts: mux af ignore OPCR");
}
if (splicing_point_flag) {
if (!stream->require(1)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af splicing_point_flag failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(splice_countdown);
}
if (transport_private_data_flag) {
if (!stream->require(1)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af transport_private_data_flag failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(transport_private_data_length);
if (transport_private_data_length> 0) {
if (!stream->require(transport_private_data_length)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af transport_private_data_flag failed. ret=%d", ret);
return ret;
}
stream->write_bytes(transport_private_data, transport_private_data_length);
}
}
if (adaptation_field_extension_flag) {
if (!stream->require(2)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af adaptation_field_extension_flag failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(adaptation_field_extension_length);
int8_t ltwfv = const1_value1 & 0x1F;
ltwfv |= (ltw_flag << 7) & 0x80;
ltwfv |= (piecewise_rate_flag << 6) & 0x40;
ltwfv |= (seamless_splice_flag << 5) & 0x20;
stream->write_1bytes(ltwfv);
if (ltw_flag) {
if (!stream->require(2)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af ltw_flag failed. ret=%d", ret);
return ret;
}
stream->skip(2);
srs_warn("ts: mux af ignore ltw");
}
if (piecewise_rate_flag) {
if (!stream->require(3)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af piecewise_rate_flag failed. ret=%d", ret);
return ret;
}
stream->skip(3);
srs_warn("ts: mux af ignore piecewise_rate");
}
if (seamless_splice_flag) {
if (!stream->require(5)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: mux af seamless_splice_flag failed. ret=%d", ret);
return ret;
}
stream->skip(5);
srs_warn("ts: mux af ignore seamless_splice");
}
if (nb_af_ext_reserved) {
stream->skip(nb_af_ext_reserved);
}
}
if (nb_af_reserved) {
stream->skip(nb_af_reserved);
}
srs_info("ts: af parsed, discontinuity=%d random=%d priority=%d PCR=%d OPCR=%d slicing=%d private=%d extension=%d/%d pcr=%"PRId64"/%d opcr=%"PRId64"/%d",
discontinuity_indicator, random_access_indicator, elementary_stream_priority_indicator, PCR_flag, OPCR_flag, splicing_point_flag,
transport_private_data_flag, adaptation_field_extension_flag, adaptation_field_extension_length, program_clock_reference_base,
program_clock_reference_extension, original_program_clock_reference_base, original_program_clock_reference_extension);
return ret;
}
... ... @@ -1207,6 +1560,8 @@ SrsTsPayloadPES::SrsTsPayloadPES(SrsTsPacket* p) : SrsTsPayload(p)
nb_stuffings = 0;
nb_bytes = 0;
nb_paddings = 0;
const2bits = 0x02;
const1_value0 = 0x07;
}
SrsTsPayloadPES::~SrsTsPayloadPES()
... ... @@ -1353,28 +1708,28 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
return ret;
}
// 1B
original_or_copy = stream->read_1bytes();
int8_t oocv = stream->read_1bytes();
// 1B
PES_extension_flag = stream->read_1bytes();
int8_t pefv = stream->read_1bytes();
// 1B
PES_header_data_length = stream->read_1bytes();
// position of header start.
int pos_header = stream->pos();
//int8_t const2bits = (original_or_copy >> 6) & 0x03;
PES_scrambling_control = (original_or_copy >> 4) & 0x03;
PES_priority = (original_or_copy >> 3) & 0x01;
data_alignment_indicator = (original_or_copy >> 2) & 0x01;
copyright = (original_or_copy >> 1) & 0x01;
original_or_copy &= 0x01;
PTS_DTS_flags = (PES_extension_flag >> 6) & 0x03;
ESCR_flag = (PES_extension_flag >> 5) & 0x01;
ES_rate_flag = (PES_extension_flag >> 4) & 0x01;
DSM_trick_mode_flag = (PES_extension_flag >> 3) & 0x01;
additional_copy_info_flag = (PES_extension_flag >> 2) & 0x01;
PES_CRC_flag = (PES_extension_flag >> 1) & 0x01;
PES_extension_flag &= 0x01;
const2bits = (oocv >> 6) & 0x03;
PES_scrambling_control = (oocv >> 4) & 0x03;
PES_priority = (oocv >> 3) & 0x01;
data_alignment_indicator = (oocv >> 2) & 0x01;
copyright = (oocv >> 1) & 0x01;
original_or_copy = oocv & 0x01;
PTS_DTS_flags = (pefv >> 6) & 0x03;
ESCR_flag = (pefv >> 5) & 0x01;
ES_rate_flag = (pefv >> 4) & 0x01;
DSM_trick_mode_flag = (pefv >> 3) & 0x01;
additional_copy_info_flag = (pefv >> 2) & 0x01;
PES_CRC_flag = (pefv >> 1) & 0x01;
PES_extension_flag = pefv & 0x01;
// check required together.
int nb_required = 0;
... ... @@ -1462,13 +1817,14 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
// 1B
if (PES_extension_flag) {
PES_extension_flag_2 = stream->read_1bytes();
int8_t efv = stream->read_1bytes();
PES_private_data_flag = (PES_extension_flag_2 >> 7) & 0x01;
pack_header_field_flag = (PES_extension_flag_2 >> 6) & 0x01;
program_packet_sequence_counter_flag = (PES_extension_flag_2 >> 5) & 0x01;
P_STD_buffer_flag = (PES_extension_flag_2 >> 4) & 0x01;
PES_extension_flag_2 &= PES_extension_flag_2 & 0x01;
PES_private_data_flag = (efv >> 7) & 0x01;
pack_header_field_flag = (efv >> 6) & 0x01;
program_packet_sequence_counter_flag = (efv >> 5) & 0x01;
P_STD_buffer_flag = (efv >> 4) & 0x01;
const1_value0 = (efv >> 1) & 0x07;
PES_extension_flag_2 = efv & 0x01;
nb_required = 0;
nb_required += PES_private_data_flag? 16:0;
... ... @@ -1619,8 +1975,9 @@ int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
int SrsTsPayloadPES::size()
{
int sz = 6;
int sz = 0;
PES_header_data_length = 0;
SrsTsPESStreamId sid = (SrsTsPESStreamId)stream_id;
if (sid != SrsTsPESStreamIdProgramStreamMap
... ... @@ -1632,7 +1989,9 @@ int SrsTsPayloadPES::size()
&& sid != SrsTsPESStreamIdDsmccStream
&& sid != SrsTsPESStreamIdH2221TypeE
) {
sz += 6;
sz += 3;
PES_header_data_length = sz;
sz += (PTS_DTS_flags == 0x2)? 5:0;
sz += (PTS_DTS_flags == 0x3)? 10:0;
... ... @@ -1650,6 +2009,7 @@ int SrsTsPayloadPES::size()
sz += P_STD_buffer_flag? 2:0;
sz += PES_extension_flag_2? 1 + PES_extension_field_length:0; // 1+x bytes.
}
PES_header_data_length = sz - PES_header_data_length;
sz += nb_stuffings;
... ... @@ -1673,6 +2033,161 @@ int SrsTsPayloadPES::size()
int SrsTsPayloadPES::encode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
// 6B fixed header.
if (!stream->require(6)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: mux PSE failed. ret=%d", ret);
return ret;
}
// 3B
stream->write_3bytes(packet_start_code_prefix);
// 1B
stream->write_1bytes(stream_id);
// 2B
// the PES_packet_length is the actual bytes size, the pplv write to ts
// is the actual bytes plus the header size.
int32_t pplv = 0;
if (PES_packet_length > 0) {
pplv = PES_packet_length + 3 + PES_header_data_length;
pplv = (pplv > 0xFFFF)? 0 : pplv;
}
stream->write_2bytes(pplv);
// check the packet start prefix.
packet_start_code_prefix &= 0xFFFFFF;
if (packet_start_code_prefix != 0x01) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: mux PSE start code failed, expect=0x01, actual=%#x. ret=%d", packet_start_code_prefix, ret);
return ret;
}
// 3B flags.
if (!stream->require(3)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: mux PSE flags failed. ret=%d", ret);
return ret;
}
// 1B
int8_t oocv = original_or_copy & 0x01;
oocv |= (const2bits << 6) & 0xC0;
oocv |= (PES_scrambling_control << 4) & 0x30;
oocv |= (PES_priority << 3) & 0x08;
oocv |= (data_alignment_indicator << 2) & 0x04;
oocv |= (copyright << 1) & 0x02;
stream->write_1bytes(oocv);
// 1B
int8_t pefv = PES_extension_flag & 0x01;
pefv |= (PTS_DTS_flags << 6) & 0xC0;
pefv |= (ESCR_flag << 5) & 0x20;
pefv |= (ES_rate_flag << 4) & 0x10;
pefv |= (DSM_trick_mode_flag << 3) & 0x08;
pefv |= (additional_copy_info_flag << 2) & 0x04;
pefv |= (PES_CRC_flag << 1) & 0x02;
stream->write_1bytes(pefv);
// 1B
stream->write_1bytes(PES_header_data_length);
// check required together.
int nb_required = 0;
nb_required += (PTS_DTS_flags == 0x2)? 5:0;
nb_required += (PTS_DTS_flags == 0x3)? 10:0;
nb_required += ESCR_flag? 6:0;
nb_required += ES_rate_flag? 3:0;
nb_required += DSM_trick_mode_flag? 1:0;
nb_required += additional_copy_info_flag? 1:0;
nb_required += PES_CRC_flag? 2:0;
nb_required += PES_extension_flag? 1:0;
if (!stream->require(nb_required)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: mux PSE payload failed. ret=%d", ret);
return ret;
}
// 5B
if (PTS_DTS_flags == 0x2) {
if ((ret = encode_33bits_dts_pts(stream, 0x02, pts)) != ERROR_SUCCESS) {
return ret;
}
}
// 10B
if (PTS_DTS_flags == 0x3) {
if ((ret = encode_33bits_dts_pts(stream, 0x03, pts)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = encode_33bits_dts_pts(stream, 0x01, dts)) != ERROR_SUCCESS) {
return ret;
}
// check sync, the diff of dts and pts should never greater than 1s.
if (dts - pts > 90000 || pts - dts > 90000) {
srs_warn("ts: sync dts=%"PRId64", pts=%"PRId64, dts, pts);
}
}
// 6B
if (ESCR_flag) {
stream->skip(6);
srs_warn("ts: demux PES, ignore the escr.");
}
// 3B
if (ES_rate_flag) {
stream->skip(3);
srs_warn("ts: demux PES, ignore the ES_rate.");
}
// 1B
if (DSM_trick_mode_flag) {
stream->skip(1);
srs_warn("ts: demux PES, ignore the DSM_trick_mode.");
}
// 1B
if (additional_copy_info_flag) {
stream->skip(1);
srs_warn("ts: demux PES, ignore the additional_copy_info.");
}
// 2B
if (PES_CRC_flag) {
stream->skip(2);
srs_warn("ts: demux PES, ignore the PES_CRC.");
}
// 1B
if (PES_extension_flag) {
int8_t efv = PES_extension_flag_2 & 0x01;
efv |= (PES_private_data_flag << 7) & 0x80;
efv |= (pack_header_field_flag << 6) & 0x40;
efv |= (program_packet_sequence_counter_flag << 5) & 0x20;
efv |= (P_STD_buffer_flag << 4) & 0x10;
efv |= (const1_value0 << 1) & 0xE0;
stream->write_1bytes(efv);
nb_required = 0;
nb_required += PES_private_data_flag? 16:0;
nb_required += pack_header_field_flag? 1+pack_field_length:0; // 1+x bytes.
nb_required += program_packet_sequence_counter_flag? 2:0;
nb_required += P_STD_buffer_flag? 2:0;
nb_required += PES_extension_flag_2? 1+PES_extension_field_length:0; // 1+x bytes.
if (!stream->require(nb_required)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: mux PSE ext payload failed. ret=%d", ret);
return ret;
}
stream->skip(nb_required);
srs_warn("ts: demux PES, ignore the PES_extension.");
}
// stuffing_byte
if (nb_stuffings) {
stream->skip(nb_stuffings);
srs_warn("ts: demux PES, ignore the stuffings.");
}
return ret;
}
... ... @@ -1737,6 +2252,35 @@ int SrsTsPayloadPES::decode_33bits_dts_pts(SrsStream* stream, int64_t* pv)
return ret;
}
int SrsTsPayloadPES::encode_33bits_dts_pts(SrsStream* stream, u_int8_t fb, int64_t v)
{
int ret = ERROR_SUCCESS;
if (!stream->require(5)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: mux PSE dts/pts failed. ret=%d", ret);
return ret;
}
char* p = stream->data() + stream->pos();
stream->skip(5);
int32_t val;
val = fb << 4 | (((pts >> 30) & 0x07) << 1) | 1;
*p++ = val;
val = (((pts >> 15) & 0x7fff) << 1) | 1;
*p++ = (val >> 8);
*p++ = val;
val = (((pts) & 0x7fff) << 1) | 1;
*p++ = (val >> 8);
*p++ = val;
return ret;
}
SrsTsPayloadPSI::SrsTsPayloadPSI(SrsTsPacket* p) : SrsTsPayload(p)
{
pointer_field = 0;
... ... @@ -2081,8 +2625,14 @@ int SrsTsPayloadPAT::psi_encode(SrsStream* stream)
if ((ret = program->encode(stream)) != ERROR_SUCCESS) {
return ret;
}
// update the apply pid table.
packet->context->set(program->pid, SrsTsPidApplyPMT);
}
// update the apply pid table.
packet->context->set(packet->pid, SrsTsPidApplyPAT);
return ret;
}
... ... @@ -2341,8 +2891,28 @@ int SrsTsPayloadPMT::psi_encode(SrsStream* stream)
if ((ret = info->encode(stream)) != ERROR_SUCCESS) {
return ret;
}
// update the apply pid table
switch (info->stream_type) {
case SrsTsStreamVideoH264:
case SrsTsStreamVideoMpeg4:
packet->context->set(info->elementary_PID, SrsTsPidApplyVideo, info->stream_type);
break;
case SrsTsStreamAudioAAC:
case SrsTsStreamAudioAC3:
case SrsTsStreamAudioDTS:
case SrsTsStreamAudioMp3:
packet->context->set(info->elementary_PID, SrsTsPidApplyAudio, info->stream_type);
break;
default:
srs_warn("ts: drop pid=%#x, stream=%#x", info->elementary_PID, info->stream_type);
break;
}
}
// update the apply pid table.
packet->context->set(packet->pid, SrsTsPidApplyPMT);
return ret;
}
... ... @@ -2387,11 +2957,11 @@ int SrsTSMuxer::update_acodec(SrsCodecAudio ac)
return ERROR_SUCCESS;
}
int SrsTSMuxer::write_audio(SrsMpegtsFrame* af, SrsSimpleBuffer* ab)
int SrsTSMuxer::write_audio(SrsTsMessage* audio)
{
int ret = ERROR_SUCCESS;
if ((ret = context->encode(writer, af, ab, vcodec, acodec)) != ERROR_SUCCESS) {
if ((ret = context->encode(writer, audio, vcodec, acodec)) != ERROR_SUCCESS) {
srs_error("hls encode audio failed. ret=%d", ret);
return ret;
}
... ... @@ -2400,11 +2970,11 @@ int SrsTSMuxer::write_audio(SrsMpegtsFrame* af, SrsSimpleBuffer* ab)
return ret;
}
int SrsTSMuxer::write_video(SrsMpegtsFrame* vf, SrsSimpleBuffer* vb)
int SrsTSMuxer::write_video(SrsTsMessage* video)
{
int ret = ERROR_SUCCESS;
if ((ret = context->encode(writer, vf, vb, vcodec, acodec)) != ERROR_SUCCESS) {
if ((ret = context->encode(writer, video, vcodec, acodec)) != ERROR_SUCCESS) {
srs_error("hls encode video failed. ret=%d", ret);
return ret;
}
... ... @@ -2419,143 +2989,45 @@ void SrsTSMuxer::close()
writer->close();
}
SrsTsAacJitter::SrsTsAacJitter()
{
base_pts = 0;
nb_samples = 0;
// TODO: config it, 0 means no adjust
sync_ms = SRS_CONF_DEFAULT_AAC_SYNC;
}
SrsTsAacJitter::~SrsTsAacJitter()
{
}
int64_t SrsTsAacJitter::on_buffer_start(int64_t flv_pts, int sample_rate, int aac_sample_rate)
{
// use sample rate in flv/RTMP.
int flv_sample_rate = flv_sample_rates[sample_rate & 0x03];
// override the sample rate by sequence header
if (aac_sample_rate != __SRS_AAC_SAMPLE_RATE_UNSET) {
flv_sample_rate = aac_sample_rates[aac_sample_rate];
}
// sync time set to 0, donot adjust the aac timestamp.
if (!sync_ms) {
return flv_pts;
}
// @see: ngx_rtmp_hls_audio
// drop the rtmp audio packet timestamp, re-calc it by sample rate.
//
// resample for the tbn of ts is 90000, flv is 1000,
// we will lost timestamp if use audio packet timestamp,
// so we must resample. or audio will corupt in IOS.
int64_t est_pts = base_pts + nb_samples * 90000LL * _SRS_AAC_SAMPLE_SIZE / flv_sample_rate;
int64_t dpts = (int64_t) (est_pts - flv_pts);
if (dpts <= (int64_t) sync_ms * 90 && dpts >= (int64_t) sync_ms * -90) {
srs_info("HLS correct aac pts "
"from %"PRId64" to %"PRId64", base=%"PRId64", nb_samples=%d, sample_rate=%d",
flv_pts, est_pts, nb_samples, flv_sample_rate, base_pts);
nb_samples++;
return est_pts;
}
// resync
srs_trace("HLS aac resync, dpts=%"PRId64", pts=%"PRId64
", base=%"PRId64", nb_samples=%"PRId64", sample_rate=%d",
dpts, flv_pts, base_pts, nb_samples, flv_sample_rate);
base_pts = flv_pts;
nb_samples = 1;
return flv_pts;
}
void SrsTsAacJitter::on_buffer_continue()
{
nb_samples++;
}
SrsTsCache::SrsTsCache()
{
aac_jitter = new SrsTsAacJitter();
ab = new SrsSimpleBuffer();
vb = new SrsSimpleBuffer();
af = new SrsMpegtsFrame();
vf = new SrsMpegtsFrame();
audio_buffer_start_pts = 0;
audio = NULL;
video = NULL;
}
SrsTsCache::~SrsTsCache()
{
srs_freep(aac_jitter);
ab->erase(ab->length());
vb->erase(vb->length());
srs_freep(ab);
srs_freep(vb);
srs_freep(af);
srs_freep(vf);
srs_freep(audio);
srs_freep(video);
}
int SrsTsCache::cache_audio(SrsAvcAacCodec* codec, int64_t pts, SrsCodecSample* sample)
int SrsTsCache::cache_audio(SrsAvcAacCodec* codec, int64_t dts, SrsCodecSample* sample)
{
int ret = ERROR_SUCCESS;
// @remark, always use the orignal pts.
if (ab->length() == 0) {
audio_buffer_start_pts = pts;
// create the ts audio message.
if (!audio) {
audio = new SrsTsMessage();
audio->write_pcr = false;
audio->start_pts = dts;
}
audio->dts = dts;
audio->pts = audio->dts;
audio->sid = SrsTsPESStreamIdAudioCommon;
// must be aac or mp3
SrsCodecAudio acodec = (SrsCodecAudio)codec->audio_codec_id;
srs_assert(acodec == SrsCodecAudioAAC || acodec == SrsCodecAudioMP3);
// cache the aac audio.
// write video to cache.
if (codec->audio_codec_id == SrsCodecAudioAAC) {
// for aac audio, recalc the timestamp by aac jitter.
if (ab->length() == 0) {
pts = aac_jitter->on_buffer_start(pts, sample->sound_rate, codec->aac_sample_rate);
af->dts = af->pts = pts;
af->pid = TS_AUDIO_PID;
af->sid = TS_AUDIO_AAC;
} else {
aac_jitter->on_buffer_continue();
}
// write aac audio to cache.
if ((ret = do_cache_audio(codec, sample)) != ERROR_SUCCESS) {
if ((ret = do_cache_aac(codec, sample)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
// cache the mp3 audio.
if (codec->audio_codec_id == SrsCodecAudioMP3) {
// for mp3 audio, recalc the timestamp by mp3 jitter.
// TODO: FIXME: implements it.
af->dts = af->pts = pts;
af->pid = TS_AUDIO_PID;
af->sid = SrsCodecAudioMP3;
// for mp3, directly write to cache.
// TODO: FIXME: implements it.
for (int i = 0; i < sample->nb_sample_units; i++) {
SrsCodecSampleUnit* sample_unit = &sample->sample_units[i];
ab->append(sample_unit->bytes, sample_unit->size);
} else {
if ((ret = do_cache_mp3(codec, sample)) != ERROR_SUCCESS) {
return ret;
}
}
... ... @@ -2566,21 +3038,40 @@ int SrsTsCache::cache_video(SrsAvcAacCodec* codec, int64_t dts, SrsCodecSample*
{
int ret = ERROR_SUCCESS;
// create the ts video message.
if (!video) {
video = new SrsTsMessage();
video->write_pcr = sample->frame_type == SrsCodecVideoAVCFrameKeyFrame;
video->start_pts = dts;
}
video->dts = dts;
video->pts = video->dts + sample->cts * 90;
video->sid = SrsTsPESStreamIdVideoCommon;
// write video to cache.
if ((ret = do_cache_video(codec, sample)) != ERROR_SUCCESS) {
if ((ret = do_cache_avc(codec, sample)) != ERROR_SUCCESS) {
return ret;
}
vf->dts = dts;
vf->pts = vf->dts + sample->cts * 90;
vf->pid = TS_VIDEO_PID;
vf->sid = TS_VIDEO_AVC;
vf->write_pcr = sample->frame_type == SrsCodecVideoAVCFrameKeyFrame;
return ret;
}
int SrsTsCache::do_cache_mp3(SrsAvcAacCodec* codec, SrsCodecSample* sample)
{
int ret = ERROR_SUCCESS;
// for mp3, directly write to cache.
// TODO: FIXME: implements the ts jitter.
for (int i = 0; i < sample->nb_sample_units; i++) {
SrsCodecSampleUnit* sample_unit = &sample->sample_units[i];
audio->payload->append(sample_unit->bytes, sample_unit->size);
}
return ret;
}
int SrsTsCache::do_cache_audio(SrsAvcAacCodec* codec, SrsCodecSample* sample)
int SrsTsCache::do_cache_aac(SrsAvcAacCodec* codec, SrsCodecSample* sample)
{
int ret = ERROR_SUCCESS;
... ... @@ -2654,14 +3145,14 @@ int SrsTsCache::do_cache_audio(SrsAvcAacCodec* codec, SrsCodecSample* sample)
adts_header[5] |= 0x1f;
// copy to audio buffer
ab->append((const char*)adts_header, sizeof(adts_header));
ab->append(sample_unit->bytes, sample_unit->size);
audio->payload->append((const char*)adts_header, sizeof(adts_header));
audio->payload->append(sample_unit->bytes, sample_unit->size);
}
return ret;
}
int SrsTsCache::do_cache_video(SrsAvcAacCodec* codec, SrsCodecSample* sample)
int SrsTsCache::do_cache_avc(SrsAvcAacCodec* codec, SrsCodecSample* sample)
{
int ret = ERROR_SUCCESS;
... ... @@ -2720,7 +3211,7 @@ int SrsTsCache::do_cache_video(SrsAvcAacCodec* codec, SrsCodecSample* sample)
if (nal_unit_type == 1 || nal_unit_type == 5 || nal_unit_type == 6) {
// for type 6, append a aud with type 9.
vb->append((const char*)aud_nal, sizeof(aud_nal));
video->payload->append((const char*)aud_nal, sizeof(aud_nal));
aud_sent = true;
}
}
... ... @@ -2733,15 +3224,15 @@ int SrsTsCache::do_cache_video(SrsAvcAacCodec* codec, SrsCodecSample* sample)
// @see: ngx_rtmp_hls_append_sps_pps
if (codec->sequenceParameterSetLength > 0) {
// AnnexB prefix, for sps always 4 bytes header
vb->append((const char*)aud_nal, 4);
video->payload->append((const char*)aud_nal, 4);
// sps
vb->append(codec->sequenceParameterSetNALUnit, codec->sequenceParameterSetLength);
video->payload->append(codec->sequenceParameterSetNALUnit, codec->sequenceParameterSetLength);
}
if (codec->pictureParameterSetLength > 0) {
// AnnexB prefix, for pps always 4 bytes header
vb->append((const char*)aud_nal, 4);
video->payload->append((const char*)aud_nal, 4);
// pps
vb->append(codec->pictureParameterSetNALUnit, codec->pictureParameterSetLength);
video->payload->append(codec->pictureParameterSetNALUnit, codec->pictureParameterSetLength);
}
}
... ... @@ -2761,13 +3252,13 @@ int SrsTsCache::do_cache_video(SrsAvcAacCodec* codec, SrsCodecSample* sample)
u_int8_t* end = p + 3;
// first AnnexB prefix is long (4 bytes)
if (vb->length() == 0) {
if (video->payload->length() == 0) {
p = aud_nal;
}
vb->append((const char*)p, end - p);
video->payload->append((const char*)p, end - p);
// sample data
vb->append(sample_unit->bytes, sample_unit->size);
video->payload->append(sample_unit->bytes, sample_unit->size);
}
return ret;
... ... @@ -2858,7 +3349,7 @@ int SrsTsEncoder::write_audio(int64_t timestamp, char* data, int size)
}
// flush if buffer exceed max size.
if (cache->ab->length() > SRS_AUTO_HLS_AUDIO_CACHE_SIZE) {
if (cache->audio->payload->length() > SRS_AUTO_HLS_AUDIO_CACHE_SIZE) {
return flush_video();
}
... ... @@ -2866,7 +3357,7 @@ int SrsTsEncoder::write_audio(int64_t timestamp, char* data, int size)
// in ms, audio delay to flush the audios.
int64_t audio_delay = SRS_CONF_DEFAULT_AAC_DELAY;
// flush if audio delay exceed
if (dts - cache->audio_buffer_start_pts > audio_delay * 90) {
if (dts - cache->audio->start_pts > audio_delay * 90) {
return flush_audio();
}
... ... @@ -2913,12 +3404,12 @@ int SrsTsEncoder::flush_audio()
{
int ret = ERROR_SUCCESS;
if ((ret = muxer->write_audio(cache->af, cache->ab)) != ERROR_SUCCESS) {
if ((ret = muxer->write_audio(cache->audio)) != ERROR_SUCCESS) {
return ret;
}
// write success, clear and free the buffer
cache->ab->erase(cache->ab->length());
// write success, clear and free the ts message.
srs_freep(cache->audio);
return ret;
}
... ... @@ -2927,12 +3418,12 @@ int SrsTsEncoder::flush_video()
{
int ret = ERROR_SUCCESS;
if ((ret = muxer->write_video(cache->vf, cache->vb)) != ERROR_SUCCESS) {
if ((ret = muxer->write_video(cache->video)) != ERROR_SUCCESS) {
return ret;
}
// write success, clear and free the buffer
cache->vb->erase(cache->vb->length());
// write success, clear and free the ts message.
srs_freep(cache->video);
return ret;
}
... ...
... ... @@ -185,6 +185,8 @@ struct SrsTsChannel
SrsTsPidApply apply;
SrsTsStream stream;
SrsTsMessage* msg;
// for encoder.
u_int8_t continuity_counter;
SrsTsChannel();
virtual ~SrsTsChannel();
... ... @@ -209,13 +211,17 @@ enum SrsTsPESStreamId
// ISO/IEC 13818-3 or ISO/IEC 11172-3 or ISO/IEC 13818-7 or ISO/IEC
// 14496-3 audio stream number x xxxx
// ((sid >> 5) & 0x07) == SrsTsPESStreamIdAudio
SrsTsPESStreamIdAudio = 0x06, // 0b110
// @remark, use SrsTsPESStreamIdAudioCommon as actually audio, SrsTsPESStreamIdAudio to check whether audio.
SrsTsPESStreamIdAudioChecker = 0x06, // 0b110
SrsTsPESStreamIdAudioCommon = 0xc0,
// 1110 xxxx
// ITU-T Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 11172-2 or ISO/IEC
// 14496-2 video stream number xxxx
// ((stream_id >> 4) & 0x0f) == SrsTsPESStreamIdVideo
SrsTsPESStreamIdVideo = 0x0e, // 0b1110
// @remark, use SrsTsPESStreamIdVideoCommon as actually video, SrsTsPESStreamIdVideo to check whether video.
SrsTsPESStreamIdVideoChecker = 0x0e, // 0b1110
SrsTsPESStreamIdVideoCommon = 0xe0,
// ECM_stream
SrsTsPESStreamIdEcmStream = 0xf0, // 0b11110000
... ... @@ -253,9 +259,21 @@ enum SrsTsPESStreamId
class SrsTsMessage
{
public:
// decoder only,
// the ts messgae does not use them,
// for user to get the channel and packet.
SrsTsChannel* channel;
SrsTsPacket* packet;
public:
// the audio cache buffer start pts, to flush audio if full.
// @remark the pts is not the adjust one, it's the orignal pts.
int64_t start_pts;
// whether this message with pcr info,
// generally, the video IDR(I frame, the keyframe of h.264) carray the pcr info.
bool write_pcr;
// whether got discontinuity ts, for example, sequence header changed.
bool discontinuity;
public:
// the timestamp in 90khz
int64_t dts;
int64_t pts;
... ... @@ -269,8 +287,9 @@ public:
// the payload bytes.
SrsSimpleBuffer* payload;
public:
SrsTsMessage(SrsTsChannel* c, SrsTsPacket* p);
SrsTsMessage(SrsTsChannel* c = NULL, SrsTsPacket* p = NULL);
virtual ~SrsTsMessage();
// decoder
public:
/**
* dumps all bytes in stream to ts message.
... ... @@ -361,15 +380,14 @@ public:
public:
/**
* write the PES packet, the video/audio stream.
* @param frame the video/audio frame info.
* @param payload the video/audio payload bytes.
* @param msg the video/audio msg to write to ts.
* @param vc the video codec, write the PAT/PMT table when changed.
* @param ac the audio codec, write the PAT/PMT table when changed.
*/
virtual int encode(SrsFileWriter* writer, SrsMpegtsFrame* frame, SrsSimpleBuffer* payload, SrsCodecVideo vc, SrsCodecAudio ac);
virtual int encode(SrsFileWriter* writer, SrsTsMessage* msg, SrsCodecVideo vc, SrsCodecAudio ac);
private:
virtual int encode_pat_pmt(SrsFileWriter* writer, SrsCodecVideo vcodec, SrsCodecAudio acodec);
virtual int encode_pes(SrsFileWriter* writer, SrsMpegtsFrame* frame, SrsSimpleBuffer* payload);
virtual int encode_pat_pmt(SrsFileWriter* writer, int16_t vpid, SrsTsStream vs, int16_t apid, SrsTsStream as);
virtual int encode_pes(SrsFileWriter* writer, SrsTsMessage* msg, int16_t pid, SrsTsStream sid);
};
/**
... ... @@ -478,9 +496,22 @@ public:
public:
virtual int size();
virtual int encode(SrsStream* stream);
virtual void padding(int nb_stuffings);
public:
static SrsTsPacket* create_pat(SrsTsContext* context, int16_t pmt_number, int16_t pmt_pid);
static SrsTsPacket* create_pmt(SrsTsContext* context, int16_t pmt_number, int16_t pmt_pid, int16_t vpid, SrsTsStream vs, int16_t apid, SrsTsStream as);
static SrsTsPacket* create_pat(SrsTsContext* context,
int16_t pmt_number, int16_t pmt_pid
);
static SrsTsPacket* create_pmt(SrsTsContext* context,
int16_t pmt_number, int16_t pmt_pid, int16_t vpid, SrsTsStream vs,
int16_t apid, SrsTsStream as
);
static SrsTsPacket* create_pes_first(SrsTsContext* context,
int16_t pid, SrsTsPESStreamId sid, u_int8_t continuity_counter, bool discontinuity,
int64_t pcr, int64_t dts, int64_t pts, int size
);
static SrsTsPacket* create_pes_continue(SrsTsContext* context,
int16_t pid, SrsTsPESStreamId sid, u_int8_t continuity_counter
);
};
/**
... ... @@ -627,7 +658,10 @@ public:
* the last bit of the program_clock_reference_base at the input of the system target decoder.
*/
int64_t program_clock_reference_base; //33bits
//6bits reserved.
/**
* 6bits reserved, must be '1'
*/
int8_t const1_value0; // 6bits
int16_t program_clock_reference_extension; //9bits
// if OPCR_flag, 6B
... ... @@ -646,7 +680,10 @@ public:
* in the original single program Transport Stream.
*/
int64_t original_program_clock_reference_base; //33bits
//6bits reserved.
/**
* 6bits reserved, must be '1'
*/
int8_t const1_value2; // 6bits
int16_t original_program_clock_reference_extension; //9bits
// if splicing_point_flag, 1B
... ... @@ -711,7 +748,10 @@ public:
* constraints indicated by the splice_type value.
*/
int8_t seamless_splice_flag; //1bit
//5bits reserved
/**
* reserved 5bits, must be '1'
*/
int8_t const1_value1; //5bits
// if ltw_flag, 2B
/**
* (legal time window_valid_flag) - This is a 1-bit field which when set to '1' indicates that the value of the
... ... @@ -856,6 +896,7 @@ public:
* elementary stream type as defined in Table 2-18. In Transport Streams, the elementary stream type is specified in the
* Program Specific Information as specified in 2.4.4.
*/
// @see SrsTsPESStreamId, value can be SrsTsPESStreamIdAudioCommon or SrsTsPESStreamIdVideoCommon.
u_int8_t stream_id; //8bits
// 2B
/**
... ... @@ -866,7 +907,10 @@ public:
u_int16_t PES_packet_length; //16bits
// 1B
// 2bits const '10'
/**
* 2bits const '10'
*/
int8_t const2bits; //2bits
/**
* The 2-bit PES_scrambling_control field indicates the scrambling mode of the PES packet
* payload. When scrambling is performed at the PES level, the PES packet header, including the optional fields when
... ... @@ -1068,7 +1112,10 @@ public:
* PES header.
*/
int8_t P_STD_buffer_flag; //1bit
// reserved 3bits
/**
* reverved value, must be '1'
*/
int8_t const1_value0; //3bits
/**
* A 1-bit field which when set to '1' indicates the presence of the PES_extension_field_length
* field and associated fields. When set to a value of '0' this indicates that the PES_extension_field_length field and any
... ... @@ -1179,6 +1226,7 @@ public:
virtual int encode(SrsStream* stream);
private:
virtual int decode_33bits_dts_pts(SrsStream* stream, int64_t* pv);
virtual int encode_33bits_dts_pts(SrsStream* stream, u_int8_t fb, int64_t v);
};
/**
... ... @@ -1516,14 +1564,12 @@ public:
virtual int update_acodec(SrsCodecAudio ac);
/**
* write an audio frame to ts,
* @remark write PSI first when not write yet.
*/
virtual int write_audio(SrsMpegtsFrame* af, SrsSimpleBuffer* ab);
virtual int write_audio(SrsTsMessage* audio);
/**
* write a video frame to ts,
* @remark write PSI first when not write yet.
*/
virtual int write_video(SrsMpegtsFrame* vf, SrsSimpleBuffer* vb);
virtual int write_video(SrsTsMessage* video);
/**
* close the writer.
*/
... ... @@ -1531,38 +1577,6 @@ public:
};
/**
* jitter correct for audio,
* the sample rate 44100/32000 will lost precise,
* when mp4/ts(tbn=90000) covert to flv/rtmp(1000),
* so the Hls on ipad or iphone will corrupt,
* @see nginx-rtmp: est_pts
*/
class SrsTsAacJitter
{
private:
int64_t base_pts;
int64_t nb_samples;
int sync_ms;
public:
SrsTsAacJitter();
virtual ~SrsTsAacJitter();
/**
* when buffer start, calc the "correct" pts for ts,
* @param flv_pts, the flv pts calc from flv header timestamp,
* @param sample_rate, the sample rate in format(flv/RTMP packet header).
* @param aac_sample_rate, the sample rate in codec(sequence header).
* @return the calc correct pts.
*/
virtual int64_t on_buffer_start(int64_t flv_pts, int sample_rate, int aac_sample_rate);
/**
* when buffer continue, muxer donot write to file,
* the audio buffer continue grow and donot need a pts,
* for the ts audio PES packet only has one pts at the first time.
*/
virtual void on_buffer_continue();
};
/**
* ts stream cache,
* use to cache ts stream.
*
... ... @@ -1575,18 +1589,9 @@ public:
class SrsTsCache
{
public:
// current frame and buffer
SrsMpegtsFrame* af;
SrsSimpleBuffer* ab;
SrsMpegtsFrame* vf;
SrsSimpleBuffer* vb;
public:
// the audio cache buffer start pts, to flush audio if full.
// @remark the pts is not the adjust one, it's the orignal pts.
int64_t audio_buffer_start_pts;
protected:
// time jitter for aac
SrsTsAacJitter* aac_jitter;
// current ts message.
SrsTsMessage* audio;
SrsTsMessage* video;
public:
SrsTsCache();
virtual ~SrsTsCache();
... ... @@ -1594,14 +1599,15 @@ public:
/**
* write audio to cache
*/
virtual int cache_audio(SrsAvcAacCodec* codec, int64_t pts, SrsCodecSample* sample);
virtual int cache_audio(SrsAvcAacCodec* codec, int64_t dts, SrsCodecSample* sample);
/**
* write video to muxer.
*/
virtual int cache_video(SrsAvcAacCodec* codec, int64_t dts, SrsCodecSample* sample);
private:
virtual int do_cache_audio(SrsAvcAacCodec* codec, SrsCodecSample* sample);
virtual int do_cache_video(SrsAvcAacCodec* codec, SrsCodecSample* sample);
virtual int do_cache_mp3(SrsAvcAacCodec* codec, SrsCodecSample* sample);
virtual int do_cache_aac(SrsAvcAacCodec* codec, SrsCodecSample* sample);
virtual int do_cache_avc(SrsAvcAacCodec* codec, SrsCodecSample* sample);
};
/**
... ...