winlin

refine code, remove the base dts, donot drop start BP frames

... ... @@ -151,7 +151,6 @@ int SrsClient::do_cycle()
// find a source to publish.
SrsSource* source = SrsSource::find(req->get_stream_url());
srs_assert(source != NULL);
SrsHLS* hls = source->get_hls();
bool enabled_cache = true;
conf = config->get_gop_cache(req->vhost);
... ... @@ -182,7 +181,6 @@ int SrsClient::do_cycle()
}
srs_info("start to publish stream %s success", req->stream.c_str());
ret = publish(source, true);
hls->on_unpublish();
source->on_unpublish();
return ret;
}
... ... @@ -195,7 +193,6 @@ int SrsClient::do_cycle()
}
srs_info("flash start to publish stream %s success", req->stream.c_str());
ret = publish(source, false);
hls->on_unpublish();
source->on_unpublish();
return ret;
}
... ... @@ -333,10 +330,9 @@ int SrsClient::publish(SrsSource* source, bool is_fmle)
srs_verbose("check publish_refer success.");
SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
SrsHLS* hls = source->get_hls();
// notify the hls to prepare when publish start.
if ((ret = hls->on_publish(req->vhost)) != ERROR_SUCCESS) {
if ((ret = source->on_publish(req->vhost)) != ERROR_SUCCESS) {
srs_error("hls on_publish failed. ret=%d", ret);
return ret;
}
... ... @@ -362,7 +358,7 @@ int SrsClient::publish(SrsSource* source, bool is_fmle)
(int)(srs_get_system_time_ms()/1000), pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
}
if ((ret = process_publish_message(source, hls, msg, is_fmle)) != ERROR_SUCCESS) {
if ((ret = process_publish_message(source, msg, is_fmle)) != ERROR_SUCCESS) {
srs_error("process publish message failed. ret=%d", ret);
return ret;
}
... ... @@ -371,16 +367,12 @@ int SrsClient::publish(SrsSource* source, bool is_fmle)
return ret;
}
int SrsClient::process_publish_message(SrsSource* source, SrsHLS* hls, SrsCommonMessage* msg, bool is_fmle)
int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle)
{
int ret = ERROR_SUCCESS;
// process audio packet
if (msg->header.is_audio()) {
if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) {
srs_error("hls process audio message failed. ret=%d", ret);
return ret;
}
if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
srs_error("source process audio message failed. ret=%d", ret);
return ret;
... ... @@ -388,10 +380,6 @@ int SrsClient::process_publish_message(SrsSource* source, SrsHLS* hls, SrsCommon
}
// process video packet
if (msg->header.is_video()) {
if ((ret = hls->on_video(msg)) != ERROR_SUCCESS) {
srs_error("hls process video message failed. ret=%d", ret);
return ret;
}
if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
srs_error("source process video message failed. ret=%d", ret);
return ret;
... ... @@ -408,10 +396,6 @@ int SrsClient::process_publish_message(SrsSource* source, SrsHLS* hls, SrsCommon
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if ((ret = hls->on_meta_data(metadata)) != ERROR_SUCCESS) {
srs_error("hls process onMetaData message failed. ret=%d", ret);
return ret;
}
if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
srs_error("source process onMetaData message failed. ret=%d", ret);
return ret;
... ...
... ... @@ -39,7 +39,6 @@ class SrsSource;
class SrsRefer;
class SrsConsumer;
class SrsCommonMessage;
class SrsHLS;
/**
* the client provides the main logic control for RTMP clients.
... ... @@ -61,7 +60,7 @@ private:
virtual int check_vhost();
virtual int playing(SrsSource* source);
virtual int publish(SrsSource* source, bool is_fmle);
virtual int process_publish_message(SrsSource* source, SrsHLS* hls, SrsCommonMessage* msg, bool is_fmle);
virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle);
virtual int get_peer_ip();
virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
};
... ...
... ... @@ -281,6 +281,24 @@ u_int8_t mpegts_header[] = {
// 63000: 700ms, ts_tbn=90000
#define SRS_HLS_DELAY 63000
// @see: ngx_rtmp_SrsMpegtsFrame_t
struct SrsMpegtsFrame
{
int64_t pts;
int64_t dts;
int pid;
int sid;
int cc;
bool key;
SrsMpegtsFrame()
{
pts = dts = 0;
pid = sid = cc = 0;
key = false;
}
};
// @see: ngx_rtmp_mpegts.c
// TODO: support full mpegts feature in future.
class SrsMpegtsWriter
... ... @@ -298,7 +316,7 @@ public:
return ret;
}
static int write_frame(int fd, mpegts_frame* frame, SrsCodecBuffer* buffer)
static int write_frame(int fd, SrsMpegtsFrame* frame, SrsCodecBuffer* buffer)
{
int ret = ERROR_SUCCESS;
... ... @@ -512,13 +530,11 @@ SrsTSMuxer::SrsTSMuxer()
{
fd = -1;
// ffmpeg set the start time to the delay time.
base_dts = SRS_HLS_DELAY;
audio_buffer = new SrsCodecBuffer();
video_buffer = new SrsCodecBuffer();
got_iframe = false;
audio_frame = new SrsMpegtsFrame();
video_frame = new SrsMpegtsFrame();
}
SrsTSMuxer::~SrsTSMuxer()
... ... @@ -530,6 +546,9 @@ SrsTSMuxer::~SrsTSMuxer()
srs_freep(audio_buffer);
srs_freep(video_buffer);
srs_freep(audio_frame);
srs_freep(video_frame);
}
int SrsTSMuxer::open(std::string _path)
... ... @@ -560,9 +579,12 @@ int SrsTSMuxer::write_audio(u_int32_t time, SrsCodec* codec, SrsCodecSample* sam
{
int ret = ERROR_SUCCESS;
audio_frame.dts = audio_frame.pts = base_dts + time * 90;
audio_frame.pid = TS_AUDIO_PID;
audio_frame.sid = TS_AUDIO_AAC;
if (!audio_frame) {
audio_frame = new SrsMpegtsFrame();
audio_frame->dts = audio_frame->pts = time * 90;
audio_frame->pid = TS_AUDIO_PID;
audio_frame->sid = TS_AUDIO_AAC;
}
for (int i = 0; i < sample->nb_buffers; i++) {
SrsCodecBuffer* buf = &sample->buffers[i];
... ... @@ -631,18 +653,11 @@ int SrsTSMuxer::write_video(u_int32_t time, SrsCodec* codec, SrsCodecSample* sam
{
int ret = ERROR_SUCCESS;
video_frame.dts = base_dts + time * 90;
video_frame.pts = video_frame.dts + sample->cts * 90;
video_frame.pid = TS_VIDEO_PID;
video_frame.sid = TS_VIDEO_AVC;
video_frame.key = sample->frame_type == SrsCodecVideoAVCFrameKeyFrame;
if (video_frame.key) {
got_iframe = true;
}
if (!got_iframe) {
return ret;
}
video_frame->dts = time * 90;
video_frame->pts = video_frame->dts + sample->cts * 90;
video_frame->pid = TS_VIDEO_PID;
video_frame->sid = TS_VIDEO_AVC;
video_frame->key = sample->frame_type == SrsCodecVideoAVCFrameKeyFrame;
static u_int8_t aud_nal[] = { 0x00, 0x00, 0x00, 0x01, 0x09, 0xf0 };
video_buffer->append(aud_nal, sizeof(aud_nal));
... ... @@ -669,8 +684,12 @@ int SrsTSMuxer::write_video(u_int32_t time, SrsCodec* codec, SrsCodecSample* sam
if (nal_unit_type == 1) {
sps_pps_sent = false;
}
// 5: Coded slice of an IDR picture
// 5: Coded slice of an IDR picture.
// insert sps/pps before IDR or key frame is ok.
if (nal_unit_type == 5 && !sps_pps_sent) {
//if (video_frame->key && !sps_pps_sent) {
sps_pps_sent = true;
// ngx_rtmp_hls_append_sps_pps
if (codec->sequenceParameterSetLength > 0) {
// AnnexB prefix
... ... @@ -700,12 +719,14 @@ int SrsTSMuxer::write_video(u_int32_t time, SrsCodec* codec, SrsCodecSample* sam
video_buffer->append(buf->bytes, buf->size);
}
if ((ret = SrsMpegtsWriter::write_frame(fd, &video_frame, video_buffer)) != ERROR_SUCCESS) {
if ((ret = SrsMpegtsWriter::write_frame(fd, video_frame, video_buffer)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = SrsMpegtsWriter::write_frame(fd, &audio_frame, audio_buffer)) != ERROR_SUCCESS) {
if ((ret = SrsMpegtsWriter::write_frame(fd, audio_frame, audio_buffer)) != ERROR_SUCCESS) {
return ret;
}
srs_freep(audio_frame);
return ret;
}
... ...
... ... @@ -35,6 +35,7 @@ class SrsOnMetaDataPacket;
class SrsCommonMessage;
class SrsCodecSample;
class SrsCodecBuffer;
class SrsMpegtsFrame;
class SrsTSMuxer;
class SrsCodec;
... ... @@ -57,35 +58,15 @@ public:
virtual int on_video(SrsCommonMessage* video);
};
// @see: ngx_rtmp_mpegts_frame_t
struct mpegts_frame
{
int64_t pts;
int64_t dts;
int pid;
int sid;
int cc;
bool key;
mpegts_frame()
{
pts = dts = 0;
pid = sid = cc = 0;
key = false;
}
};
class SrsTSMuxer
{
private:
int fd;
std::string path;
private:
bool got_iframe;
int64_t base_dts;
mpegts_frame audio_frame;
SrsMpegtsFrame* audio_frame;
SrsCodecBuffer* audio_buffer;
mpegts_frame video_frame;
SrsMpegtsFrame* video_frame;
SrsCodecBuffer* video_buffer;
public:
SrsTSMuxer();
... ...
... ... @@ -266,15 +266,15 @@ SrsSource::~SrsSource()
srs_freep(hls);
}
SrsHLS* SrsSource::get_hls()
{
return hls;
}
int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
if ((ret = hls->on_meta_data(metadata)) != ERROR_SUCCESS) {
srs_error("hls process onMetaData message failed. ret=%d", ret);
return ret;
}
metadata->metadata->set("server", new SrsAmf0String(
RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
... ... @@ -336,6 +336,11 @@ int SrsSource::on_audio(SrsCommonMessage* audio)
{
int ret = ERROR_SUCCESS;
if ((ret = hls->on_audio(audio)) != ERROR_SUCCESS) {
srs_error("hls process audio message failed. ret=%d", ret);
return ret;
}
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
SrsAutoFree(SrsSharedPtrMessage, msg, false);
if ((ret = msg->initialize(audio, (char*)audio->payload, audio->size)) != ERROR_SUCCESS) {
... ... @@ -381,6 +386,11 @@ int SrsSource::on_video(SrsCommonMessage* video)
{
int ret = ERROR_SUCCESS;
if ((ret = hls->on_video(video)) != ERROR_SUCCESS) {
srs_error("hls process video message failed. ret=%d", ret);
return ret;
}
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
SrsAutoFree(SrsSharedPtrMessage, msg, false);
if ((ret = msg->initialize(video, (char*)video->payload, video->size)) != ERROR_SUCCESS) {
... ... @@ -422,8 +432,15 @@ int SrsSource::on_video(SrsCommonMessage* video)
return ret;
}
int SrsSource::on_publish(std::string vhost)
{
return hls->on_publish(vhost);
}
void SrsSource::on_unpublish()
{
hls->on_unpublish();
clear_gop_cache();
srs_freep(cache_metadata);
... ...
... ... @@ -143,15 +143,10 @@ public:
SrsSource(std::string _stream_url);
virtual ~SrsSource();
public:
/**
* get the hls handler, which has a long lifecycle
* util the source destroyed.
*/
virtual SrsHLS* get_hls();
public:
virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsCommonMessage* audio);
virtual int on_video(SrsCommonMessage* video);
virtual int on_publish(std::string vhost);
virtual void on_unpublish();
public:
virtual int create_consumer(SrsConsumer*& consumer);
... ...