winlin

for #179, refine dvr, support callback when reap dvr segment.

... ... @@ -297,7 +297,8 @@ vhost dvr.srs.com {
# vhost, query all dvr of this vhost.
# response in json, where:
# {code:0, dvrs: [{path_tmpl:"./[15].[04].[05].[999].flv", path_dvr:"./22.7.43.312.flv",
# wait_keyframe:true, vhost:"__defaultVhost", callback:"http://dvr/callback"
# wait_keyframe:true, vhost:"__defaultVhost", callback:"http://127.0.0.1:8085/api/v1/dvrs",
# status:"stop"|"start"
# }]}
# method=POST
# to start dvr of specified vhost.
... ... @@ -313,6 +314,8 @@ vhost dvr.srs.com {
# vhost, stop all dvr of this vhost.
# response in json, where:
# {code:0}
# when reap segment, the callback POST request in json:
# {action:"on_dvr_reap_segment"}
# default: session
dvr_plan session;
# the dvr output path.
... ...
... ... @@ -253,7 +253,7 @@ class RESTDvrs(object):
return json.dumps(dvrs)
'''
for SRS hook: on_dvr
for SRS hook: on_dvr, on_dvr_reap_segment
on_dvr:
when srs reap a dvr file, call the hook,
the request in the POST data string is a object encode by json:
... ... @@ -265,6 +265,17 @@ class RESTDvrs(object):
"cwd": "/usr/local/srs",
"file": "./objs/nginx/html/live/livestream.1420254068776.flv"
}
on_dvr_reap_segment:
when api dvr specifes the callback when reap flv segment, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_dvr_reap_segment",
"client_id": 1985,
"vhost": "video.test.com", "app": "live",
"stream": "livestream",
"cwd": "/usr/local/srs",
"file": "./objs/nginx/html/live/livestream.1420254068776.flv"
}
if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success):
0
... ... @@ -287,6 +298,8 @@ class RESTDvrs(object):
action = json_req["action"]
if action == "on_dvr":
code = self.__on_dvr(json_req)
if action == "on_dvr_reap_segment":
code = self.__on_dvr_reap_segment(json_req)
else:
trace("invalid request action: %s"%(json_req["action"]))
code = Error.request_invalid_action
... ... @@ -308,6 +321,18 @@ class RESTDvrs(object):
return code
def __on_dvr_reap_segment(self, req):
code = Error.success
trace("srs %s: client id=%s, vhost=%s, app=%s, stream=%s, cwd=%s, file=%s"%(
req["action"], req["client_id"], req["vhost"], req["app"], req["stream"],
req["cwd"], req["file"]
))
# TODO: process the on_dvr event
return code
'''
handle the sessions requests: client play/stop stream
'''
... ...
... ... @@ -3342,6 +3342,13 @@ bool SrsConfig::get_dvr_enabled(string vhost)
return false;
}
void SrsConfig::set_dvr_enabled(string vhost, bool enabled)
{
SrsConfDirective* conf = create_directive(vhost, "dvr", "enabled");
conf->args.clear();
conf->args.push_back(enabled? "on":"off");
}
string SrsConfig::get_dvr_path(string vhost)
{
SrsConfDirective* dvr = get_dvr(vhost);
... ...
... ... @@ -923,6 +923,7 @@ public:
* whether dvr is enabled.
*/
virtual bool get_dvr_enabled(std::string vhost);
virtual void set_dvr_enabled(std::string vhost, bool enabled);
/**
* get the dvr path, the flv file to save in.
*/
... ...
... ... @@ -144,15 +144,15 @@ int SrsFlvSegment::open(bool use_tmp_file)
return ret;
}
}
// initialize the encoder.
if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) {
srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
// when exists, donot write flv header.
if (fresh_flv_file) {
// initialize the encoder.
if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) {
srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
// write the flv header to writer.
if ((ret = enc->write_header()) != ERROR_SUCCESS) {
srs_error("write flv header failed. ret=%d", ret);
... ... @@ -195,6 +195,8 @@ int SrsFlvSegment::close()
}
}
// TODO: FIXME: the http callback is async, which will trigger thread switch,
// so the on_video maybe invoked during the http callback, and error.
if ((ret = plan->on_reap_segment()) != ERROR_SUCCESS) {
srs_error("dvr: notify plan to reap segment failed. ret=%d", ret);
return ret;
... ... @@ -743,10 +745,15 @@ SrsDvrApiPlan::SrsDvrApiPlan()
{
autostart = false;
started = false;
metadata = sh_audio = sh_video = NULL;
}
SrsDvrApiPlan::~SrsDvrApiPlan()
{
srs_freep(metadata);
srs_freep(sh_audio);
srs_freep(sh_video);
}
int SrsDvrApiPlan::initialize(SrsSource* s, SrsRequest* r)
... ... @@ -787,6 +794,8 @@ int SrsDvrApiPlan::on_publish()
return ret;
}
dvr_enabled = true;
if ((ret = segment->close()) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -794,8 +803,17 @@ int SrsDvrApiPlan::on_publish()
if ((ret = segment->open()) != ERROR_SUCCESS) {
return ret;
}
dvr_enabled = true;
// update sequence header
if (metadata && (ret = SrsDvrPlan::on_meta_data(metadata)) != ERROR_SUCCESS) {
return ret;
}
if (sh_video && (ret = SrsDvrPlan::on_video(sh_video)) != ERROR_SUCCESS) {
return ret;
}
if (sh_audio && (ret = SrsDvrPlan::on_audio(sh_audio)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
... ... @@ -804,6 +822,48 @@ void SrsDvrApiPlan::on_unpublish()
{
}
int SrsDvrApiPlan::on_meta_data(SrsSharedPtrMessage* __metadata)
{
int ret = ERROR_SUCCESS;
srs_freep(metadata);
metadata = __metadata->copy();
return ret;
}
int SrsDvrApiPlan::on_audio(SrsSharedPtrMessage* __audio)
{
int ret = ERROR_SUCCESS;
if (SrsFlvCodec::audio_is_sequence_header(__audio->payload, __audio->size)) {
srs_freep(sh_audio);
sh_audio = __audio->copy();
}
if ((ret = SrsDvrPlan::on_audio(__audio)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrApiPlan::on_video(SrsSharedPtrMessage* __video)
{
int ret = ERROR_SUCCESS;
if (SrsFlvCodec::video_is_sequence_header(__video->payload, __video->size)) {
srs_freep(sh_video);
sh_video = __video->copy();
}
if ((ret = SrsDvrPlan::on_video(__video)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrApiPlan::set_path_tmpl(string path_tmpl)
{
_srs_config->set_dvr_path(req->vhost, path_tmpl);
... ... @@ -830,6 +890,9 @@ int SrsDvrApiPlan::start()
return ret;
}
// enable the config.
_srs_config->set_dvr_enabled(req->vhost, true);
// stop dvr
if (dvr_enabled) {
// ignore error.
... ... @@ -862,16 +925,58 @@ int SrsDvrApiPlan::dumps(stringstream& ss)
<< __SRS_JFIELD_STR("path_dvr", segment->get_path()) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_BOOL("wait_keyframe", wait_keyframe) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("vhost", req->vhost) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("callback", callback)
<< __SRS_JFIELD_STR("callback", callback) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("status", (dvr_enabled? "start":"stop"))
<< __SRS_JOBJECT_END;
return ret;
}
int SrsDvrApiPlan::stop()
{
int ret = ERROR_SUCCESS;
_srs_config->set_dvr_enabled(req->vhost, false);
started = false;
// stop dvr
if (dvr_enabled) {
// ignore error.
int ret = segment->close();
if (ret != ERROR_SUCCESS) {
srs_warn("ignore flv close error. ret=%d", ret);
}
dvr_enabled = false;
}
srs_trace("dvr: stop dvr of vhost=%s", req->vhost.c_str());
return ret;
}
int SrsDvrApiPlan::on_reap_segment()
{
// TODO: FIXME: implements it.
return ERROR_SUCCESS;
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_CALLBACK
// HTTP: callback
if (callback.empty()) {
srs_warn("dvr: ignore for callback empty, vhost=%s", req->vhost.c_str());
return ret;
}
int connection_id = _srs_context->get_id();
std::string cwd = _srs_config->cwd();
std::string file = segment->get_path();
std::string url = callback;
if ((ret = SrsHttpHooks::on_dvr_reap_segment(url, connection_id, req, cwd, file)) != ERROR_SUCCESS) {
srs_error("hook client on_dvr_reap_segment failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
#endif
return ret;
}
SrsDvrAppendPlan::SrsDvrAppendPlan()
... ... @@ -909,30 +1014,30 @@ void SrsDvrAppendPlan::on_unpublish()
{
}
int SrsDvrAppendPlan::on_audio(SrsSharedPtrMessage* audio)
int SrsDvrAppendPlan::on_audio(SrsSharedPtrMessage* __audio)
{
int ret = ERROR_SUCCESS;
if ((ret = update_duration(audio)) != ERROR_SUCCESS) {
if ((ret = update_duration(__audio)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) {
if ((ret = SrsDvrPlan::on_audio(__audio)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrAppendPlan::on_video(SrsSharedPtrMessage* video)
int SrsDvrAppendPlan::on_video(SrsSharedPtrMessage* __video)
{
int ret = ERROR_SUCCESS;
if ((ret = update_duration(video)) != ERROR_SUCCESS) {
if ((ret = update_duration(__video)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) {
if ((ret = SrsDvrPlan::on_video(__video)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -1038,40 +1143,40 @@ int SrsDvrSegmentPlan::on_meta_data(SrsSharedPtrMessage* __metadata)
return ret;
}
int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* audio)
int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* __audio)
{
int ret = ERROR_SUCCESS;
if (SrsFlvCodec::audio_is_sequence_header(audio->payload, audio->size)) {
if (SrsFlvCodec::audio_is_sequence_header(__audio->payload, __audio->size)) {
srs_freep(sh_audio);
sh_audio = audio->copy();
sh_audio = __audio->copy();
}
if ((ret = update_duration(audio)) != ERROR_SUCCESS) {
if ((ret = update_duration(__audio)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) {
if ((ret = SrsDvrPlan::on_audio(__audio)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* video)
int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* __video)
{
int ret = ERROR_SUCCESS;
if (SrsFlvCodec::video_is_sequence_header(video->payload, video->size)) {
if (SrsFlvCodec::video_is_sequence_header(__video->payload, __video->size)) {
srs_freep(sh_video);
sh_video = video->copy();
sh_video = __video->copy();
}
if ((ret = update_duration(video)) != ERROR_SUCCESS) {
if ((ret = update_duration(__video)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) {
if ((ret = SrsDvrPlan::on_video(__video)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -1240,6 +1345,30 @@ int SrsApiDvrPool::create(SrsJsonAny* json)
return dvr->start();
}
int SrsApiDvrPool::stop(string vhost)
{
int ret = ERROR_SUCCESS;
std::vector<SrsDvrApiPlan*> plans;
for (int i = 0; i < (int)dvrs.size(); i++) {
SrsDvrApiPlan* plan = dvrs.at(i);
if (!vhost.empty() && plan->req->vhost != vhost) {
continue;
}
plans.push_back(plan);
}
for (int i = 0; i < (int)plans.size(); i++) {
SrsDvrApiPlan* plan = plans.at(i);
if ((ret = plan->stop()) != ERROR_SUCCESS) {
return ret;
}
}
return ret;
}
SrsDvr::SrsDvr(SrsSource* s)
{
source = s;
... ...
... ... @@ -237,6 +237,11 @@ public:
class SrsDvrApiPlan : public SrsDvrPlan
{
private:
// cache the metadata and sequence header, for new segment maybe opened.
SrsSharedPtrMessage* sh_audio;
SrsSharedPtrMessage* sh_video;
SrsSharedPtrMessage* metadata;
private:
std::string callback;
bool autostart;
bool started;
... ... @@ -247,12 +252,16 @@ public:
virtual int initialize(SrsSource* s, SrsRequest* r);
virtual int on_publish();
virtual void on_unpublish();
virtual int on_meta_data(SrsSharedPtrMessage* __metadata);
virtual int on_audio(SrsSharedPtrMessage* __audio);
virtual int on_video(SrsSharedPtrMessage* __video);
public:
virtual int set_path_tmpl(std::string path_tmpl);
virtual int set_callback(std::string value);
virtual int set_wait_keyframe(bool wait_keyframe);
virtual int start();
virtual int dumps(std::stringstream& ss);
virtual int stop();
protected:
virtual int on_reap_segment();
};
... ... @@ -270,14 +279,8 @@ public:
public:
virtual int on_publish();
virtual void on_unpublish();
/**
* @param audio, directly ptr, copy it if need to save it.
*/
virtual int on_audio(SrsSharedPtrMessage* audio);
/**
* @param video, directly ptr, copy it if need to save it.
*/
virtual int on_video(SrsSharedPtrMessage* video);
virtual int on_audio(SrsSharedPtrMessage* __audio);
virtual int on_video(SrsSharedPtrMessage* __video);
private:
virtual int update_duration(SrsSharedPtrMessage* msg);
};
... ... @@ -300,18 +303,9 @@ public:
virtual int initialize(SrsSource* source, SrsRequest* req);
virtual int on_publish();
virtual void on_unpublish();
/**
* when got metadata.
*/
virtual int on_meta_data(SrsSharedPtrMessage* __metadata);
/**
* @param audio, directly ptr, copy it if need to save it.
*/
virtual int on_audio(SrsSharedPtrMessage* audio);
/**
* @param video, directly ptr, copy it if need to save it.
*/
virtual int on_video(SrsSharedPtrMessage* video);
virtual int on_audio(SrsSharedPtrMessage* __audio);
virtual int on_video(SrsSharedPtrMessage* __video);
private:
virtual int update_duration(SrsSharedPtrMessage* msg);
};
... ... @@ -334,6 +328,7 @@ public:
public:
virtual int dumps(std::string vhost, std::stringstream& ss);
virtual int create(SrsJsonAny* json);
virtual int stop(std::string vhost);
};
/**
... ...
... ... @@ -501,8 +501,8 @@ int SrsGoApiDvrs::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
<< __SRS_JFIELD_ORG("dvrs", data.str())
<< __SRS_JOBJECT_END;
} else if (r->is_http_post()) {
char* body = (char*)r->body().c_str();
SrsJsonAny* json = SrsJsonAny::loads(body);
std::string body = r->body();
SrsJsonAny* json = SrsJsonAny::loads((char*)body.c_str());
int ret = ERROR_SUCCESS;
if (!json) {
ret = ERROR_HTTP_JSON_REQUIRED;
... ... @@ -513,6 +513,12 @@ int SrsGoApiDvrs::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
ss << __SRS_JOBJECT_START
<< __SRS_JFIELD_ERROR(ret)
<< __SRS_JOBJECT_END;
} else if (r->is_http_delete()) {
int ret = pool->stop(r->query_get("vhost"));
ss << __SRS_JOBJECT_START
<< __SRS_JFIELD_ERROR(ret)
<< __SRS_JOBJECT_END;
} else {
ss << __SRS_JOBJECT_START
<< __SRS_JFIELD_ERROR(ERROR_HTTP_DVR_REQUEST)
... ...
... ... @@ -436,4 +436,60 @@ int SrsHttpHooks::on_dvr(string url, int client_id, string ip, SrsRequest* req,
return ret;
}
int SrsHttpHooks::on_dvr_reap_segment(string url, int client_id, SrsRequest* req, string cwd, string file)
{
int ret = ERROR_SUCCESS;
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_error("http uri parse on_dvr_reap_segment url failed, ignored. "
"client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret);
return ret;
}
std::stringstream ss;
ss << __SRS_JOBJECT_START
<< __SRS_JFIELD_STR("action", "on_dvr_reap_segment") << __SRS_JFIELD_CONT
<< __SRS_JFIELD_ORG("client_id", client_id) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("vhost", req->vhost) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("app", req->app) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("stream", req->stream) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("cwd", cwd) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("file", file)
<< __SRS_JOBJECT_END;
std::string data = ss.str();
std::string res;
int status_code;
SrsHttpClient http;
if ((ret = http.post(&uri, data, status_code, res)) != ERROR_SUCCESS) {
srs_error("http post on_dvr_reap_segment uri failed, ignored. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return ret;
}
// ensure the http status is ok.
// https://github.com/winlinvip/simple-rtmp-server/issues/158
if (status_code != SRS_CONSTS_HTTP_OK) {
ret = ERROR_HTTP_STATUS_INVLIAD;
srs_error("http hook on_dvr_reap_segment status failed. "
"client_id=%d, code=%d, ret=%d", client_id, status_code, ret);
return ret;
}
if (res.empty() || res != SRS_HTTP_RESPONSE_OK) {
ret = ERROR_HTTP_DATA_INVLIAD;
srs_warn("http hook on_dvr_reap_segment validate failed, ignored. "
"client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret);
return ret;
}
srs_trace("http hook on_dvr_reap_segment success. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return ret;
}
#endif
... ...
... ... @@ -104,6 +104,15 @@ public:
* @param file the file path, can be relative or absolute path.
*/
static int on_dvr(std::string url, int client_id, std::string ip, SrsRequest* req, std::string cwd, std::string file);
/**
* when dvr reap segment, callback.
* @param client_id the id of client on server.
* @param url the api server url, to process the event.
* ignore if empty.
* @param cwd the current work directory, used to resolve the reltive file path.
* @param file the file path, can be relative or absolute path.
*/
static int on_dvr_reap_segment(std::string url, int client_id, SrsRequest* req, std::string cwd, std::string file);
};
#endif
... ...