winlin

call http api when dvr got keyframe

... ... @@ -96,6 +96,8 @@ vhost dvr.srs.com {
# dvr RTMP stream to file,
# start to record to file when encoder publish,
# reap flv according by specified dvr_plan.
# http callbacks:
# @see http callback on_dvr_keyframe on http_hooks section.
dvr {
# whether enabled dvr features
# default: off
... ... @@ -320,6 +322,19 @@ vhost hooks.callback.srs.com {
# support multiple api hooks, format:
# on_stop http://xxx/api0 http://xxx/api1 http://xxx/apiN
on_stop http://127.0.0.1:8085/api/v1/sessions http://localhost:8085/api/v1/sessions;
# when dvr got an keyframe, call the hook,
# the request in the POST data string is a object encode by json:
# {
# "action": "on_dvr_keyframe",
# "vhost": "video.test.com", "app": "live",
# "stream": "livestream"
# }
# if valid, the hook must return HTTP code 200(Stauts OK) and response
# an int value specifies the error code(0 corresponding to success):
# 0
# support multiple api hooks, format:
# on_stop http://xxx/api0 http://xxx/api1 http://xxx/apiN
on_dvr_keyframe http://127.0.0.1:8085/api/v1/dvrs http://localhost:8085/api/v1/dvrs;
}
}
... ...
... ... @@ -1560,6 +1560,27 @@ SrsConfDirective* SrsConfig::get_vhost_on_stop(string vhost)
return conf->get("on_stop");
}
SrsConfDirective* SrsConfig::get_vhost_on_dvr_keyframe(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}
conf = conf->get("http_hooks");
if (!conf) {
return NULL;
}
SrsConfDirective* enabled = conf->get("enabled");
if (!enabled || enabled->arg0() != "on") {
return NULL;
}
return conf->get("on_dvr_keyframe");
}
bool SrsConfig::get_vhost_enabled(string vhost)
{
SrsConfDirective* vhost_conf = get_vhost(vhost);
... ...
... ... @@ -173,6 +173,7 @@ public:
virtual SrsConfDirective* get_vhost_on_unpublish(std::string vhost);
virtual SrsConfDirective* get_vhost_on_play(std::string vhost);
virtual SrsConfDirective* get_vhost_on_stop(std::string vhost);
virtual SrsConfDirective* get_vhost_on_dvr_keyframe(std::string vhost);
virtual bool get_gop_cache(std::string vhost);
virtual bool get_atc(std::string vhost);
virtual double get_queue_length(std::string vhost);
... ...
... ... @@ -37,6 +37,8 @@ using namespace std;
#include <srs_core_autofree.hpp>
#include <srs_kernel_stream.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_codec.hpp>
SrsFileStream::SrsFileStream()
{
... ... @@ -364,34 +366,6 @@ int SrsDvrPlan::on_publish()
return ret;
}
int SrsDvrPlan::flv_open(string stream, string path)
{
int ret = ERROR_SUCCESS;
if ((ret = fs->open(path)) != ERROR_SUCCESS) {
srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) {
srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
if ((ret = enc->write_header()) != ERROR_SUCCESS) {
srs_error("write flv header for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str());
return ret;
}
int SrsDvrPlan::flv_close()
{
return fs->close();
}
int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
... ... @@ -459,6 +433,16 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video)
return ret;
}
#ifdef SRS_AUTO_HTTP_CALLBACK
bool is_key_frame = SrsCodec::video_is_keyframe((int8_t*)payload, size);
srs_verbose("dvr video is key: %d", is_key_frame);
if (is_key_frame) {
if ((ret = on_dvr_keyframe()) != ERROR_SUCCESS) {
return ret;
}
}
#endif
if ((ret = on_video_msg(video)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -466,6 +450,29 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video)
return ret;
}
int SrsDvrPlan::flv_open(string stream, string path)
{
int ret = ERROR_SUCCESS;
if ((ret = fs->open(path)) != ERROR_SUCCESS) {
srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) {
srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
if ((ret = enc->write_header()) != ERROR_SUCCESS) {
srs_error("write flv header for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str());
return ret;
}
int SrsDvrPlan::on_audio_msg(SrsSharedPtrMessage* /*audio*/)
{
int ret = ERROR_SUCCESS;
... ... @@ -478,6 +485,32 @@ int SrsDvrPlan::on_video_msg(SrsSharedPtrMessage* /*video*/)
return ret;
}
int SrsDvrPlan::flv_close()
{
return fs->close();
}
int SrsDvrPlan::on_dvr_keyframe()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_CALLBACK
// HTTP: on_dvr_keyframe
SrsConfDirective* on_dvr_keyframe = _srs_config->get_vhost_on_dvr_keyframe(_req->vhost);
if (!on_dvr_keyframe) {
srs_info("ignore the empty http callback: on_dvr_keyframe");
return ret;
}
for (int i = 0; i < (int)on_dvr_keyframe->args.size(); i++) {
std::string url = on_dvr_keyframe->args.at(i);
SrsHttpHooks::on_dvr_keyframe(url, _req);
}
#endif
return ret;
}
SrsDvrPlan* SrsDvrPlan::create_plan(string vhost)
{
std::string plan = _srs_config->get_dvr_plan(vhost);
... ...
... ... @@ -146,6 +146,8 @@ protected:
virtual int on_audio_msg(SrsSharedPtrMessage* audio);
virtual int on_video_msg(SrsSharedPtrMessage* video);
virtual int flv_close();
private:
virtual int on_dvr_keyframe();
public:
static SrsDvrPlan* create_plan(std::string vhost);
};
... ...
... ... @@ -55,7 +55,7 @@ SrsHttpClient::~SrsHttpClient()
srs_freep(parser);
}
int SrsHttpClient::post(SrsHttpUri* uri, std::string req, std::string& res)
int SrsHttpClient::post(SrsHttpUri* uri, string req, string& res)
{
res = "";
... ... @@ -183,7 +183,7 @@ SrsHttpHooks::~SrsHttpHooks()
{
}
int SrsHttpHooks::on_connect(std::string url, int client_id, std::string ip, SrsRequest* req)
int SrsHttpHooks::on_connect(string url, int client_id, string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
... ... @@ -236,7 +236,7 @@ int SrsHttpHooks::on_connect(std::string url, int client_id, std::string ip, Srs
return ret;
}
void SrsHttpHooks::on_close(std::string url, int client_id, std::string ip, SrsRequest* req)
void SrsHttpHooks::on_close(string url, int client_id, string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
... ... @@ -289,7 +289,7 @@ void SrsHttpHooks::on_close(std::string url, int client_id, std::string ip, SrsR
return;
}
int SrsHttpHooks::on_publish(std::string url, int client_id, std::string ip, SrsRequest* req)
int SrsHttpHooks::on_publish(string url, int client_id, string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
... ... @@ -343,7 +343,7 @@ int SrsHttpHooks::on_publish(std::string url, int client_id, std::string ip, Srs
return ret;
}
void SrsHttpHooks::on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req)
void SrsHttpHooks::on_unpublish(string url, int client_id, string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
... ... @@ -397,7 +397,7 @@ void SrsHttpHooks::on_unpublish(std::string url, int client_id, std::string ip,
return;
}
int SrsHttpHooks::on_play(std::string url, int client_id, std::string ip, SrsRequest* req)
int SrsHttpHooks::on_play(string url, int client_id, string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
... ... @@ -451,7 +451,7 @@ int SrsHttpHooks::on_play(std::string url, int client_id, std::string ip, SrsReq
return ret;
}
void SrsHttpHooks::on_stop(std::string url, int client_id, std::string ip, SrsRequest* req)
void SrsHttpHooks::on_stop(string url, int client_id, string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
... ... @@ -505,4 +505,54 @@ void SrsHttpHooks::on_stop(std::string url, int client_id, std::string ip, SrsRe
return;
}
void SrsHttpHooks::on_dvr_keyframe(string url, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_warn("http uri parse on_dvr_keyframe url failed, ignored. "
"url=%s, ret=%d", url.c_str(), ret);
return;
}
/**
{
"action": "on_dvr_keyframe",
"vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
*/
std::stringstream ss;
ss << JOBJECT_START
<< JFIELD_STR("action", "on_dvr_keyframe") << JFIELD_CONT
<< JFIELD_STR("vhost", req->vhost) << JFIELD_CONT
<< JFIELD_STR("app", req->app) << JFIELD_CONT
<< JFIELD_STR("stream", req->stream)
<< JOBJECT_END;
std::string data = ss.str();
std::string res;
SrsHttpClient http;
if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) {
srs_warn("http post on_dvr_keyframe uri failed, ignored. "
"url=%s, request=%s, response=%s, ret=%d",
url.c_str(), data.c_str(), res.c_str(), ret);
return;
}
if (res.empty() || res != SRS_HTTP_RESPONSE_OK) {
ret = ERROR_HTTP_DATA_INVLIAD;
srs_warn("http hook on_dvr_keyframe validate failed, ignored. "
"res=%s, ret=%d", res.c_str(), ret);
return;
}
srs_trace("http hook on_dvr_keyframe success. "
"url=%s, request=%s, response=%s, ret=%d",
url.c_str(), data.c_str(), res.c_str(), ret);
return;
}
#endif
... ...
... ... @@ -71,8 +71,9 @@ private:
*/
class SrsHttpHooks
{
public:
private:
SrsHttpHooks();
public:
virtual ~SrsHttpHooks();
public:
/**
... ... @@ -82,14 +83,14 @@ public:
* ignore if empty.
* @return valid failed or connect to the url failed.
*/
virtual int on_connect(std::string url, int client_id, std::string ip, SrsRequest* req);
static int on_connect(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_close hook, when client disconnect to srs, where client is valid by on_connect.
* @param client_id the id of client on server.
* @param url the api server url, to process the event.
* ignore if empty.
*/
virtual void on_close(std::string url, int client_id, std::string ip, SrsRequest* req);
static void on_close(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_publish hook, when client(encoder) start to publish stream
* @param client_id the id of client on server.
... ... @@ -97,14 +98,14 @@ public:
* ignore if empty.
* @return valid failed or connect to the url failed.
*/
virtual int on_publish(std::string url, int client_id, std::string ip, SrsRequest* req);
static int on_publish(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_unpublish hook, when client(encoder) stop publish stream.
* @param client_id the id of client on server.
* @param url the api server url, to process the event.
* ignore if empty.
*/
virtual void on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req);
static void on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_play hook, when client start to play stream.
* @param client_id the id of client on server.
... ... @@ -112,14 +113,21 @@ public:
* ignore if empty.
* @return valid failed or connect to the url failed.
*/
virtual int on_play(std::string url, int client_id, std::string ip, SrsRequest* req);
static int on_play(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_stop hook, when client stop to play the stream.
* @param client_id the id of client on server.
* @param url the api server url, to process the event.
* ignore if empty.
*/
virtual void on_stop(std::string url, int client_id, std::string ip, SrsRequest* req);
static void on_stop(std::string url, int client_id, std::string ip, SrsRequest* req);
public:
/**
* on_dvr_keyframe hook, when dvr get keyframe.
* @param url the api server url, to process the event.
* ignore if empty.
*/
static void on_dvr_keyframe(std::string url, SrsRequest* req);
};
#endif
... ...
... ... @@ -68,9 +68,6 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd)
skt = new SrsSocket(client_stfd);
rtmp = new SrsRtmpServer(skt);
refer = new SrsRefer();
#ifdef SRS_AUTO_HTTP_CALLBACK
http_hooks = new SrsHttpHooks();
#endif
bandwidth = new SrsBandwidth();
duration = 0;
... ... @@ -86,9 +83,6 @@ SrsRtmpConn::~SrsRtmpConn()
srs_freep(rtmp);
srs_freep(skt);
srs_freep(refer);
#ifdef SRS_AUTO_HTTP_CALLBACK
srs_freep(http_hooks);
#endif
srs_freep(bandwidth);
}
... ... @@ -730,7 +724,7 @@ int SrsRtmpConn::on_connect()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_CALLBACK
#ifdef SRS_AUTO_HTTP_CALLBACK
// HTTP: on_connect
SrsConfDirective* on_connect = _srs_config->get_vhost_on_connect(req->vhost);
if (!on_connect) {
... ... @@ -740,7 +734,7 @@ int SrsRtmpConn::on_connect()
for (int i = 0; i < (int)on_connect->args.size(); i++) {
std::string url = on_connect->args.at(i);
if ((ret = http_hooks->on_connect(url, connection_id, ip, req)) != ERROR_SUCCESS) {
if ((ret = SrsHttpHooks::on_connect(url, connection_id, ip, req)) != ERROR_SUCCESS) {
srs_error("hook client on_connect failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
... ... @@ -763,7 +757,7 @@ void SrsRtmpConn::on_close()
for (int i = 0; i < (int)on_close->args.size(); i++) {
std::string url = on_close->args.at(i);
http_hooks->on_close(url, connection_id, ip, req);
SrsHttpHooks::on_close(url, connection_id, ip, req);
}
#endif
}
... ... @@ -782,7 +776,7 @@ int SrsRtmpConn::on_publish()
for (int i = 0; i < (int)on_publish->args.size(); i++) {
std::string url = on_publish->args.at(i);
if ((ret = http_hooks->on_publish(url, connection_id, ip, req)) != ERROR_SUCCESS) {
if ((ret = SrsHttpHooks::on_publish(url, connection_id, ip, req)) != ERROR_SUCCESS) {
srs_error("hook client on_publish failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
... ... @@ -805,7 +799,7 @@ void SrsRtmpConn::on_unpublish()
for (int i = 0; i < (int)on_unpublish->args.size(); i++) {
std::string url = on_unpublish->args.at(i);
http_hooks->on_unpublish(url, connection_id, ip, req);
SrsHttpHooks::on_unpublish(url, connection_id, ip, req);
}
#endif
}
... ... @@ -824,7 +818,7 @@ int SrsRtmpConn::on_play()
for (int i = 0; i < (int)on_play->args.size(); i++) {
std::string url = on_play->args.at(i);
if ((ret = http_hooks->on_play(url, connection_id, ip, req)) != ERROR_SUCCESS) {
if ((ret = SrsHttpHooks::on_play(url, connection_id, ip, req)) != ERROR_SUCCESS) {
srs_error("hook client on_play failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
... ... @@ -847,7 +841,7 @@ void SrsRtmpConn::on_stop()
for (int i = 0; i < (int)on_stop->args.size(); i++) {
std::string url = on_stop->args.at(i);
http_hooks->on_stop(url, connection_id, ip, req);
SrsHttpHooks::on_stop(url, connection_id, ip, req);
}
#endif
... ...
... ... @@ -58,9 +58,6 @@ private:
SrsSocket* skt;
SrsRtmpServer* rtmp;
SrsRefer* refer;
#ifdef SRS_AUTO_HTTP_CALLBACK
SrsHttpHooks* http_hooks;
#endif
SrsBandwidth* bandwidth;
// elapse duration in ms
// for live play duration, for instance, rtmpdump to record.
... ...