winlin

for #742, refine source, meta and origin hub. 3.0.16

... ... @@ -184,6 +184,7 @@ Please select your language:
### V3 changes
* v3.0, 2017-01-19, for [#742][bug #742] refine source, meta and origin hub. 3.0.16
* v3.0, 2017-01-17, for [#742][bug #742] refine source, timeout, live cycle. 3.0.15
* v3.0, 2017-01-11, fix [#735][bug #735] config transform refer_publish invalid. 3.0.14
* v3.0, 2017-01-06, for [#730][bug #730] support config in/out ack size. 3.0.13
... ... @@ -872,13 +873,13 @@ Remark:
| Input | SRS(Simple RTMP Server) | Output |
+----------------------+-------------------------+----------------+
| Encoder(1) | +-> RTMP/HDS --------+-> Flash player |
| (FMLE,FFMPEG, -rtmp-+->-+-> HLS/HTTP ---------+-> M3u8 player |
| (FMLE,FFMPEG, -rtmp-+->-+-> HLS/HTTP ---------+-> M3U8 player |
| Flash,XSPLIT, | +-> FLV/MP3/Aac/Ts ---+-> HTTP player |
| ......) | +-> Fowarder ---------+-> RTMP server |
| | +-> Transcoder -------+-> RTMP server |
| | +-> EXEC(5) ----------+-> External app |
| | +-> DVR --------------+-> Flv file |
| | +-> BandwidthTest ----+-> flash |
| | +-> DVR --------------+-> FLV file |
| | +-> BandwidthTest ----+-> Flash |
+----------------------+ | |
| MediaSource(2) | | |
| (RTSP,FILE, | | |
... ...
... ... @@ -970,7 +970,7 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
SrsDvr::SrsDvr()
{
source = NULL;
hub = NULL;
plan = NULL;
req = NULL;
actived = false;
... ... @@ -985,12 +985,12 @@ SrsDvr::~SrsDvr()
srs_freep(plan);
}
int SrsDvr::initialize(SrsSource* s, SrsRequest* r)
int SrsDvr::initialize(SrsOriginHub* h, SrsRequest* r)
{
int ret = ERROR_SUCCESS;
req = r;
source = s;
hub = h;
SrsConfDirective* conf = _srs_config->get_dvr_apply(r->vhost);
actived = srs_config_apply_filter(conf, r);
... ... @@ -1018,7 +1018,7 @@ int SrsDvr::on_publish(bool fetch_sequence_header)
return ret;
}
if (fetch_sequence_header && (ret = source->on_dvr_request_sh()) != ERROR_SUCCESS) {
if (fetch_sequence_header && (ret = hub->on_dvr_request_sh()) != ERROR_SUCCESS) {
return ret;
}
... ...
... ... @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#ifdef SRS_AUTO_DVR
class SrsSource;
class SrsOriginHub;
class SrsRequest;
class SrsBuffer;
class SrsRtmpJitter;
... ... @@ -300,7 +301,7 @@ private:
class SrsDvr : public ISrsReloadHandler
{
private:
SrsSource* source;
SrsOriginHub* hub;
SrsDvrPlan* plan;
SrsRequest* req;
private:
... ... @@ -317,7 +318,7 @@ public:
* when system initialize(encoder publish at first time, or reload),
* initialize the dvr will reinitialize the plan, the whole dvr framework.
*/
virtual int initialize(SrsSource* s, SrsRequest* r);
virtual int initialize(SrsOriginHub* h, SrsRequest* r);
/**
* publish stream event,
* when encoder start to publish RTMP stream.
... ...
... ... @@ -50,9 +50,9 @@ using namespace std;
// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_CIMS (3000)
SrsForwarder::SrsForwarder(SrsSource* s)
SrsForwarder::SrsForwarder(SrsOriginHub* h)
{
source = s;
hub = h;
req = NULL;
sh_video = sh_audio = NULL;
... ... @@ -250,7 +250,7 @@ int SrsForwarder::cycle()
return ret;
}
if ((ret = source->on_forwarder_start(this)) != ERROR_SUCCESS) {
if ((ret = hub->on_forwarder_start(this)) != ERROR_SUCCESS) {
srs_error("callback the source to feed the sequence header failed. ret=%d", ret);
return ret;
}
... ...
... ... @@ -42,6 +42,7 @@ class SrsRtmpJitter;
class SrsRtmpClient;
class SrsRequest;
class SrsSource;
class SrsOriginHub;
class SrsKbps;
class SrsSimpleRtmpClient;
... ... @@ -58,7 +59,7 @@ private:
private:
SrsReusableThread2* pthread;
private:
SrsSource* source;
SrsOriginHub* hub;
SrsSimpleRtmpClient* sdk;
SrsRtmpJitter* jitter;
SrsMessageQueue* queue;
... ... @@ -69,7 +70,7 @@ private:
SrsSharedPtrMessage* sh_audio;
SrsSharedPtrMessage* sh_video;
public:
SrsForwarder(SrsSource* _source);
SrsForwarder(SrsOriginHub* h);
virtual ~SrsForwarder();
public:
virtual int initialize(SrsRequest* r, std::string ep);
... ...
... ... @@ -1125,7 +1125,7 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme
SrsHls::SrsHls()
{
req = NULL;
source = NULL;
hub = NULL;
hls_enabled = false;
hls_can_dispose = false;
... ... @@ -1197,11 +1197,11 @@ int SrsHls::cycle()
return ret;
}
int SrsHls::initialize(SrsSource* s, SrsRequest* r)
int SrsHls::initialize(SrsOriginHub* h, SrsRequest* r)
{
int ret = ERROR_SUCCESS;
source = s;
hub = h;
req = r;
if ((ret = muxer->initialize()) != ERROR_SUCCESS) {
... ... @@ -1244,7 +1244,7 @@ int SrsHls::on_publish(bool fetch_sequence_header)
// notice the source to get the cached sequence header.
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
if ((ret = source->on_hls_start()) != ERROR_SUCCESS) {
if ((ret = hub->on_hls_start()) != ERROR_SUCCESS) {
srs_error("callback source hls start failed. ret=%d", ret);
return ret;
}
... ...
... ... @@ -45,6 +45,7 @@ class SrsAvcAacCodec;
class SrsRequest;
class SrsPithyPrint;
class SrsSource;
class SrsOriginHub;
class SrsFileWriter;
class SrsSimpleStream;
class SrsTsAacJitter;
... ... @@ -362,7 +363,7 @@ private:
bool hls_can_dispose;
int64_t last_update_time;
private:
SrsSource* source;
SrsOriginHub* hub;
SrsAvcAacCodec* codec;
SrsCodecSample* sample;
SrsRtmpJitter* jitter;
... ... @@ -391,7 +392,7 @@ public:
/**
* initialize the hls by handler and source.
*/
virtual int initialize(SrsSource* s, SrsRequest* r);
virtual int initialize(SrsOriginHub* h, SrsRequest* r);
/**
* publish stream event, continue to write the m3u8,
* for the muxer object not destroyed.
... ...
... ... @@ -745,207 +745,29 @@ ISrsSourceHandler::~ISrsSourceHandler()
{
}
std::map<std::string, SrsSource*> SrsSource::pool;
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
int ret = ERROR_SUCCESS;
SrsSource* source = NULL;
if ((source = fetch(r)) != NULL) {
*pps = source;
return ret;
}
string stream_url = r->get_stream_url();
string vhost = r->vhost;
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
source = new SrsSource();
if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
srs_freep(source);
return ret;
}
pool[stream_url] = source;
srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
*pps = source;
return ret;
}
SrsSource* SrsSource::fetch(SrsRequest* r)
{
SrsSource* source = NULL;
string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
return NULL;
}
source = pool[stream_url];
// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source->req->update_auth(r);
return source;
}
void SrsSource::dispose_all()
{
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsSource* source = it->second;
source->dispose();
}
return;
}
int SrsSource::cycle_all()
{
int ret = ERROR_SUCCESS;
int cid = _srs_context->get_id();
ret = do_cycle_all();
_srs_context->set_id(cid);
return ret;
}
int SrsSource::do_cycle_all()
{
int ret = ERROR_SUCCESS;
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSource* source = it->second;
// Do cycle source to cleanup components, such as hls dispose.
if ((ret = source->cycle()) != ERROR_SUCCESS) {
return ret;
}
// TODO: FIXME: support source cleanup.
// @see https://github.com/ossrs/srs/issues/713
// @see https://github.com/ossrs/srs/issues/714
#if 0
// When source expired, remove it.
if (source->expired()) {
int cid = source->source_id();
if (cid == -1 && source->pre_source_id() > 0) {
cid = source->pre_source_id();
}
if (cid > 0) {
_srs_context->set_id(cid);
}
srs_trace("cleanup die source, total=%d", (int)pool.size());
srs_freep(source);
pool.erase(it++);
} else {
++it;
}
#else
++it;
#endif
}
return ret;
}
void SrsSource::destroy()
{
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsSource* source = it->second;
srs_freep(source);
}
pool.clear();
}
SrsMixQueue::SrsMixQueue()
{
nb_videos = 0;
nb_audios = 0;
}
SrsMixQueue::~SrsMixQueue()
{
clear();
}
void SrsMixQueue::clear()
{
std::multimap<int64_t, SrsSharedPtrMessage*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsSharedPtrMessage* msg = it->second;
srs_freep(msg);
}
msgs.clear();
nb_videos = 0;
nb_audios = 0;
}
void SrsMixQueue::push(SrsSharedPtrMessage* msg)
{
msgs.insert(std::make_pair(msg->timestamp, msg));
if (msg->is_video()) {
nb_videos++;
} else {
nb_audios++;
}
}
SrsSharedPtrMessage* SrsMixQueue::pop()
// TODO: FIXME: Remove it?
bool srs_hls_can_continue(int ret, SrsSharedPtrMessage* sh, SrsSharedPtrMessage* msg)
{
bool mix_ok = false;
// pure video
if (nb_videos >= SRS_MIX_CORRECT_PURE_AV && nb_audios == 0) {
mix_ok = true;
}
// pure audio
if (nb_audios >= SRS_MIX_CORRECT_PURE_AV && nb_videos == 0) {
mix_ok = true;
}
// got 1 video and 1 audio, mix ok.
if (nb_videos >= 1 && nb_audios >= 1) {
mix_ok = true;
}
if (!mix_ok) {
return NULL;
// only continue for decode error.
if (ret != ERROR_HLS_DECODE_ERROR) {
return false;
}
// pop the first msg.
std::multimap<int64_t, SrsSharedPtrMessage*>::iterator it = msgs.begin();
SrsSharedPtrMessage* msg = it->second;
msgs.erase(it);
if (msg->is_video()) {
nb_videos--;
} else {
nb_audios--;
// when video size equals to sequence header,
// the video actually maybe a sequence header,
// continue to make ffmpeg happy.
if (sh && sh->size == msg->size) {
srs_warn("the msg is actually a sequence header, ignore this packet.");
return true;
}
return msg;
return false;
}
SrsSource::SrsSource()
SrsOriginHub::SrsOriginHub(SrsSource* s)
{
source = s;
req = NULL;
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
mix_correct = false;
mix_queue = new SrsMixQueue();
#ifdef SRS_AUTO_HLS
hls = new SrsHls();
... ... @@ -957,36 +779,17 @@ SrsSource::SrsSource()
encoder = new SrsEncoder();
#endif
#ifdef SRS_AUTO_HDS
hds = new SrsHds(this);
hds = new SrsHds(s);
#endif
cache_metadata = cache_sh_video = cache_sh_audio = NULL;
_can_publish = true;
_pre_source_id = _source_id = -1;
die_at = -1;
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
gop_cache = new SrsGopCache();
aggregate_stream = new SrsBuffer();
ng_exec = new SrsNgExec();
is_monotonically_increase = false;
last_packet_time = 0;
_srs_config->subscribe(this);
atc = false;
}
SrsSource::~SrsSource()
SrsOriginHub::~SrsOriginHub()
{
_srs_config->unsubscribe(this);
// never free the consumers,
// for all consumers are auto free.
consumers.clear();
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
... ... @@ -995,16 +798,6 @@ SrsSource::~SrsSource()
}
forwarders.clear();
}
srs_freep(mix_queue);
srs_freep(cache_metadata);
srs_freep(cache_sh_video);
srs_freep(cache_sh_audio);
srs_freep(play_edge);
srs_freep(publish_edge);
srs_freep(gop_cache);
srs_freep(aggregate_stream);
srs_freep(ng_exec);
#ifdef SRS_AUTO_HLS
... ... @@ -1019,208 +812,429 @@ SrsSource::~SrsSource()
#ifdef SRS_AUTO_HDS
srs_freep(hds);
#endif
srs_freep(req);
}
void SrsSource::dispose()
{
#ifdef SRS_AUTO_HLS
hls->dispose();
#endif
// cleaup the cached packets.
srs_freep(cache_metadata);
srs_freep(cache_sh_video);
srs_freep(cache_sh_audio);
// cleanup the gop cache.
gop_cache->dispose();
}
int SrsSource::cycle()
int SrsOriginHub::initialize(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
req = r;
#ifdef SRS_AUTO_HLS
if ((ret = hls->cycle()) != ERROR_SUCCESS) {
if ((ret = hls->initialize(this, req)) != ERROR_SUCCESS) {
return ret;
}
#endif
return ret;
}
bool SrsSource::expired()
{
// unknown state?
if (die_at == -1) {
return false;
}
// still publishing?
if (!_can_publish || !publish_edge->can_publish()) {
return false;
#ifdef SRS_AUTO_DVR
if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) {
return ret;
}
#endif
// has any consumers?
if (!consumers.empty()) {
return false;
}
return ret;
}
void SrsOriginHub::dispose()
{
#ifdef SRS_AUTO_HLS
hls->dispose();
#endif
}
int SrsOriginHub::cycle()
{
int ret = ERROR_SUCCESS;
int64_t now = srs_get_system_time_ms();
if (now > die_at + SRS_SOURCE_CLEANUP) {
return true;
#ifdef SRS_AUTO_HLS
if ((ret = hls->cycle()) != ERROR_SUCCESS) {
return ret;
}
#endif
return false;
return ret;
}
int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
int SrsOriginHub::on_original_metadata(SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
srs_assert(h);
srs_assert(!req);
handler = h;
req = r->copy();
atc = _srs_config->get_atc(req->vhost);
#ifdef SRS_AUTO_HLS
if ((ret = hls->initialize(this, req)) != ERROR_SUCCESS) {
if (metadata && (ret = hls->on_meta_data(metadata->metadata)) != ERROR_SUCCESS) {
srs_error("hls process onMetaData message failed. ret=%d", ret);
return ret;
}
#endif
#ifdef SRS_AUTO_DVR
if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) {
if (metadata && (ret = dvr->on_meta_data(metadata)) != ERROR_SUCCESS) {
srs_error("dvr process onMetaData message failed. ret=%d", ret);
return ret;
}
#endif
if ((ret = play_edge->initialize(this, req)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = publish_edge->initialize(this, req)) != ERROR_SUCCESS) {
return ret;
}
double queue_size = _srs_config->get_queue_length(req->vhost);
publish_edge->set_queue_size(queue_size);
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);
mix_correct = _srs_config->get_mix_correct(req->vhost);
return ret;
}
int SrsSource::on_reload_vhost_play(string vhost)
int SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
// copy to all forwarders
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_meta_data(shared_metadata)) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
return ret;
}
}
}
// time_jitter
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);
return ret;
}
int SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
{
int ret = ERROR_SUCCESS;
// mix_correct
if (true) {
bool v = _srs_config->get_mix_correct(req->vhost);
// when changed, clear the mix queue.
if (v != mix_correct) {
mix_queue->clear();
SrsSharedPtrMessage* msg = shared_audio;
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) {
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret);
// unpublish, ignore ret.
hls->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
} else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
if (srs_hls_can_continue(ret, source->meta->ash(), msg)) {
ret = ERROR_SUCCESS;
} else {
srs_warn("hls continue audio failed. ret=%d", ret);
return ret;
}
} else {
srs_warn("hls disconnect publisher for audio error. ret=%d", ret);
return ret;
}
mix_correct = v;
}
#endif
// atc changed.
if (true) {
bool v = _srs_config->get_atc(vhost);
#ifdef SRS_AUTO_DVR
if ((ret = dvr->on_audio(msg)) != ERROR_SUCCESS) {
srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret);
if (v != atc) {
srs_warn("vhost %s atc changed to %d, connected client may corrupt.", vhost.c_str(), v);
gop_cache->clear();
}
atc = v;
// unpublish, ignore ret.
dvr->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
}
#endif
// gop cache changed.
if (true) {
bool v = _srs_config->get_gop_cache(vhost);
#ifdef SRS_AUTO_HDS
if ((ret = hds->on_audio(msg)) != ERROR_SUCCESS) {
srs_warn("hds process audio message failed, ignore and disable dvr. ret=%d", ret);
if (v != gop_cache->enabled()) {
string url = req->get_stream_url();
srs_trace("vhost %s gop_cache changed to %d, source url=%s", vhost.c_str(), v, url.c_str());
gop_cache->set(v);
}
// unpublish, ignore ret.
hds->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
}
#endif
// queue length
// copy to all forwarders.
if (true) {
double v = _srs_config->get_queue_length(req->vhost);
if (true) {
std::vector<SrsConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
consumer->set_queue_size(v);
}
srs_trace("consumers reload queue size success.");
}
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
forwarder->set_queue_size(v);
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_audio(msg)) != ERROR_SUCCESS) {
srs_error("forwarder process audio message failed. ret=%d", ret);
return ret;
}
srs_trace("forwarders reload queue size success.");
}
if (true) {
publish_edge->set_queue_size(v);
srs_trace("publish_edge reload queue size success.");
}
}
return ret;
}
int SrsSource::on_reload_vhost_forward(string vhost)
int SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_header)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
SrsSharedPtrMessage* msg = shared_video;
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);
// unpublish, ignore ret.
hls->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
} else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
if (srs_hls_can_continue(ret, source->meta->vsh(), msg)) {
ret = ERROR_SUCCESS;
} else {
srs_warn("hls continue video failed. ret=%d", ret);
return ret;
}
} else {
srs_warn("hls disconnect publisher for video error. ret=%d", ret);
return ret;
}
}
#endif
// TODO: FIXME: maybe should ignore when publish already stopped?
// forwarders
destroy_forwarders();
if ((ret = create_forwarders()) != ERROR_SUCCESS) {
srs_error("create forwarders failed. ret=%d", ret);
return ret;
#ifdef SRS_AUTO_DVR
if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) {
srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);
// unpublish, ignore ret.
dvr->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
}
#endif
#ifdef SRS_AUTO_HDS
if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) {
srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret);
// unpublish, ignore ret.
hds->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
}
#endif
// copy to all forwarders.
if (!forwarders.empty()) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) {
srs_error("forwarder process video message failed. ret=%d", ret);
return ret;
}
}
}
srs_trace("vhost %s forwarders reload success", vhost.c_str());
return ret;
}
int SrsSource::on_reload_vhost_hls(string vhost)
int SrsOriginHub::on_publish()
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
// create forwarders
if ((ret = create_forwarders()) != ERROR_SUCCESS) {
srs_error("create forwarders failed. ret=%d", ret);
return ret;
}
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_TRANSCODE
if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start encoder failed. ret=%d", ret);
return ret;
}
#endif
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_publish(false)) != ERROR_SUCCESS) {
srs_error("start hls failed. ret=%d", ret);
return ret;
}
#endif
#ifdef SRS_AUTO_DVR
if ((ret = dvr->on_publish(false)) != ERROR_SUCCESS) {
srs_error("start dvr failed. ret=%d", ret);
return ret;
}
#endif
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_HDS
if ((ret = hds->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start hds failed. ret=%d", ret);
return ret;
}
#endif
// TODO: FIXME: use initialize to set req.
if ((ret = ng_exec->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start exec failed. ret=%d", ret);
return ret;
}
return ret;
}
void SrsOriginHub::on_unpublish()
{
// destroy all forwarders
destroy_forwarders();
#ifdef SRS_AUTO_TRANSCODE
encoder->on_unpublish();
#endif
#ifdef SRS_AUTO_HLS
hls->on_unpublish();
#endif
#ifdef SRS_AUTO_DVR
dvr->on_unpublish();
#endif
#ifdef SRS_AUTO_HDS
hds->on_unpublish();
#endif
ng_exec->on_unpublish();
}
int SrsOriginHub::on_forwarder_start(SrsForwarder* forwarder)
{
int ret = ERROR_SUCCESS;
SrsSharedPtrMessage* cache_metadata = source->meta->data();
SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
SrsSharedPtrMessage* cache_sh_audio = source->meta->ash();
// feed the forwarder the metadata/sequence header,
// when reload to enable the forwarder.
if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
return ret;
}
if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video)) != ERROR_SUCCESS) {
srs_error("forwarder process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("forwarder process audio sequence header message failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsOriginHub::on_hls_start()
{
int ret = ERROR_SUCCESS;
SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
SrsSharedPtrMessage* cache_sh_audio = source->meta->ash();
#ifdef SRS_AUTO_HLS
// feed the hls the metadata/sequence header,
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
// TODO: maybe need to decode the metadata?
if (cache_sh_video && (ret = hls->on_video(cache_sh_video, true)) != ERROR_SUCCESS) {
srs_error("hls process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("hls process audio sequence header message failed. ret=%d", ret);
return ret;
}
#endif
return ret;
}
int SrsOriginHub::on_dvr_request_sh()
{
int ret = ERROR_SUCCESS;
SrsSharedPtrMessage* cache_metadata = source->meta->data();
SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
SrsSharedPtrMessage* cache_sh_audio = source->meta->ash();
#ifdef SRS_AUTO_DVR
// feed the dvr the metadata/sequence header,
// when reload to start dvr, dvr will never get the sequence header in stream,
// use the SrsSource.on_dvr_request_sh to push the sequence header to DVR.
if (cache_metadata) {
char* payload = cache_metadata->payload;
int size = cache_metadata->size;
SrsBuffer stream;
if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) {
srs_error("dvr decode metadata stream failed. ret=%d", ret);
return ret;
}
SrsOnMetaDataPacket pkt;
if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) {
srs_error("dvr decode metadata packet failed.");
return ret;
}
if ((ret = dvr->on_meta_data(&pkt)) != ERROR_SUCCESS) {
srs_error("dvr process onMetaData message failed. ret=%d", ret);
return ret;
}
}
if (cache_sh_video && (ret = dvr->on_video(cache_sh_video)) != ERROR_SUCCESS) {
srs_error("dvr process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("dvr process audio sequence header message failed. ret=%d", ret);
return ret;
}
#endif
return ret;
}
int SrsOriginHub::on_reload_vhost_forward(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
// TODO: FIXME: maybe should ignore when publish already stopped?
// forwarders
destroy_forwarders();
if ((ret = create_forwarders()) != ERROR_SUCCESS) {
srs_error("create forwarders failed. ret=%d", ret);
return ret;
}
srs_trace("vhost %s forwarders reload success", vhost.c_str());
return ret;
}
int SrsOriginHub::on_reload_vhost_hls(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
... ... @@ -1238,16 +1252,16 @@ int SrsSource::on_reload_vhost_hls(string vhost)
return ret;
}
int SrsSource::on_reload_vhost_hds(string vhost)
int SrsOriginHub::on_reload_vhost_hds(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
// TODO: FIXME: maybe should ignore when publish already stopped?
#ifdef SRS_AUTO_HDS
hds->on_unpublish();
if ((ret = hds->on_publish(req)) != ERROR_SUCCESS) {
... ... @@ -1256,11 +1270,11 @@ int SrsSource::on_reload_vhost_hds(string vhost)
}
srs_trace("vhost %s hds reload success", vhost.c_str());
#endif
return ret;
}
int SrsSource::on_reload_vhost_dvr(string vhost)
int SrsOriginHub::on_reload_vhost_dvr(string vhost)
{
int ret = ERROR_SUCCESS;
... ... @@ -1273,12 +1287,12 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
#ifdef SRS_AUTO_DVR
// cleanup dvr
dvr->on_unpublish();
// reinitialize the dvr, update plan.
if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) {
return ret;
}
// start to publish by new plan.
if ((ret = dvr->on_publish(true)) != ERROR_SUCCESS) {
srs_error("dvr publish failed. ret=%d", ret);
... ... @@ -1291,131 +1305,624 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
return ret;
}
int SrsSource::on_reload_vhost_transcode(string vhost)
int SrsOriginHub::on_reload_vhost_transcode(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
// TODO: FIXME: maybe should ignore when publish already stopped?
#ifdef SRS_AUTO_TRANSCODE
encoder->on_unpublish();
if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start encoder failed. ret=%d", ret);
return ret;
}
srs_trace("vhost %s transcode reload success", vhost.c_str());
#endif
return ret;
}
int SrsOriginHub::on_reload_vhost_exec(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
// TODO: FIXME: maybe should ignore when publish already stopped?
ng_exec->on_unpublish();
if ((ret = ng_exec->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start exec failed. ret=%d", ret);
return ret;
}
srs_trace("vhost %s exec reload success", vhost.c_str());
return ret;
}
int SrsOriginHub::create_forwarders()
{
int ret = ERROR_SUCCESS;
if (!_srs_config->get_forward_enabled(req->vhost)) {
return ret;
}
SrsConfDirective* conf = _srs_config->get_forwards(req->vhost);
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
std::string forward_server = conf->args.at(i);
SrsForwarder* forwarder = new SrsForwarder(this);
forwarders.push_back(forwarder);
// initialize the forwarder with request.
if ((ret = forwarder->initialize(req, forward_server)) != ERROR_SUCCESS) {
return ret;
}
// TODO: FIXME: support queue size.
//double queue_size = _srs_config->get_queue_length(req->vhost);
//forwarder->set_queue_size(queue_size);
if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) {
srs_error("start forwarder failed. "
"vhost=%s, app=%s, stream=%s, forward-to=%s",
req->vhost.c_str(), req->app.c_str(), req->stream.c_str(),
forward_server.c_str());
return ret;
}
}
return ret;
}
void SrsOriginHub::destroy_forwarders()
{
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
forwarder->on_unpublish();
srs_freep(forwarder);
}
forwarders.clear();
}
SrsMetaCache::SrsMetaCache()
{
cache_metadata = cache_sh_video = cache_sh_audio = NULL;
}
SrsMetaCache::~SrsMetaCache()
{
dispose();
}
void SrsMetaCache::dispose()
{
srs_freep(cache_metadata);
srs_freep(cache_sh_video);
srs_freep(cache_sh_audio);
}
SrsSharedPtrMessage* SrsMetaCache::data()
{
return cache_metadata;
}
SrsSharedPtrMessage* SrsMetaCache::vsh()
{
return cache_sh_video;
}
SrsSharedPtrMessage* SrsMetaCache::ash()
{
return cache_sh_audio;
}
int SrsMetaCache::dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
{
int ret = ERROR_SUCCESS;
// copy metadata.
if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, ag)) != ERROR_SUCCESS) {
srs_error("dispatch metadata failed. ret=%d", ret);
return ret;
}
srs_info("dispatch metadata success");
// copy sequence header
// copy audio sequence first, for hls to fast parse the "right" audio codec.
// @see https://github.com/ossrs/srs/issues/301
if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, ag)) != ERROR_SUCCESS) {
srs_error("dispatch audio sequence header failed. ret=%d", ret);
return ret;
}
srs_info("dispatch audio sequence header success");
if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, ag)) != ERROR_SUCCESS) {
srs_error("dispatch video sequence header failed. ret=%d", ret);
return ret;
}
srs_info("dispatch video sequence header success");
return ret;
}
int SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPacket* metadata, bool& updated)
{
updated = false;
int ret = ERROR_SUCCESS;
SrsAmf0Any* prop = NULL;
// when exists the duration, remove it to make ExoPlayer happy.
if (metadata->metadata->get_property("duration") != NULL) {
metadata->metadata->remove("duration");
}
// generate metadata info to print
std::stringstream ss;
if ((prop = metadata->metadata->ensure_property_number("width")) != NULL) {
ss << ", width=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("height")) != NULL) {
ss << ", height=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("videocodecid")) != NULL) {
ss << ", vcodec=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) {
ss << ", acodec=" << (int)prop->to_number();
}
srs_trace("got metadata%s", ss.str().c_str());
// add server info to metadata
metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
metadata->metadata->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
metadata->metadata->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
// version, for example, 1.0.0
// add version to metadata, please donot remove it, for debug.
metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
// encode the metadata to payload
int size = 0;
char* payload = NULL;
if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
srs_error("encode metadata error. ret=%d", ret);
srs_freep(payload);
return ret;
}
srs_verbose("encode metadata success.");
if (size <= 0) {
srs_warn("ignore the invalid metadata. size=%d", size);
return ret;
}
// create a shared ptr message.
srs_freep(cache_metadata);
cache_metadata = new SrsSharedPtrMessage();
updated = true;
// dump message to shared ptr message.
// the payload/size managed by cache_metadata, user should not free it.
if ((ret = cache_metadata->create(header, payload, size)) != ERROR_SUCCESS) {
srs_error("initialize the cache metadata failed. ret=%d", ret);
return ret;
}
srs_verbose("initialize shared ptr metadata success.");
return ret;
}
void SrsMetaCache::update_ash(SrsSharedPtrMessage* msg)
{
srs_freep(cache_sh_audio);
cache_sh_audio = msg->copy();
}
void SrsMetaCache::update_vsh(SrsSharedPtrMessage* msg)
{
srs_freep(cache_sh_video);
cache_sh_video = msg->copy();
}
std::map<std::string, SrsSource*> SrsSource::pool;
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
int ret = ERROR_SUCCESS;
SrsSource* source = NULL;
if ((source = fetch(r)) != NULL) {
*pps = source;
return ret;
}
string stream_url = r->get_stream_url();
string vhost = r->vhost;
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
source = new SrsSource();
if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
srs_freep(source);
return ret;
}
pool[stream_url] = source;
srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
*pps = source;
return ret;
}
SrsSource* SrsSource::fetch(SrsRequest* r)
{
SrsSource* source = NULL;
string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
return NULL;
}
source = pool[stream_url];
// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source->req->update_auth(r);
return source;
}
void SrsSource::dispose_all()
{
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsSource* source = it->second;
source->dispose();
}
return;
}
int SrsSource::cycle_all()
{
int ret = ERROR_SUCCESS;
int cid = _srs_context->get_id();
ret = do_cycle_all();
_srs_context->set_id(cid);
return ret;
}
int SrsSource::do_cycle_all()
{
int ret = ERROR_SUCCESS;
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSource* source = it->second;
// Do cycle source to cleanup components, such as hls dispose.
if ((ret = source->cycle()) != ERROR_SUCCESS) {
return ret;
}
// TODO: FIXME: support source cleanup.
// @see https://github.com/ossrs/srs/issues/713
// @see https://github.com/ossrs/srs/issues/714
#if 0
// When source expired, remove it.
if (source->expired()) {
int cid = source->source_id();
if (cid == -1 && source->pre_source_id() > 0) {
cid = source->pre_source_id();
}
if (cid > 0) {
_srs_context->set_id(cid);
}
srs_trace("cleanup die source, total=%d", (int)pool.size());
srs_freep(source);
pool.erase(it++);
} else {
++it;
}
#else
++it;
#endif
}
return ret;
}
void SrsSource::destroy()
{
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsSource* source = it->second;
srs_freep(source);
}
pool.clear();
}
SrsMixQueue::SrsMixQueue()
{
nb_videos = 0;
nb_audios = 0;
}
SrsMixQueue::~SrsMixQueue()
{
clear();
}
void SrsMixQueue::clear()
{
std::multimap<int64_t, SrsSharedPtrMessage*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsSharedPtrMessage* msg = it->second;
srs_freep(msg);
}
msgs.clear();
nb_videos = 0;
nb_audios = 0;
}
void SrsMixQueue::push(SrsSharedPtrMessage* msg)
{
msgs.insert(std::make_pair(msg->timestamp, msg));
if (msg->is_video()) {
nb_videos++;
} else {
nb_audios++;
}
}
SrsSharedPtrMessage* SrsMixQueue::pop()
{
bool mix_ok = false;
// pure video
if (nb_videos >= SRS_MIX_CORRECT_PURE_AV && nb_audios == 0) {
mix_ok = true;
}
// pure audio
if (nb_audios >= SRS_MIX_CORRECT_PURE_AV && nb_videos == 0) {
mix_ok = true;
}
// got 1 video and 1 audio, mix ok.
if (nb_videos >= 1 && nb_audios >= 1) {
mix_ok = true;
}
if (!mix_ok) {
return NULL;
}
// pop the first msg.
std::multimap<int64_t, SrsSharedPtrMessage*>::iterator it = msgs.begin();
SrsSharedPtrMessage* msg = it->second;
msgs.erase(it);
if (msg->is_video()) {
nb_videos--;
} else {
nb_audios--;
}
return msg;
}
SrsSource::SrsSource()
{
req = NULL;
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
mix_correct = false;
mix_queue = new SrsMixQueue();
_can_publish = true;
_pre_source_id = _source_id = -1;
die_at = -1;
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
gop_cache = new SrsGopCache();
aggregate_stream = new SrsBuffer();
hub = new SrsOriginHub(this);
meta = new SrsMetaCache();
is_monotonically_increase = false;
last_packet_time = 0;
_srs_config->subscribe(this);
atc = false;
}
SrsSource::~SrsSource()
{
_srs_config->unsubscribe(this);
// never free the consumers,
// for all consumers are auto free.
consumers.clear();
srs_freep(hub);
srs_freep(meta);
srs_freep(mix_queue);
srs_freep(play_edge);
srs_freep(publish_edge);
srs_freep(gop_cache);
srs_freep(aggregate_stream);
srs_freep(req);
}
void SrsSource::dispose()
{
hub->dispose();
meta->dispose();
gop_cache->dispose();
}
int SrsSource::cycle()
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
// TODO: FIXME: maybe should ignore when publish already stopped?
#ifdef SRS_AUTO_TRANSCODE
encoder->on_unpublish();
if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start encoder failed. ret=%d", ret);
return ret;
}
srs_trace("vhost %s transcode reload success", vhost.c_str());
#endif
return ret;
return hub->cycle();
}
int SrsSource::on_reload_vhost_exec(string vhost)
bool SrsSource::expired()
{
int ret = ERROR_SUCCESS;
// unknown state?
if (die_at == -1) {
return false;
}
if (req->vhost != vhost) {
return ret;
// still publishing?
if (!_can_publish || !publish_edge->can_publish()) {
return false;
}
// TODO: FIXME: maybe should ignore when publish already stopped?
// has any consumers?
if (!consumers.empty()) {
return false;
}
ng_exec->on_unpublish();
if ((ret = ng_exec->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start exec failed. ret=%d", ret);
return ret;
int64_t now = srs_get_system_time_ms();
if (now > die_at + SRS_SOURCE_CLEANUP) {
return true;
}
srs_trace("vhost %s exec reload success", vhost.c_str());
return ret;
return false;
}
int SrsSource::on_forwarder_start(SrsForwarder* forwarder)
int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
{
int ret = ERROR_SUCCESS;
// feed the forwarder the metadata/sequence header,
// when reload to enable the forwarder.
if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
srs_assert(h);
srs_assert(!req);
handler = h;
req = r->copy();
atc = _srs_config->get_atc(req->vhost);
if ((ret = hub->initialize(req)) != ERROR_SUCCESS) {
return ret;
}
if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video)) != ERROR_SUCCESS) {
srs_error("forwarder process video sequence header message failed. ret=%d", ret);
if ((ret = play_edge->initialize(this, req)) != ERROR_SUCCESS) {
return ret;
}
if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("forwarder process audio sequence header message failed. ret=%d", ret);
if ((ret = publish_edge->initialize(this, req)) != ERROR_SUCCESS) {
return ret;
}
double queue_size = _srs_config->get_queue_length(req->vhost);
publish_edge->set_queue_size(queue_size);
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);
mix_correct = _srs_config->get_mix_correct(req->vhost);
return ret;
}
int SrsSource::on_hls_start()
int SrsSource::on_reload_vhost_play(string vhost)
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HLS
// feed the hls the metadata/sequence header,
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
// TODO: maybe need to decode the metadata?
if (cache_sh_video && (ret = hls->on_video(cache_sh_video, true)) != ERROR_SUCCESS) {
srs_error("hls process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("hls process audio sequence header message failed. ret=%d", ret);
if (req->vhost != vhost) {
return ret;
}
#endif
return ret;
}
int SrsSource::on_dvr_request_sh()
{
int ret = ERROR_SUCCESS;
// time_jitter
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);
#ifdef SRS_AUTO_DVR
// feed the dvr the metadata/sequence header,
// when reload to start dvr, dvr will never get the sequence header in stream,
// use the SrsSource.on_dvr_request_sh to push the sequence header to DVR.
if (cache_metadata) {
char* payload = cache_metadata->payload;
int size = cache_metadata->size;
// mix_correct
if (true) {
bool v = _srs_config->get_mix_correct(req->vhost);
SrsBuffer stream;
if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) {
srs_error("dvr decode metadata stream failed. ret=%d", ret);
return ret;
// when changed, clear the mix queue.
if (v != mix_correct) {
mix_queue->clear();
}
mix_correct = v;
}
// atc changed.
if (true) {
bool v = _srs_config->get_atc(vhost);
SrsOnMetaDataPacket pkt;
if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) {
srs_error("dvr decode metadata packet failed.");
return ret;
if (v != atc) {
srs_warn("vhost %s atc changed to %d, connected client may corrupt.", vhost.c_str(), v);
gop_cache->clear();
}
atc = v;
}
// gop cache changed.
if (true) {
bool v = _srs_config->get_gop_cache(vhost);
if ((ret = dvr->on_meta_data(&pkt)) != ERROR_SUCCESS) {
srs_error("dvr process onMetaData message failed. ret=%d", ret);
return ret;
if (v != gop_cache->enabled()) {
string url = req->get_stream_url();
srs_trace("vhost %s gop_cache changed to %d, source url=%s", vhost.c_str(), v, url.c_str());
gop_cache->set(v);
}
}
if (cache_sh_video && (ret = dvr->on_video(cache_sh_video)) != ERROR_SUCCESS) {
srs_error("dvr process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("dvr process audio sequence header message failed. ret=%d", ret);
return ret;
}
// queue length
if (true) {
double v = _srs_config->get_queue_length(req->vhost);
if (true) {
std::vector<SrsConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
consumer->set_queue_size(v);
}
srs_trace("consumers reload queue size success.");
}
// TODO: FIXME: https://github.com/ossrs/srs/issues/742#issuecomment-273656897
// TODO: FIXME: support queue size.
#if 0
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
forwarder->set_queue_size(v);
}
srs_trace("forwarders reload queue size success.");
}
if (true) {
publish_edge->set_queue_size(v);
srs_trace("publish_edge reload queue size success.");
}
#endif
}
return ret;
}
... ... @@ -1469,53 +1976,13 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HLS
if (metadata && (ret = hls->on_meta_data(metadata->metadata)) != ERROR_SUCCESS) {
srs_error("hls process onMetaData message failed. ret=%d", ret);
return ret;
}
#endif
#ifdef SRS_AUTO_DVR
if (metadata && (ret = dvr->on_meta_data(metadata)) != ERROR_SUCCESS) {
srs_error("dvr process onMetaData message failed. ret=%d", ret);
// Notify hub about the original metadata.
if ((ret = hub->on_original_metadata(metadata)) != ERROR_SUCCESS) {
return ret;
}
#endif
SrsAmf0Any* prop = NULL;
// when exists the duration, remove it to make ExoPlayer happy.
if (metadata->metadata->get_property("duration") != NULL) {
metadata->metadata->remove("duration");
}
// generate metadata info to print
std::stringstream ss;
if ((prop = metadata->metadata->ensure_property_number("width")) != NULL) {
ss << ", width=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("height")) != NULL) {
ss << ", height=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("videocodecid")) != NULL) {
ss << ", vcodec=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) {
ss << ", acodec=" << (int)prop->to_number();
}
srs_trace("got metadata%s", ss.str().c_str());
// add server info to metadata
metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
metadata->metadata->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
metadata->metadata->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
// version, for example, 1.0.0
// add version to metadata, please donot remove it, for debug.
metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
// if allow atc_auto and bravo-atc detected, open atc for vhost.
SrsAmf0Any* prop = NULL;
atc = _srs_config->get_atc(req->vhost);
if (_srs_config->get_atc_auto(req->vhost)) {
if ((prop = metadata->metadata->get_property("bravo_atc")) != NULL) {
... ... @@ -1525,65 +1992,36 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
}
}
// encode the metadata to payload
int size = 0;
char* payload = NULL;
if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
srs_error("encode metadata error. ret=%d", ret);
srs_freep(payload);
// Update the meta cache.
bool updated = false;
if ((ret = meta->update_data(&msg->header, metadata, updated)) != ERROR_SUCCESS) {
return ret;
}
srs_verbose("encode metadata success.");
if (size <= 0) {
srs_warn("ignore the invalid metadata. size=%d", size);
if (!updated) {
return ret;
}
// when already got metadata, drop when reduce sequence header.
bool drop_for_reduce = false;
if (cache_metadata && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) {
drop_for_reduce = true;
srs_warn("drop for reduce sh metadata, size=%d", msg->size);
}
// create a shared ptr message.
srs_freep(cache_metadata);
cache_metadata = new SrsSharedPtrMessage();
// dump message to shared ptr message.
// the payload/size managed by cache_metadata, user should not free it.
if ((ret = cache_metadata->create(&msg->header, payload, size)) != ERROR_SUCCESS) {
srs_error("initialize the cache metadata failed. ret=%d", ret);
return ret;
}
srs_verbose("initialize shared ptr metadata success.");
// copy to all consumer
if (!drop_for_reduce) {
std::vector<SrsConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
if ((ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch the metadata failed. ret=%d", ret);
return ret;
}
}
}
// copy to all forwarders
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
if ((ret = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch the metadata failed. ret=%d", ret);
return ret;
}
}
}
return ret;
// Copy to hub to all utilities.
return hub->on_meta_data(meta->data());
}
int SrsSource::on_audio(SrsCommonMessage* shared_audio)
... ... @@ -1633,24 +2071,6 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
return ret;
}
bool srs_hls_can_continue(int ret, SrsSharedPtrMessage* sh, SrsSharedPtrMessage* msg)
{
// only continue for decode error.
if (ret != ERROR_HLS_DECODE_ERROR) {
return false;
}
// when video size equals to sequence header,
// the video actually maybe a sequence header,
// continue to make ffmpeg happy.
if (sh && sh->size == msg->size) {
srs_warn("the msg is actually a sequence header, ignore this packet.");
return true;
}
return false;
}
int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
... ... @@ -1661,9 +2081,9 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && cache_sh_audio && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (cache_sh_audio->size == msg->size) {
drop_for_reduce = srs_bytes_equals(cache_sh_audio->payload, msg->payload, msg->size);
if (is_sequence_header && meta->ash() && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->ash()->size == msg->size) {
drop_for_reduce = srs_bytes_equals(meta->ash()->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh audio, size=%d", msg->size);
}
}
... ... @@ -1697,56 +2117,6 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
flv_sample_rates[sample.sound_rate]);
}
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) {
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret);
// unpublish, ignore ret.
hls->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
} else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
if (srs_hls_can_continue(ret, cache_sh_audio, msg)) {
ret = ERROR_SUCCESS;
} else {
srs_warn("hls continue audio failed. ret=%d", ret);
return ret;
}
} else {
srs_warn("hls disconnect publisher for audio error. ret=%d", ret);
return ret;
}
}
#endif
#ifdef SRS_AUTO_DVR
if ((ret = dvr->on_audio(msg)) != ERROR_SUCCESS) {
srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret);
// unpublish, ignore ret.
dvr->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
}
#endif
#ifdef SRS_AUTO_HDS
if ((ret = hds->on_audio(msg)) != ERROR_SUCCESS) {
srs_warn("hds process audio message failed, ignore and disable dvr. ret=%d", ret);
// unpublish, ignore ret.
hds->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
}
#endif
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
... ... @@ -1759,24 +2129,16 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
srs_info("dispatch audio success.");
}
// copy to all forwarders.
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_audio(msg)) != ERROR_SUCCESS) {
srs_error("forwarder process audio message failed. ret=%d", ret);
return ret;
}
}
// Copy to hub to all utilities.
if ((ret = hub->on_audio(msg)) != ERROR_SUCCESS) {
return ret;
}
// cache the sequence header of aac, or first packet of mp3.
// for example, the mp3 is used for hls to write the "right" audio codec.
// TODO: FIXME: to refine the stream info system.
if (is_aac_sequence_header || !cache_sh_audio) {
srs_freep(cache_sh_audio);
cache_sh_audio = msg->copy();
if (is_aac_sequence_header || !meta->ash()) {
meta->update_ash(msg);
}
// when sequence header, donot push to gop cache and adjust the timestamp.
... ... @@ -1793,11 +2155,11 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
// if atc, update the sequence header to abs time.
if (atc) {
if (cache_sh_audio) {
cache_sh_audio->timestamp = msg->timestamp;
if (meta->ash()) {
meta->ash()->timestamp = msg->timestamp;
}
if (cache_metadata) {
cache_metadata->timestamp = msg->timestamp;
if (meta->data()) {
meta->data()->timestamp = msg->timestamp;
}
}
... ... @@ -1874,9 +2236,9 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && cache_sh_video && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (cache_sh_video->size == msg->size) {
drop_for_reduce = srs_bytes_equals(cache_sh_video->payload, msg->payload, msg->size);
if (is_sequence_header && meta->vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->vsh()->size == msg->size) {
drop_for_reduce = srs_bytes_equals(meta->vsh()->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh video, size=%d", msg->size);
}
}
... ... @@ -1884,8 +2246,7 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
// cache the sequence header if h264
// donot cache the sequence header to gop_cache, return here.
if (is_sequence_header) {
srs_freep(cache_sh_video);
cache_sh_video = msg->copy();
meta->update_vsh(msg);
// parse detail audio codec
SrsAvcAacCodec codec;
... ... @@ -1913,55 +2274,10 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
}
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);
// unpublish, ignore ret.
hls->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
} else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
if (srs_hls_can_continue(ret, cache_sh_video, msg)) {
ret = ERROR_SUCCESS;
} else {
srs_warn("hls continue video failed. ret=%d", ret);
return ret;
}
} else {
srs_warn("hls disconnect publisher for video error. ret=%d", ret);
return ret;
}
}
#endif
#ifdef SRS_AUTO_DVR
if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) {
srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);
// unpublish, ignore ret.
dvr->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
}
#endif
#ifdef SRS_AUTO_HDS
if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) {
srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret);
// unpublish, ignore ret.
hds->on_unpublish();
// ignore.
ret = ERROR_SUCCESS;
// Copy to hub to all utilities.
if ((ret = hub->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
return ret;
}
#endif
// copy to all consumer
if (!drop_for_reduce) {
... ... @@ -1974,18 +2290,6 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
}
srs_info("dispatch video success.");
}
// copy to all forwarders.
if (!forwarders.empty()) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) {
srs_error("forwarder process video message failed. ret=%d", ret);
return ret;
}
}
}
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) {
... ... @@ -2001,11 +2305,11 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
// if atc, update the sequence header to abs time.
if (atc) {
if (cache_sh_video) {
cache_sh_video->timestamp = msg->timestamp;
if (meta->vsh()) {
meta->vsh()->timestamp = msg->timestamp;
}
if (cache_metadata) {
cache_metadata->timestamp = msg->timestamp;
if (meta->data()) {
meta->data()->timestamp = msg->timestamp;
}
}
... ... @@ -2142,45 +2446,8 @@ int SrsSource::on_publish()
is_monotonically_increase = true;
last_packet_time = 0;
// create forwarders
if ((ret = create_forwarders()) != ERROR_SUCCESS) {
srs_error("create forwarders failed. ret=%d", ret);
return ret;
}
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_TRANSCODE
if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start encoder failed. ret=%d", ret);
return ret;
}
#endif
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_publish(false)) != ERROR_SUCCESS) {
srs_error("start hls failed. ret=%d", ret);
return ret;
}
#endif
#ifdef SRS_AUTO_DVR
if ((ret = dvr->on_publish(false)) != ERROR_SUCCESS) {
srs_error("start dvr failed. ret=%d", ret);
return ret;
}
#endif
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_HDS
if ((ret = hds->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start hds failed. ret=%d", ret);
return ret;
}
#endif
// TODO: FIXME: use initialize to set req.
if ((ret = ng_exec->on_publish(req)) != ERROR_SUCCESS) {
srs_error("start exec failed. ret=%d", ret);
// Notify the hub about the publish event.
if ((ret = hub->on_publish()) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -2203,26 +2470,8 @@ void SrsSource::on_unpublish()
return;
}
// destroy all forwarders
destroy_forwarders();
#ifdef SRS_AUTO_TRANSCODE
encoder->on_unpublish();
#endif
#ifdef SRS_AUTO_HLS
hls->on_unpublish();
#endif
#ifdef SRS_AUTO_DVR
dvr->on_unpublish();
#endif
#ifdef SRS_AUTO_HDS
hds->on_unpublish();
#endif
ng_exec->on_unpublish();
// Notify the hub about the unpublish event.
hub->on_unpublish();
// only clear the gop cache,
// donot clear the sequence header, for it maybe not changed,
... ... @@ -2259,38 +2508,21 @@ int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool
// if atc, update the sequence header to gop cache time.
if (atc && !gop_cache->empty()) {
if (cache_metadata) {
cache_metadata->timestamp = gop_cache->start_time();
if (meta->data()) {
meta->data()->timestamp = gop_cache->start_time();
}
if (cache_sh_video) {
cache_sh_video->timestamp = gop_cache->start_time();
if (meta->vsh()) {
meta->vsh()->timestamp = gop_cache->start_time();
}
if (cache_sh_audio) {
cache_sh_audio->timestamp = gop_cache->start_time();
if (meta->ash()) {
meta->ash()->timestamp = gop_cache->start_time();
}
}
// copy metadata.
if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch metadata failed. ret=%d", ret);
return ret;
}
srs_info("dispatch metadata success");
// copy sequence header
// copy audio sequence first, for hls to fast parse the "right" audio codec.
// @see https://github.com/ossrs/srs/issues/301
if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch audio sequence header failed. ret=%d", ret);
return ret;
}
srs_info("dispatch audio sequence header success");
if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch video sequence header failed. ret=%d", ret);
// Copy metadata and sequence header to consumer.
if ((ret = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != ERROR_SUCCESS) {
return ret;
}
srs_info("dispatch video sequence header success");
// copy gop cache to client.
if (dg && (ret = gop_cache->dump(consumer, atc, jitter_algorithm)) != ERROR_SUCCESS) {
... ... @@ -2356,52 +2588,6 @@ void SrsSource::on_edge_proxy_unpublish()
publish_edge->on_proxy_unpublish();
}
int SrsSource::create_forwarders()
{
int ret = ERROR_SUCCESS;
if (!_srs_config->get_forward_enabled(req->vhost)) {
return ret;
}
SrsConfDirective* conf = _srs_config->get_forwards(req->vhost);
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
std::string forward_server = conf->args.at(i);
SrsForwarder* forwarder = new SrsForwarder(this);
forwarders.push_back(forwarder);
// initialize the forwarder with request.
if ((ret = forwarder->initialize(req, forward_server)) != ERROR_SUCCESS) {
return ret;
}
double queue_size = _srs_config->get_queue_length(req->vhost);
forwarder->set_queue_size(queue_size);
if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) {
srs_error("start forwarder failed. "
"vhost=%s, app=%s, stream=%s, forward-to=%s",
req->vhost.c_str(), req->app.c_str(), req->stream.c_str(),
forward_server.c_str());
return ret;
}
}
return ret;
}
void SrsSource::destroy_forwarders()
{
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
forwarder->on_unpublish();
srs_freep(forwarder);
}
forwarders.clear();
}
string SrsSource::get_curr_origin()
{
return play_edge->get_curr_origin();
... ...
... ... @@ -53,6 +53,7 @@ class SrsEdgeProxyContext;
class SrsMessageArray;
class SrsNgExec;
class SrsConnection;
class SrsMessageHeader;
#ifdef SRS_AUTO_HLS
class SrsHls;
#endif
... ... @@ -411,10 +412,128 @@ public:
};
/**
* The hub for origin is a collection of utilities for origin only,
* for example, DVR, HLS, Forward and Transcode are only available for origin,
* they are meanless for edge server.
*/
class SrsOriginHub : public ISrsReloadHandler
{
private:
SrsSource* source;
SrsRequest* req;
private:
// hls handler.
#ifdef SRS_AUTO_HLS
SrsHls* hls;
#endif
// dvr handler.
#ifdef SRS_AUTO_DVR
SrsDvr* dvr;
#endif
// transcoding handler.
#ifdef SRS_AUTO_TRANSCODE
SrsEncoder* encoder;
#endif
#ifdef SRS_AUTO_HDS
// adobe hds(http dynamic streaming).
SrsHds *hds;
#endif
// nginx-rtmp exec feature.
SrsNgExec* ng_exec;
// to forward stream to other servers
std::vector<SrsForwarder*> forwarders;
public:
SrsOriginHub(SrsSource* s);
virtual ~SrsOriginHub();
public:
// Initialize the hub with source and request.
// @param r The request object, managed by source.
virtual int initialize(SrsRequest* r);
// Dispose the hub, release utilities resource,
// for example, delete all HLS pieces.
virtual void dispose();
// Cycle the hub, process some regular events,
// for example, dispose hls in cycle.
virtual int cycle();
public:
// When got a original metadata.
virtual int on_original_metadata(SrsOnMetaDataPacket* metadata);
// When got a parsed metadata.
virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata);
// When got a parsed audio packet.
virtual int on_audio(SrsSharedPtrMessage* shared_audio);
// When got a parsed video packet.
virtual int on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_header);
public:
// When start publish stream.
virtual int on_publish();
// When stop publish stream.
virtual void on_unpublish();
// for the tools callback
public:
// for the SrsForwarder to callback to request the sequence headers.
virtual int on_forwarder_start(SrsForwarder* forwarder);
// for the SrsHls to callback to request the sequence headers.
virtual int on_hls_start();
// for the SrsDvr to callback to request the sequence headers.
virtual int on_dvr_request_sh();
// interface ISrsReloadHandler
public:
virtual int on_reload_vhost_forward(std::string vhost);
virtual int on_reload_vhost_hls(std::string vhost);
virtual int on_reload_vhost_hds(std::string vhost);
virtual int on_reload_vhost_dvr(std::string vhost);
virtual int on_reload_vhost_transcode(std::string vhost);
virtual int on_reload_vhost_exec(std::string vhost);
private:
virtual int create_forwarders();
virtual void destroy_forwarders();
};
/**
* Each stream have optional meta(sps/pps in sequence header and metadata).
* This class cache and update the meta.
*/
class SrsMetaCache
{
private:
SrsSharedPtrMessage* cache_metadata;
// the cached video sequence header.
SrsSharedPtrMessage* cache_sh_video;
// the cached audio sequence header.
SrsSharedPtrMessage* cache_sh_audio;
public:
SrsMetaCache();
virtual ~SrsMetaCache();
public:
// Dispose the metadata cache.
virtual void dispose();
public:
// Get the cached metadata.
virtual SrsSharedPtrMessage* data();
// Get the cached vsh(video sequence header).
virtual SrsSharedPtrMessage* vsh();
// Get the cached ash(audio sequence header).
virtual SrsSharedPtrMessage* ash();
// Dumps cached metadata to consumer.
// @param dm Whether dumps the metadata.
// @param ds Whether dumps the sequence header.
virtual int dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds);
public:
// Update the cached metadata by packet.
virtual int update_data(SrsMessageHeader* header, SrsOnMetaDataPacket* metadata, bool& updated);
// Update the cached audio sequence header.
virtual void update_ash(SrsSharedPtrMessage* msg);
// Update the cached video sequence header.
virtual void update_vsh(SrsSharedPtrMessage* msg);
};
/**
* live streaming source.
*/
class SrsSource : public ISrsReloadHandler
{
friend class SrsOriginHub;
private:
static std::map<std::string, SrsSource*> pool;
public:
... ... @@ -475,35 +594,19 @@ private:
bool is_monotonically_increase;
// the time of the packet we just got.
int64_t last_packet_time;
// hls handler.
#ifdef SRS_AUTO_HLS
SrsHls* hls;
#endif
// dvr handler.
#ifdef SRS_AUTO_DVR
SrsDvr* dvr;
#endif
// transcoding handler.
#ifdef SRS_AUTO_TRANSCODE
SrsEncoder* encoder;
#endif
#ifdef SRS_AUTO_HDS
// adobe hds(http dynamic streaming).
SrsHds *hds;
#endif
// nginx-rtmp exec feature.
SrsNgExec* ng_exec;
// for aggregate message
SrsBuffer* aggregate_stream;
// the event handler.
ISrsSourceHandler* handler;
// edge control service
SrsPlayEdge* play_edge;
SrsPublishEdge* publish_edge;
// gop cache for client fast startup.
SrsGopCache* gop_cache;
// to forward stream to other servers
std::vector<SrsForwarder*> forwarders;
// for aggregate message
SrsBuffer* aggregate_stream;
// the event handler.
ISrsSourceHandler* handler;
// The hub for origin server.
SrsOriginHub* hub;
// The metadata cache.
SrsMetaCache* meta;
private:
/**
* can publish, true when is not streaming
... ... @@ -512,12 +615,6 @@ private:
// last die time, when all consumers quit and no publisher,
// we will remove the source when source die.
int64_t die_at;
private:
SrsSharedPtrMessage* cache_metadata;
// the cached video sequence header.
SrsSharedPtrMessage* cache_sh_video;
// the cached audio sequence header.
SrsSharedPtrMessage* cache_sh_audio;
public:
SrsSource();
virtual ~SrsSource();
... ... @@ -535,20 +632,8 @@ public:
// interface ISrsReloadHandler
public:
virtual int on_reload_vhost_play(std::string vhost);
virtual int on_reload_vhost_forward(std::string vhost);
virtual int on_reload_vhost_hls(std::string vhost);
virtual int on_reload_vhost_hds(std::string vhost);
virtual int on_reload_vhost_dvr(std::string vhost);
virtual int on_reload_vhost_transcode(std::string vhost);
virtual int on_reload_vhost_exec(std::string vhost);
// for the tools callback
public:
// for the SrsForwarder to callback to request the sequence headers.
virtual int on_forwarder_start(SrsForwarder* forwarder);
// for the SrsHls to callback to request the sequence headers.
virtual int on_hls_start();
// for the SrsDvr to callback to request the sequence headers.
virtual int on_dvr_request_sh();
// source id changed.
virtual int on_source_id_changed(int id);
// get current source id.
... ... @@ -599,9 +684,6 @@ public:
virtual int on_edge_proxy_publish(SrsCommonMessage* msg);
// for edge, proxy stop publish
virtual void on_edge_proxy_unpublish();
private:
virtual int create_forwarders();
virtual void destroy_forwarders();
public:
virtual std::string get_curr_origin();
};
... ...
... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 3
#define VERSION_MINOR 0
#define VERSION_REVISION 15
#define VERSION_REVISION 16
// generated by configure, only macros.
#include <srs_auto_headers.hpp>
... ...