zhengfl

Merge branch '2.0release' of github.com:simple-rtmp-server/srs into 2.0release

... ... @@ -852,6 +852,18 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
return ret;
}
}
// http_remux, only one per vhost.
if (get_vhost_http_remux_enabled(vhost)) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_vhost_http_remux_updated(vhost)) != ERROR_SUCCESS) {
srs_error("vhost %s notify subscribes http_remux failed. ret=%d", vhost.c_str(), ret);
return ret;
}
}
srs_trace("vhost %s reload http_remux success.", vhost.c_str());
}
srs_trace("reload new vhost %s success.", vhost.c_str());
continue;
}
... ... @@ -1060,7 +1072,7 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
if (!srs_directive_equals(new_vhost->get("http_remux"), old_vhost->get("http_remux"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_vhost_http_remux_updated()) != ERROR_SUCCESS) {
if ((ret = subscribe->on_reload_vhost_http_remux_updated(vhost)) != ERROR_SUCCESS) {
srs_error("vhost %s notify subscribes http_remux failed. ret=%d", vhost.c_str(), ret);
return ret;
}
... ... @@ -1077,7 +1089,7 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
}
continue;
}
srs_trace("igreno reload vhost, enabled old: %d, new: %d",
srs_trace("ignore reload vhost, enabled old: %d, new: %d",
get_vhost_enabled(old_vhost), get_vhost_enabled(new_vhost));
}
... ...
... ... @@ -508,8 +508,9 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
#ifdef SRS_PERF_FAST_FLV_ENCODER
SrsFastFlvStreamEncoder* ffe = dynamic_cast<SrsFastFlvStreamEncoder*>(enc);
#endif
while (true) {
// TODO: free and erase the disabled entry after all related connections is closed.
while (entry->enabled) {
pprint->elapse();
// get messages from consumer.
... ... @@ -593,6 +594,9 @@ SrsLiveEntry::SrsLiveEntry(std::string m, bool h)
stream = NULL;
cache = NULL;
req = NULL;
source = NULL;
std::string ext;
size_t pos = string::npos;
... ... @@ -605,6 +609,11 @@ SrsLiveEntry::SrsLiveEntry(std::string m, bool h)
_is_aac = (ext == ".aac");
}
void SrsLiveEntry::reset_hstrs(bool h)
{
hstrs = h;
}
bool SrsLiveEntry::is_flv()
{
return _is_flv;
... ... @@ -699,16 +708,19 @@ SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr)
server = svr;
mux.hijack(this);
_srs_config->subscribe(this);
}
SrsHttpStreamServer::~SrsHttpStreamServer()
{
mux.unhijack(this);
_srs_config->unsubscribe(this);
if (true) {
std::map<std::string, SrsLiveEntry*>::iterator it;
for (it = tflvs.begin(); it != tflvs.end(); ++it) {
SrsLiveEntry* entry = it->second;
srs_freep(entry->req);
srs_freep(entry);
}
tflvs.clear();
... ... @@ -771,23 +783,31 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
srs_info("ignore mount flv stream for disabled");
return ret;
}
SrsLiveEntry* tmpl = tflvs[r->vhost];
std::string mount = tmpl->mount;
// replace the vhost variable
mount = srs_string_replace(mount, "[vhost]", r->vhost);
mount = srs_string_replace(mount, "[app]", r->app);
mount = srs_string_replace(mount, "[stream]", r->stream);
// remove the default vhost mount
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
// TODO: FIXME: check match
if (mount.at(0) != '/') {
mount = "/" + mount;
}
entry = new SrsLiveEntry(mount, tmpl->hstrs);
entry->cache = new SrsStreamCache(s, r);
entry->stream = new SrsLiveStream(s, r, entry->cache);
srs_assert(!tmpl->req);
tmpl->source = s;
tmpl->req = r->copy();
sflvs[sid] = entry;
... ... @@ -809,8 +829,7 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
} else {
entry = sflvs[sid];
}
// TODO: FIXME: supports reload.
if (entry->stream) {
entry->stream->entry->enabled = true;
return ret;
... ... @@ -822,7 +841,7 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
void SrsHttpStreamServer::http_unmount(SrsSource* s, SrsRequest* r)
{
std::string sid = r->get_stream_url();
if (sflvs.find(sid) == sflvs.end()) {
srs_info("ignore unmount flv stream for disabled");
return;
... ... @@ -832,17 +851,80 @@ void SrsHttpStreamServer::http_unmount(SrsSource* s, SrsRequest* r)
entry->stream->entry->enabled = false;
}
int SrsHttpStreamServer::on_reload_vhost_http_remux_updated(string vhost)
{
int ret = ERROR_SUCCESS;
if (tflvs.find(vhost) == tflvs.end()) {
if ((ret = initialize_flv_entry(vhost)) != ERROR_SUCCESS) {
return ret;
}
// http mount need SrsRequest and SrsSource param, only create a mapping template entry
// and do mount automatically on playing http flv if this stream is a new http_remux stream.
return ret;
}
SrsLiveEntry* tmpl = tflvs[vhost];
SrsRequest* req = tmpl->req;
SrsSource* source = tmpl->source;
if (source && req) {
// cleanup the exists http remux.
http_unmount(source, req);
}
if (!_srs_config->get_vhost_http_remux_enabled(vhost)) {
return ret;
}
string old_tmpl_mount = tmpl->mount;
string new_tmpl_mount = _srs_config->get_vhost_http_remux_mount(vhost);
bool hstrs = _srs_config->get_vhost_http_remux_hstrs(vhost);
tmpl->reset_hstrs(hstrs);
/**
* TODO: not support to reload different mount url for the time being.
* if the mount is change, need more logical thing to deal with.
* such as erase stream from sflvs and free all related resource.
*/
srs_assert(old_tmpl_mount == new_tmpl_mount);
// do http mount directly with SrsRequest and SrsSource if stream is played already.
if (req) {
std::string sid = req->get_stream_url();
if (sflvs.find(sid) != sflvs.end()) {
SrsLiveEntry* stream = sflvs[sid];
stream->reset_hstrs(hstrs);
}
// remount stream.
if ((ret = http_mount(source, req)) != ERROR_SUCCESS) {
srs_trace("vhost %s http_remux reload failed", vhost.c_str());
return ret;
}
} else {
// for without SrsRequest and SrsSource if stream is not played yet, do http mount automatically
// when start play this http flv stream.
}
srs_trace("vhost %s http_remux reload success", vhost.c_str());
return ret;
}
int SrsHttpStreamServer::mount_hls(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
std::string sid = r->get_stream_url();
if (shls.find(sid) == shls.end()) {
srs_info("ignore mount hls stream for disabled");
return ret;
}
SrsHlsEntry* entry = shls[sid];
// TODO: FIXME: supports reload.
... ... @@ -958,7 +1040,6 @@ int SrsHttpStreamServer::hls_update_ts(SrsRequest* r, string uri, string ts)
return ret;
}
int SrsHttpStreamServer::hls_remove_ts(SrsRequest* r, string uri)
{
int ret = ERROR_SUCCESS;
... ... @@ -1010,13 +1091,6 @@ void SrsHttpStreamServer::unmount_hls(SrsRequest* r)
}
}
int SrsHttpStreamServer::on_reload_vhost_http_remux_updated()
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
int SrsHttpStreamServer::on_reload_vhost_hls(string vhost)
{
int ret = ERROR_SUCCESS;
... ... @@ -1056,6 +1130,8 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
}
// hstrs not enabled, ignore.
// for origin: generally set hstrs to 'off' and mount while stream is pushed to origin.
// for edge: must set hstrs to 'on' so that it could trigger rtmp stream before mount.
entry = it->second;
if (!entry->hstrs) {
return ret;
... ... @@ -1090,6 +1166,17 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
// hijack for entry.
SrsRequest* r = hreq->to_request(vhost->arg0());
SrsAutoFree(SrsRequest, r);
std::string sid = r->get_stream_url();
// check if the stream is enabled.
if (sflvs.find(sid) != sflvs.end()) {
SrsLiveEntry* entry = sflvs[sid];
if (!entry->stream->entry->enabled) {
srs_error("stream is disabled, hijack failed. ret=%d", ret);
return ret;
}
}
SrsSource* s = SrsSource::fetch(r);
if (!s) {
if ((ret = SrsSource::create(r, server, server, &s)) != ERROR_SUCCESS) {
... ... @@ -1097,15 +1184,14 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
}
}
srs_assert(s != NULL);
// create http streaming handler.
if ((ret = http_mount(s, r)) != ERROR_SUCCESS) {
return ret;
}
// use the handler if exists.
if (ph) {
std::string sid = r->get_stream_url();
if (sflvs.find(sid) != sflvs.end()) {
entry = sflvs[sid];
*ph = entry->stream;
... ... @@ -1132,7 +1218,7 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
int SrsHttpStreamServer::initialize_flv_streaming()
{
int ret = ERROR_SUCCESS;
// http flv live stream mount for each vhost.
SrsConfDirective* root = _srs_config->get_root();
for (int i = 0; i < (int)root->directives.size(); i++) {
... ... @@ -1141,21 +1227,29 @@ int SrsHttpStreamServer::initialize_flv_streaming()
if (!conf->is_vhost()) {
continue;
}
std::string vhost = conf->arg0();
if (!_srs_config->get_vhost_http_remux_enabled(vhost)) {
continue;
}
SrsLiveEntry* entry = new SrsLiveEntry(
_srs_config->get_vhost_http_remux_mount(vhost),
_srs_config->get_vhost_http_remux_hstrs(vhost)
);
tflvs[vhost] = entry;
srs_trace("http flv live stream, vhost=%s, mount=%s",
vhost.c_str(), entry->mount.c_str());
initialize_flv_entry(conf->arg0());
}
return ret;
}
int SrsHttpStreamServer::initialize_flv_entry(std::string vhost)
{
int ret = ERROR_SUCCESS;
if (!_srs_config->get_vhost_http_remux_enabled(vhost)) {
return ret;
}
SrsLiveEntry* entry = new SrsLiveEntry(
_srs_config->get_vhost_http_remux_mount(vhost),
_srs_config->get_vhost_http_remux_hstrs(vhost)
);
tflvs[vhost] = entry;
srs_trace("http flv live stream, vhost=%s, mount=%s",
vhost.c_str(), entry->mount.c_str());
return ret;
}
... ...
... ... @@ -242,6 +242,9 @@ private:
bool _is_aac;
bool _is_mp3;
public:
SrsRequest* req;
SrsSource* source;
public:
// for template, the mount contains variables.
// for concrete stream, the mount is url to access.
std::string mount;
... ... @@ -252,7 +255,8 @@ public:
SrsStreamCache* cache;
SrsLiveEntry(std::string m, bool h);
void reset_hstrs(bool h);
bool is_flv();
bool is_ts();
bool is_mp3();
... ... @@ -348,13 +352,14 @@ public:
virtual void unmount_hls(SrsRequest* r);
// interface ISrsReloadHandler.
public:
virtual int on_reload_vhost_http_remux_updated();
virtual int on_reload_vhost_http_remux_updated(std::string vhost);
virtual int on_reload_vhost_hls(std::string vhost);
// interface ISrsHttpMatchHijacker
public:
virtual int hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph);
private:
virtual int initialize_flv_streaming();
virtual int initialize_flv_entry(std::string vhost);
virtual int initialize_hls_streaming();
virtual std::string hls_mount_generate(SrsRequest* r, std::string uri, std::string tmpl);
};
... ...
... ... @@ -55,22 +55,25 @@ public:
};
/**
* the stage is used for a collection of object to do print,
* the print time in a stage is constant and not changed.
* for example, stage #1 for all play clients, print time is 3s,
* if there is 10clients, then all clients should print in 10*3s.
* Usage:
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
while (true) {
pprint->elapse();
if (pprint->can_print()) {
// print pithy message.
// user can get the elapse time by: pprint->age()
}
// read and write RTMP messages.
}
*/
* the stage is used for a collection of object to do print,
* the print time in a stage is constant and not changed,
* that is, we always got one message to print every specified time.
*
* for example, stage #1 for all play clients, print time is 3s,
* if there is 1client, it will print every 3s.
* if there is 10clients, random select one to print every 3s.
* Usage:
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
while (true) {
pprint->elapse();
if (pprint->can_print()) {
// print pithy message.
// user can get the elapse time by: pprint->age()
}
// read and write RTMP messages.
}
*/
class SrsPithyPrint
{
private:
... ...
... ... @@ -100,7 +100,7 @@ int ISrsReloadHandler::on_reload_vhost_http_updated()
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_http_remux_updated()
int ISrsReloadHandler::on_reload_vhost_http_remux_updated(string vhost)
{
return ERROR_SUCCESS;
}
... ...
... ... @@ -58,7 +58,7 @@ public:
virtual int on_reload_http_stream_updated();
public:
virtual int on_reload_vhost_http_updated();
virtual int on_reload_vhost_http_remux_updated();
virtual int on_reload_vhost_http_remux_updated(std::string vhost);
virtual int on_reload_vhost_added(std::string vhost);
virtual int on_reload_vhost_removed(std::string vhost);
virtual int on_reload_vhost_atc(std::string vhost);
... ...