winlin

support hls ingest, fix the bugs.

... ... @@ -169,6 +169,23 @@ int SrsTsMessage::stream_number()
return -1;
}
SrsTsMessage* SrsTsMessage::detach()
{
// @remark the packet cannot be used, but channel is ok.
SrsTsMessage* cp = new SrsTsMessage(channel, NULL);
cp->start_pts = start_pts;
cp->write_pcr = write_pcr;
cp->is_discontinuity = is_discontinuity;
cp->dts = dts;
cp->pts = pts;
cp->sid = sid;
cp->PES_packet_length = PES_packet_length;
cp->continuity_counter = continuity_counter;
cp->payload = payload;
payload = NULL;
return cp;
}
ISrsTsHandler::ISrsTsHandler()
{
}
... ...
... ... @@ -309,6 +309,13 @@ public:
* @return the stream number for audio/video; otherwise, -1.
*/
virtual int stream_number();
public:
/**
* detach the ts message,
* for user maybe need to parse the message by queue.
* @remark we always use the payload of original message.
*/
virtual SrsTsMessage* detach();
};
/**
... ...
... ... @@ -23,8 +23,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <stdlib.h>
#include <string>
#include <vector>
#include <map>
using namespace std;
#include <srs_kernel_error.hpp>
... ... @@ -148,7 +150,7 @@ private:
dirty = false;
}
int fetch(std::string m3u8, SrsHttpClient* client);
int fetch(std::string m3u8);
};
private:
SrsHttpUri* in_hls;
... ... @@ -196,7 +198,7 @@ private:
/**
* fetch all ts body.
*/
virtual void fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client);
virtual void fetch_all_ts(bool fresh_m3u8);
/**
* remove all ts which is dirty.
*/
... ... @@ -209,6 +211,7 @@ int SrsIngestSrsInput::connect()
int64_t now = srs_update_system_time_ms();
if (now < next_connect_time) {
srs_trace("input hls wait for %dms", next_connect_time - now);
st_usleep((next_connect_time - now) * 1000);
}
... ... @@ -328,7 +331,7 @@ int SrsIngestSrsInput::connect()
}
// fetch all ts.
fetch_all_ts(fresh_m3u8, &client);
fetch_all_ts(fresh_m3u8);
// remove all dirty ts.
remove_dirty();
... ... @@ -344,12 +347,19 @@ int SrsIngestSrsInput::parse(ISrsTsHandler* handler)
for (int i = 0; i < (int)pieces.size(); i++) {
SrsTsPiece* tp = pieces.at(i);
// sent only once.
if (tp->sent) {
continue;
}
tp->sent = true;
if (tp->body.empty()) {
continue;
}
srs_trace("proxy the ts to rtmp, ts=%s, duration=%.2f", tp->url.c_str(), tp->duration);
// use stream to parse ts packet.
int nb_packet = (int)tp->body.length() / SRS_TS_PACKET_SIZE;
for (int i = 0; i < nb_packet; i++) {
... ... @@ -360,6 +370,12 @@ int SrsIngestSrsInput::parse(ISrsTsHandler* handler)
// process each ts packet
if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) {
// when peer closed, must interrupt parse and reconnect.
if (srs_is_client_gracefully_close(ret)) {
srs_warn("interrupt parse for peer closed. ret=%d", ret);
return ret;
}
srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret);
continue;
}
... ... @@ -392,7 +408,7 @@ void SrsIngestSrsInput::dirty_all_ts()
}
}
void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client)
void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8)
{
int ret = ERROR_SUCCESS;
... ... @@ -410,17 +426,16 @@ void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client)
continue;
}
if ((ret = tp->fetch(in_hls->get_url(), client)) != ERROR_SUCCESS) {
if ((ret = tp->fetch(in_hls->get_url())) != ERROR_SUCCESS) {
srs_warn("ignore ts %s for error. ret=%d", tp->url.c_str(), ret);
tp->skip = true;
continue;
}
// set the next connect time.
if (next_connect_time <= 0) {
next_connect_time = srs_update_system_time_ms();
// only wait for a duration of last piece.
if (i == pieces.size() - 1) {
next_connect_time = srs_update_system_time_ms() + (int)tp->duration * 1000;
}
next_connect_time += (int)tp->duration * 1000;
}
}
... ... @@ -432,6 +447,7 @@ void SrsIngestSrsInput::remove_dirty()
SrsTsPiece* tp = *it;
if (tp->dirty) {
srs_trace("erase dirty ts, url=%s, duration=%.2f", tp->url.c_str(), tp->duration);
srs_freep(tp);
it = pieces.erase(it);
} else {
... ... @@ -440,7 +456,7 @@ void SrsIngestSrsInput::remove_dirty()
}
}
int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client)
int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8)
{
int ret = ERROR_SUCCESS;
... ... @@ -450,8 +466,7 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client)
size_t pos = string::npos;
bool use_abs_client = false;
SrsHttpClient abs_client;
SrsHttpClient client;
std::string ts_url = url;
if (!srs_string_starts_with(ts_url, "http://")) {
... ... @@ -460,10 +475,6 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client)
baseurl = m3u8.substr(0, pos);
}
ts_url = baseurl + "/" + url;
// use fresh client for absolute url.
client = &abs_client;
use_abs_client = true;
}
SrsHttpUri uri;
... ... @@ -472,12 +483,12 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client)
}
// initialize the fresh http client.
if (use_abs_client && (ret = client->initialize(uri.get_host(), uri.get_port()) != ERROR_SUCCESS)) {
if ((ret = client.initialize(uri.get_host(), uri.get_port()) != ERROR_SUCCESS)) {
return ret;
}
SrsHttpMessage* msg = NULL;
if ((ret = client->get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) {
if ((ret = client.get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) {
srs_error("HTTP GET %s failed. ret=%d", uri.get_url(), ret);
return ret;
}
... ... @@ -501,6 +512,9 @@ class SrsIngestSrsOutput : public ISrsTsHandler
private:
SrsHttpUri* out_rtmp;
private:
bool disconnected;
std::multimap<int64_t, SrsTsMessage*> queue;
private:
SrsRequest* req;
st_netfd_t stfd;
SrsStSocket* io;
... ... @@ -519,6 +533,7 @@ private:
public:
SrsIngestSrsOutput(SrsHttpUri* rtmp) {
out_rtmp = rtmp;
disconnected = false;
req = NULL;
io = NULL;
... ... @@ -537,14 +552,22 @@ public:
srs_freep(avc);
srs_freep(aac);
std::multimap<int64_t, SrsTsMessage*>::iterator it;
for (it = queue.begin(); it != queue.end(); ++it) {
SrsTsMessage* msg = it->second;
srs_freep(msg);
}
queue.clear();
}
// interface ISrsTsHandler
public:
virtual int on_ts_message(SrsTsMessage* msg);
private:
virtual int parse_message_queue();
virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs);
virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts);
virtual int write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts);
virtual int write_h264_ipb_frame(std::string ibps, SrsCodecVideoAVCFrame frame_type, u_int32_t dts, u_int32_t pts);
virtual int on_ts_audio(SrsTsMessage* msg, SrsStream* avs);
virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts);
private:
... ... @@ -554,6 +577,10 @@ public:
* connect to output rtmp server.
*/
virtual int connect();
/**
* flush the message queue when all ts parsed.
*/
virtual int flush_message_queue();
private:
virtual int connect_app(std::string ep_server, std::string ep_port);
// close the connected io and rtmp to ready to be re-connect.
... ... @@ -601,7 +628,7 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
// 14496-2 video stream number xxxx
// ((stream_id >> 4) & 0x0f) == SrsTsPESStreamIdVideo
srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" mpegts: got %s stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)",
srs_info("<- "SRS_CONSTS_LOG_STREAM_CASTER" mpegts: got %s stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)",
(msg->channel->apply == SrsTsPidApplyVideo)? "Video":"Audio", srs_ts_stream2string(msg->channel->stream).c_str(),
msg->dts, msg->pts, msg->payload->length(), msg->packet->payload_unit_start_indicator, msg->continuity_counter, msg->sid,
msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number());
... ... @@ -621,41 +648,120 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
return ret;
}
// parse the stream.
SrsStream avs;
if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
srs_error("mpegts: initialize av stream failed. ret=%d", ret);
// we must use queue to cache the msg, then parse it if possible.
queue.insert(std::make_pair(msg->dts, msg->detach()));
if ((ret = parse_message_queue()) != ERROR_SUCCESS) {
// when peer closed, close the output and reconnect.
if (srs_is_client_gracefully_close(ret)) {
close();
}
return ret;
}
// publish audio or video.
if (msg->channel->stream == SrsTsStreamVideoH264) {
return on_ts_video(msg, &avs);
return ret;
}
int SrsIngestSrsOutput::parse_message_queue()
{
int ret = ERROR_SUCCESS;
int nb_videos = 0;
int nb_audios = 0;
std::multimap<int64_t, SrsTsMessage*>::iterator it;
for (it = queue.begin(); it != queue.end(); ++it) {
SrsTsMessage* msg = it->second;
// publish audio or video.
if (msg->channel->stream == SrsTsStreamVideoH264) {
nb_videos++;
} else {
nb_audios++;
}
}
// always wait 2+ videos, to left one video in the queue.
// TODO: FIXME: support pure audio hls.
if (nb_videos <= 1) {
return ret;
}
if (msg->channel->stream == SrsTsStreamAudioAAC) {
return on_ts_audio(msg, &avs);
// parse messages util the last video.
while (nb_videos > 1 && queue.size() > 0) {
std::multimap<int64_t, SrsTsMessage*>::iterator it = queue.begin();
SrsTsMessage* msg = it->second;
if (msg->channel->stream == SrsTsStreamVideoH264) {
nb_videos--;
}
queue.erase(it);
// parse the stream.
SrsStream avs;
if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
srs_error("mpegts: initialize av stream failed. ret=%d", ret);
return ret;
}
// publish audio or video.
if (msg->channel->stream == SrsTsStreamVideoH264) {
if ((ret = on_ts_video(msg, &avs)) != ERROR_SUCCESS) {
return ret;
}
}
if (msg->channel->stream == SrsTsStreamAudioAAC) {
if ((ret = on_ts_audio(msg, &avs)) != ERROR_SUCCESS) {
return ret;
}
}
}
// TODO: FIXME: implements it.
return ret;
}
int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
int SrsIngestSrsOutput::flush_message_queue()
{
int ret = ERROR_SUCCESS;
// ensure rtmp connected.
if ((ret = connect()) != ERROR_SUCCESS) {
return ret;
// parse messages util the last video.
while (!queue.empty()) {
std::multimap<int64_t, SrsTsMessage*>::iterator it = queue.begin();
SrsTsMessage* msg = it->second;
queue.erase(it);
// parse the stream.
SrsStream avs;
if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
srs_error("mpegts: initialize av stream failed. ret=%d", ret);
return ret;
}
// publish audio or video.
if (msg->channel->stream == SrsTsStreamVideoH264) {
if ((ret = on_ts_video(msg, &avs)) != ERROR_SUCCESS) {
return ret;
}
}
if (msg->channel->stream == SrsTsStreamAudioAAC) {
if ((ret = on_ts_audio(msg, &avs)) != ERROR_SUCCESS) {
return ret;
}
}
}
return ret;
}
int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
{
int ret = ERROR_SUCCESS;
// ts tbn to flv tbn.
u_int32_t dts = (u_int32_t)(msg->dts / 90);
u_int32_t pts = (u_int32_t)(msg->dts / 90);
// the whole ts pes video packet must be a flv frame packet.
char* ibpframe = avs->data() + avs->pos();
int ibpframe_size = avs->size() - avs->pos();
std::string ibps;
SrsCodecVideoAVCFrame frame_type = SrsCodecVideoAVCFrameInterFrame;
// send each frame.
while (!avs->empty()) {
... ... @@ -665,10 +771,18 @@ int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
return ret;
}
// ignore invalid frame,
// * atleast 1bytes for SPS to decode the type
// * ignore the auth bytes '09f0'
if (frame_size <= 2) {
// 5bits, 7.3.1 NAL unit syntax,
// H.264-AVC-ISO_IEC_14496-10.pdf, page 44.
// 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f);
// for IDR frame, the frame is keyframe.
if (nal_unit_type == SrsAvcNaluTypeIDR) {
frame_type = SrsCodecVideoAVCFrameKeyFrame;
}
// ignore the nalu type aud(9)
if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter) {
continue;
}
... ... @@ -684,10 +798,6 @@ int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
}
h264_sps_changed = true;
h264_sps = sps;
if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) {
return ret;
}
continue;
}
... ... @@ -703,27 +813,45 @@ int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
}
h264_pps_changed = true;
h264_pps = pps;
if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) {
return ret;
}
continue;
}
break;
// ibp frame.
std::string ibp;
if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) {
return ret;
}
ibps.append(ibp);
}
if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) {
return ret;
}
// ibp frame.
srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", ibpframe_size, dts);
return write_h264_ipb_frame(ibpframe, ibpframe_size, dts, pts);
if ((ret = write_h264_ipb_frame(ibps, frame_type, dts, pts)) != ERROR_SUCCESS) {
// drop the ts message.
if (ret == ERROR_H264_DROP_BEFORE_SPS_PPS) {
return ERROR_SUCCESS;
}
return ret;
}
return ret;
}
int SrsIngestSrsOutput::write_h264_sps_pps(u_int32_t dts, u_int32_t pts)
{
int ret = ERROR_SUCCESS;
// only send when both sps and pps changed.
if (!h264_sps_changed || !h264_pps_changed) {
// when sps or pps changed, update the sequence header,
// for the pps maybe not changed while sps changed.
// so, we must check when each video ts message frame parsed.
if (h264_sps_pps_sent && !h264_sps_changed && !h264_pps_changed) {
return ret;
}
// when not got sps/pps, wait.
if (h264_pps.empty() || h264_sps.empty()) {
return ret;
}
... ... @@ -752,11 +880,12 @@ int SrsIngestSrsOutput::write_h264_sps_pps(u_int32_t dts, u_int32_t pts)
h264_sps_changed = false;
h264_pps_changed = false;
h264_sps_pps_sent = true;
srs_trace("hls: h264 sps/pps sent, sps=%dB, pps=%dB", h264_sps.length(), h264_pps.length());
return ret;
}
int SrsIngestSrsOutput::write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts)
int SrsIngestSrsOutput::write_h264_ipb_frame(string ibps, SrsCodecVideoAVCFrame frame_type, u_int32_t dts, u_int32_t pts)
{
int ret = ERROR_SUCCESS;
... ... @@ -766,26 +895,10 @@ int SrsIngestSrsOutput::write_h264_ipb_frame(char* frame, int frame_size, u_int3
return ERROR_H264_DROP_BEFORE_SPS_PPS;
}
// 5bits, 7.3.1 NAL unit syntax,
// H.264-AVC-ISO_IEC_14496-10.pdf, page 44.
// 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f);
// for IDR frame, the frame is keyframe.
SrsCodecVideoAVCFrame frame_type = SrsCodecVideoAVCFrameInterFrame;
if (nal_unit_type == SrsAvcNaluTypeIDR) {
frame_type = SrsCodecVideoAVCFrameKeyFrame;
}
std::string ibp;
if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) {
return ret;
}
int8_t avc_packet_type = SrsCodecVideoAVCTypeNALU;
char* flv = NULL;
int nb_flv = 0;
if ((ret = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) {
if ((ret = avc->mux_avc2flv(ibps, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -798,14 +911,17 @@ int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs)
{
int ret = ERROR_SUCCESS;
// ensure rtmp connected.
if ((ret = connect()) != ERROR_SUCCESS) {
return ret;
}
// ts tbn to flv tbn.
u_int32_t dts = (u_int32_t)(msg->dts / 90);
// got the next video to calc the delta duration for each audio.
u_int32_t duration = 0;
if (!queue.empty()) {
SrsTsMessage* nm = queue.begin()->second;
duration = (u_int32_t)(srs_max(0, nm->dts - msg->dts) / 90);
}
u_int32_t max_dts = dts + duration;
// send each frame.
while (!avs->empty()) {
char* frame = NULL;
... ... @@ -842,6 +958,10 @@ int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs)
if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) {
return ret;
}
// calc the delta of dts, when previous frame output.
u_int32_t delta = duration / (msg->payload->length() / frame_size);
dts = (u_int32_t)(srs_min(max_dts, dts + delta));
}
return ret;
... ... @@ -890,6 +1010,8 @@ int SrsIngestSrsOutput::connect()
return ret;
}
srs_trace("connect output=%s", out_rtmp->get_url());
// parse uri
if (!req) {
req = new SrsRequest();
... ... @@ -996,6 +1118,9 @@ int SrsIngestSrsOutput::connect_app(string ep_server, string ep_port)
void SrsIngestSrsOutput::close()
{
srs_trace("close output=%s", out_rtmp->get_url());
h264_sps_pps_sent = false;
srs_freep(client);
srs_freep(io);
srs_freep(req);
... ... @@ -1035,6 +1160,11 @@ public:
return ret;
}
if ((ret = oc->flush_message_queue()) != ERROR_SUCCESS) {
srs_warn("flush oc message failed. ret=%d", ret);
return ret;
}
return ret;
}
};
... ...