winlin

for #179, refine dvr code to more simple.

... ... @@ -281,6 +281,12 @@ vhost dvr.srs.com {
# whether enabled dvr features
# default: off
enabled on;
# the dvr plan. canbe:
# session reap flv when session end(unpublish).
# segment reap flv when flv duration exceed the specified dvr_duration.
# api reap flv when api required.
# default: session
dvr_plan session;
# the dvr output path.
# we supports some variables to generate the filename.
# [vhost], the vhost of stream.
... ... @@ -314,22 +320,28 @@ vhost dvr.srs.com {
# dvr_path /data/ossrs.net/live/2015/01/livestream-03-10.57.30.776.flv;
# @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DVR#custom-path
# @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_EN_DVR#custom-path
# segment,session apply it.
# api apply before api specified the path.
# default: ./objs/nginx/html
dvr_path ./objs/nginx/html;
# the dvr plan. canbe:
# session reap flv when session end(unpublish).
# segment reap flv when flv duration exceed the specified dvr_duration.
# default: session
dvr_plan session;
# the param for plan(segment), in seconds.
# the duration for dvr file, reap if exeed, in seconds.
# segment apply it.
# session,api ignore.
# default: 30
dvr_duration 30;
# the param for plan(segment),
# whether wait keyframe to reap segment,
# if off, reap segment when duration exceed the dvr_duration,
# if on, reap segment when duration exceed and got keyframe.
# segment apply it.
# session,api ignore.
# default: on
dvr_wait_keyframe on;
# whether dvr auto start when publish.
# if off, dvr wait for api to start it.
# api apply it.
# segment,session ignore.
# default: on
dvr_autostart on;
# about the stream monotonically increasing:
# 1. video timestamp is monotonically increasing,
# 2. audio timestamp is monotonically increasing,
... ... @@ -340,10 +352,11 @@ vhost dvr.srs.com {
# 1. full, to ensure stream start at zero, and ensure stream monotonically increasing.
# 2. zero, only ensure sttream start at zero, ignore timestamp jitter.
# 3. off, disable the time jitter algorithm, like atc.
# apply for all dvr plan.
# default: full
time_jitter full;
# on_dvr
# on_dvr, never config in here, should config in http_hooks.
# for the dvr http callback, @see http_hooks.on_dvr of vhost hooks.callback.srs.com
# @read https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DVR#http-callback
# @read https://github.com/winlinvip/simple-rtmp-server/wiki/v2_EN_DVR#http-callback
... ...
... ... @@ -1418,6 +1418,7 @@ int SrsConfig::check_config()
string m = conf->at(j)->name.c_str();
if (m != "enabled" && m != "dvr_path" && m != "dvr_plan"
&& m != "dvr_duration" && m != "dvr_wait_keyframe" && m != "time_jitter"
&& m != "dvr_autostart"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost dvr directive %s, ret=%d", m.c_str(), ret);
... ... @@ -3377,6 +3378,23 @@ bool SrsConfig::get_dvr_wait_keyframe(string vhost)
return false;
}
bool SrsConfig::get_dvr_autostart(string vhost)
{
SrsConfDirective* dvr = get_dvr(vhost);
if (!dvr) {
return true;
}
SrsConfDirective* conf = dvr->get("dvr_autostart");
if (!conf || conf->arg0() != "off") {
return true;
}
return false;
}
int SrsConfig::get_dvr_time_jitter(string vhost)
{
SrsConfDirective* dvr = get_dvr(vhost);
... ...
... ... @@ -60,6 +60,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_DVR_PATH "./objs/nginx/html"
#define SRS_CONF_DEFAULT_DVR_PLAN_SESSION "session"
#define SRS_CONF_DEFAULT_DVR_PLAN_SEGMENT "segment"
#define SRS_CONF_DEFAULT_DVR_PLAN_API "api"
#define SRS_CONF_DEFAULT_DVR_PLAN SRS_CONF_DEFAULT_DVR_PLAN_SESSION
#define SRS_CONF_DEFAULT_DVR_DURATION 30
#define SRS_CONF_DEFAULT_TIME_JITTER "full"
... ... @@ -921,14 +922,18 @@ public:
*/
virtual std::string get_dvr_plan(std::string vhost);
/**
* get the duration of dvr flv, for segment plan.
* get the duration of dvr flv.
*/
virtual int get_dvr_duration(std::string vhost);
/**
* whether wait keyframe to reap segment, for segment plan.
* whether wait keyframe to reap segment.
*/
virtual bool get_dvr_wait_keyframe(std::string vhost);
/**
* whether autostart for dvr. wait api to start dvr if false.
*/
virtual bool get_dvr_autostart(std::string vhost);
/**
* get the time_jitter algorithm for dvr.
*/
virtual int get_dvr_time_jitter(std::string vhost);
... ...
... ... @@ -39,8 +39,17 @@ using namespace std;
#include <srs_kernel_flv.hpp>
#include <srs_kernel_file.hpp>
SrsFlvSegment::SrsFlvSegment()
SrsFlvSegment::SrsFlvSegment(SrsDvrPlan* p)
{
req = NULL;
source = NULL;
jitter = NULL;
plan = p;
fs = new SrsFileWriter();
enc = new SrsFlvEncoder();
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
path = "";
has_keyframe = false;
duration = 0;
... ... @@ -48,95 +57,238 @@ SrsFlvSegment::SrsFlvSegment()
stream_starttime = 0;
stream_previous_pkt_time = -1;
stream_duration = 0;
_srs_config->subscribe(this);
}
SrsFlvSegment::~SrsFlvSegment()
{
_srs_config->unsubscribe(this);
srs_freep(jitter);
srs_freep(fs);
srs_freep(enc);
}
void SrsFlvSegment::reset()
int SrsFlvSegment::initialize(SrsSource* s, SrsRequest* r)
{
has_keyframe = false;
starttime = -1;
duration = 0;
int ret = ERROR_SUCCESS;
source = s;
req = r;
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost);
return ret;
}
SrsDvrPlan::SrsDvrPlan()
bool SrsFlvSegment::is_overflow(int64_t max_duration)
{
_source = NULL;
_req = NULL;
jitter = NULL;
dvr_enabled = false;
fs = new SrsFileWriter();
enc = new SrsFlvEncoder();
segment = new SrsFlvSegment();
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
_srs_config->subscribe(this);
return duration >= max_duration;
}
SrsDvrPlan::~SrsDvrPlan()
int SrsFlvSegment::open()
{
_srs_config->unsubscribe(this);
int ret = ERROR_SUCCESS;
srs_freep(jitter);
srs_freep(fs);
srs_freep(enc);
srs_freep(segment);
// ignore when already open.
if (fs->is_open()) {
return ret;
}
path = generate_path();
bool fresh_flv_file = !srs_path_exists(path);
// create dir first.
std::string dir = path.substr(0, path.rfind("/"));
if ((ret = srs_create_dir_recursively(dir)) != ERROR_SUCCESS) {
srs_error("create dir=%s failed. ret=%d", dir.c_str(), ret);
return ret;
}
srs_info("create dir=%s ok", dir.c_str());
// create jitter.
if ((ret = create_jitter(!fresh_flv_file)) != ERROR_SUCCESS) {
srs_error("create jitter failed, path=%s, fresh=%d. ret=%d", path.c_str(), fresh_flv_file, ret);
return ret;
}
// generate the tmp flv path.
if (!fresh_flv_file) {
// when path exists, always append to it.
// so we must use the target flv path as output flv.
tmp_flv_file = path;
} else {
// when path not exists, dvr to tmp file.
tmp_flv_file = path + ".tmp";
}
// open file writer, in append or create mode.
if (!fresh_flv_file) {
if ((ret = fs->open_append(tmp_flv_file)) != ERROR_SUCCESS) {
srs_error("append file stream for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
srs_trace("dvr: always append to when exists, file=%s.", path.c_str());
} else {
if ((ret = fs->open(tmp_flv_file)) != ERROR_SUCCESS) {
srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
}
// when exists, donot write flv header.
if (fresh_flv_file) {
// initialize the encoder.
if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) {
srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
// write the flv header to writer.
if ((ret = enc->write_header()) != ERROR_SUCCESS) {
srs_error("write flv header failed. ret=%d", ret);
return ret;
}
}
srs_trace("dvr stream %s to file %s", req->stream.c_str(), path.c_str());
return ret;
}
int SrsDvrPlan::initialize(SrsSource* source, SrsRequest* req)
int SrsFlvSegment::close()
{
int ret = ERROR_SUCCESS;
_source = source;
_req = req;
// ignore when already closed.
if (!fs->is_open()) {
return ret;
}
fs->close();
// when tmp flv file exists, reap it.
if (tmp_flv_file != path) {
if (rename(tmp_flv_file.c_str(), path.c_str()) < 0) {
ret = ERROR_SYSTEM_FILE_RENAME;
srs_error("rename flv file failed, %s => %s. ret=%d",
tmp_flv_file.c_str(), path.c_str(), ret);
return ret;
}
}
#ifdef SRS_AUTO_HTTP_CALLBACK
if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
// HTTP: on_dvr
SrsConfDirective* on_dvr = _srs_config->get_vhost_on_dvr(req->vhost);
if (!on_dvr) {
srs_info("ignore the empty http callback: on_dvr");
return ret;
}
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(_req->vhost);
int connection_id = _srs_context->get_id();
std::string ip = req->ip;
std::string cwd = _srs_config->cwd();
std::string file = path;
for (int i = 0; i < (int)on_dvr->args.size(); i++) {
std::string url = on_dvr->args.at(i);
if ((ret = SrsHttpHooks::on_dvr(url, connection_id, ip, req, cwd, file)) != ERROR_SUCCESS) {
srs_error("hook client on_dvr failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
}
}
#endif
return ret;
}
int SrsDvrPlan::on_publish()
int SrsFlvSegment::write_metadata(SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
// support multiple publish.
if (dvr_enabled) {
int size = 0;
char* payload = NULL;
if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
return ret;
}
SrsAutoFree(char, payload);
SrsRequest* req = _req;
if (!_srs_config->get_dvr_enabled(req->vhost)) {
if ((ret = enc->write_metadata(18, payload, size)) != ERROR_SUCCESS) {
return ret;
}
// jitter when publish, ensure whole stream start from 0.
srs_freep(jitter);
jitter = new SrsRtmpJitter();
return ret;
}
// always update time cache.
srs_update_system_time_ms();
int SrsFlvSegment::write_audio(SrsSharedPtrMessage* __audio)
{
int ret = ERROR_SUCCESS;
// when republish, stream starting.
segment->stream_previous_pkt_time = -1;
segment->stream_starttime = srs_get_system_time_ms();
segment->stream_duration = 0;
SrsSharedPtrMessage* audio = __audio->copy();
SrsAutoFree(SrsSharedPtrMessage, audio);
if ((ret = open_new_segment()) != ERROR_SUCCESS) {
if ((jitter->correct(audio, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) {
return ret;
}
char* payload = audio->payload;
int size = audio->size;
int64_t timestamp = plan->filter_timestamp(audio->timestamp);
if ((ret = enc->write_audio(timestamp, payload, size)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = on_update_duration(audio)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrPlan::open_new_segment()
int SrsFlvSegment::write_video(SrsSharedPtrMessage* __video)
{
int ret = ERROR_SUCCESS;
SrsRequest* req = _req;
SrsSharedPtrMessage* video = __video->copy();
SrsAutoFree(SrsSharedPtrMessage, video);
char* payload = video->payload;
int size = video->size;
#ifdef SRS_AUTO_HTTP_CALLBACK
bool is_key_frame = SrsFlvCodec::video_is_h264(payload, size)
&& SrsFlvCodec::video_is_keyframe(payload, size)
&& !SrsFlvCodec::video_is_sequence_header(payload, size);
if (is_key_frame) {
has_keyframe = true;
if ((ret = plan->on_video_keyframe()) != ERROR_SUCCESS) {
return ret;
}
}
srs_verbose("dvr video is key: %d", is_key_frame);
#endif
if ((jitter->correct(video, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) {
return ret;
}
// update segment duration, session plan just update the duration,
// the segment plan will reap segment if exceed, this video will write to next segment.
if ((ret = on_update_duration(video)) != ERROR_SUCCESS) {
return ret;
}
int32_t timestamp = plan->filter_timestamp(video->timestamp);
if ((ret = enc->write_video(timestamp, payload, size)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
string SrsFlvSegment::generate_path()
{
// the path in config, for example,
// /data/[vhost]/[app]/[stream]/[2006]/[01]/[02]/[15].[04].[05].[999].flv
std::string path_config = _srs_config->get_dvr_path(req->vhost);
... ... @@ -147,26 +299,26 @@ int SrsDvrPlan::open_new_segment()
}
// the flv file path
std::string path = path_config;
std::string flv_path = path_config;
// variable [vhost]
path = srs_string_replace(path, "[vhost]", req->vhost);
flv_path = srs_string_replace(flv_path, "[vhost]", req->vhost);
// variable [app]
path = srs_string_replace(path, "[app]", req->app);
flv_path = srs_string_replace(flv_path, "[app]", req->app);
// variable [stream]
path = srs_string_replace(path, "[stream]", req->stream);
flv_path = srs_string_replace(flv_path, "[stream]", req->stream);
// date and time substitude
// clock time
timeval tv;
if (gettimeofday(&tv, NULL) == -1) {
return ERROR_SYSTEM_TIME;
return flv_path;
}
// to calendar time
struct tm* tm;
if ((tm = localtime(&tv.tv_sec)) == NULL) {
return ERROR_SYSTEM_TIME;
return flv_path;
}
// the buffer to format the date and time.
... ... @@ -175,316 +327,212 @@ int SrsDvrPlan::open_new_segment()
// [2006], replace with current year.
if (true) {
snprintf(buf, sizeof(buf), "%d", 1900 + tm->tm_year);
path = srs_string_replace(path, "[2006]", buf);
flv_path = srs_string_replace(flv_path, "[2006]", buf);
}
// [2006], replace with current year.
if (true) {
snprintf(buf, sizeof(buf), "%d", 1900 + tm->tm_year);
path = srs_string_replace(path, "[2006]", buf);
flv_path = srs_string_replace(flv_path, "[2006]", buf);
}
// [01], replace this const to current month.
if (true) {
snprintf(buf, sizeof(buf), "%d", 1 + tm->tm_mon);
path = srs_string_replace(path, "[01]", buf);
flv_path = srs_string_replace(flv_path, "[01]", buf);
}
// [02], replace this const to current date.
if (true) {
snprintf(buf, sizeof(buf), "%d", tm->tm_mday);
path = srs_string_replace(path, "[02]", buf);
flv_path = srs_string_replace(flv_path, "[02]", buf);
}
// [15], replace this const to current hour.
if (true) {
snprintf(buf, sizeof(buf), "%d", tm->tm_hour);
path = srs_string_replace(path, "[15]", buf);
flv_path = srs_string_replace(flv_path, "[15]", buf);
}
// [04], repleace this const to current minute.
if (true) {
snprintf(buf, sizeof(buf), "%d", tm->tm_min);
path = srs_string_replace(path, "[04]", buf);
flv_path = srs_string_replace(flv_path, "[04]", buf);
}
// [05], repleace this const to current second.
if (true) {
snprintf(buf, sizeof(buf), "%d", tm->tm_sec);
path = srs_string_replace(path, "[05]", buf);
flv_path = srs_string_replace(flv_path, "[05]", buf);
}
// [999], repleace this const to current millisecond.
if (true) {
snprintf(buf, sizeof(buf), "%03d", (int)(tv.tv_usec / 1000));
path = srs_string_replace(path, "[999]", buf);
flv_path = srs_string_replace(flv_path, "[999]", buf);
}
// [timestamp],replace this const to current UNIX timestamp in ms.
if (true) {
int64_t now_us = ((int64_t)tv.tv_sec) * 1000 * 1000 + (int64_t)tv.tv_usec;
snprintf(buf, sizeof(buf), "%"PRId64, now_us / 1000);
path = srs_string_replace(path, "[timestamp]", buf);
}
// create dir first.
std::string dir = path.substr(0, path.rfind("/"));
if ((ret = srs_create_dir_recursively(dir)) != ERROR_SUCCESS) {
srs_error("create dir=%s failed. ret=%d", dir.c_str(), ret);
return ret;
flv_path = srs_string_replace(flv_path, "[timestamp]", buf);
}
srs_info("create dir=%s ok", dir.c_str());
if ((ret = flv_open(req->get_stream_url(), path)) != ERROR_SUCCESS) {
return ret;
}
dvr_enabled = true;
return ret;
return flv_path;
}
int SrsDvrPlan::on_dvr_request_sh()
int SrsFlvSegment::create_jitter(bool loads_from_flv)
{
int ret = ERROR_SUCCESS;
// the dvr is enabled, notice the source to push the data.
if ((ret = _source->on_dvr_request_sh()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrPlan::on_video_keyframe()
{
int ret = ERROR_SUCCESS;
return ret;
}
// when path exists, use exists jitter.
if (!loads_from_flv) {
// jitter when publish, ensure whole stream start from 0.
srs_freep(jitter);
jitter = new SrsRtmpJitter();
int64_t SrsDvrPlan::filter_timestamp(int64_t timestamp)
{
return timestamp;
}
// fresh stream starting.
starttime = -1;
stream_previous_pkt_time = -1;
stream_starttime = srs_update_system_time_ms();
stream_duration = 0;
int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
// fresh segment starting.
has_keyframe = false;
duration = 0;
if (!dvr_enabled) {
return ret;
}
int size = 0;
char* payload = NULL;
if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
// when jitter ok, do nothing.
if (jitter) {
return ret;
}
SrsAutoFree(char, payload);
if ((ret = enc->write_metadata(18, payload, size)) != ERROR_SUCCESS) {
return ret;
}
// always ensure the jitter crote.
// for the first time, initialize jitter from exists file.
jitter = new SrsRtmpJitter();
// TODO: FIXME: implements it.
return ret;
}
int SrsDvrPlan::on_audio(SrsSharedPtrMessage* __audio)
int SrsFlvSegment::on_update_duration(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
if (!dvr_enabled) {
return ret;
}
SrsSharedPtrMessage* audio = __audio->copy();
SrsAutoFree(SrsSharedPtrMessage, audio);
// we must assumpt that the stream timestamp is monotonically increase,
// that is, always use time jitter to correct the timestamp.
// except the time jitter is disabled in config.
if ((jitter->correct(audio, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) {
return ret;
// set the segment starttime at first time
if (starttime < 0) {
starttime = msg->timestamp;
}
char* payload = audio->payload;
int size = audio->size;
int64_t timestamp = filter_timestamp(audio->timestamp);
if ((ret = enc->write_audio(timestamp, payload, size)) != ERROR_SUCCESS) {
return ret;
// no previous packet or timestamp overflow.
if (stream_previous_pkt_time < 0 || stream_previous_pkt_time > msg->timestamp) {
stream_previous_pkt_time = msg->timestamp;
}
if ((ret = update_duration(audio)) != ERROR_SUCCESS) {
return ret;
}
// collect segment and stream duration, timestamp overflow is ok.
duration += msg->timestamp - stream_previous_pkt_time;
stream_duration += msg->timestamp - stream_previous_pkt_time;
// update previous packet time
stream_previous_pkt_time = msg->timestamp;
return ret;
}
int SrsDvrPlan::on_video(SrsSharedPtrMessage* __video)
int SrsFlvSegment::on_reload_vhost_dvr(std::string /*vhost*/)
{
int ret = ERROR_SUCCESS;
if (!dvr_enabled) {
return ret;
}
SrsSharedPtrMessage* video = __video->copy();
SrsAutoFree(SrsSharedPtrMessage, video);
char* payload = video->payload;
int size = video->size;
#ifdef SRS_AUTO_HTTP_CALLBACK
bool is_key_frame = SrsFlvCodec::video_is_h264(payload, size)
&& SrsFlvCodec::video_is_keyframe(payload, size)
&& !SrsFlvCodec::video_is_sequence_header(payload, size);
if (is_key_frame) {
segment->has_keyframe = true;
if ((ret = on_video_keyframe()) != ERROR_SUCCESS) {
return ret;
}
}
srs_verbose("dvr video is key: %d", is_key_frame);
#endif
if ((jitter->correct(video, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) {
return ret;
}
// update segment duration, session plan just update the duration,
// the segment plan will reap segment if exceed, this video will write to next segment.
if ((ret = update_duration(video)) != ERROR_SUCCESS) {
return ret;
}
int32_t timestamp = filter_timestamp(video->timestamp);
if ((ret = enc->write_video(timestamp, payload, size)) != ERROR_SUCCESS) {
return ret;
}
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost);
return ret;
}
int SrsDvrPlan::on_reload_vhost_dvr(std::string /*vhost*/)
SrsDvrPlan::SrsDvrPlan()
{
int ret = ERROR_SUCCESS;
source = NULL;
req = NULL;
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(_req->vhost);
dvr_enabled = false;
segment = new SrsFlvSegment(this);
}
return ret;
SrsDvrPlan::~SrsDvrPlan()
{
srs_freep(segment);
}
int SrsDvrPlan::flv_open(string stream, string path)
int SrsDvrPlan::initialize(SrsSource* s, SrsRequest* r)
{
int ret = ERROR_SUCCESS;
segment->reset();
if (srs_path_exists(path)) {
// when path exists, always append to it.
// so we must use the target flv path as output flv.
tmp_flv_file = path;
} else {
// when path not exists, dvr to tmp file.
tmp_flv_file = path + ".tmp";
}
source = s;
req = r;
if (srs_path_exists(path)) {
if ((ret = fs->open_append(tmp_flv_file)) != ERROR_SUCCESS) {
srs_error("append file stream for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
srs_trace("dvr: always append to when exists, file=%s.", path.c_str());
} else {
if ((ret = fs->open(tmp_flv_file)) != ERROR_SUCCESS) {
srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret);
if ((ret = segment->initialize(s, r)) != ERROR_SUCCESS) {
return ret;
}
}
if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) {
srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
}
// when exists, donot write flv header.
if (tmp_flv_file != path) {
if ((ret = write_flv_header()) != ERROR_SUCCESS) {
int SrsDvrPlan::on_dvr_request_sh()
{
int ret = ERROR_SUCCESS;
// the dvr is enabled, notice the source to push the data.
if ((ret = source->on_dvr_request_sh()) != ERROR_SUCCESS) {
return ret;
}
}
segment->path = path;
srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str());
return ret;
}
int SrsDvrPlan::flv_close()
int SrsDvrPlan::on_video_keyframe()
{
int ret = ERROR_SUCCESS;
fs->close();
return ERROR_SUCCESS;
}
// when tmp flv file exists, reap it.
if (tmp_flv_file != segment->path) {
if (rename(tmp_flv_file.c_str(), segment->path.c_str()) < 0) {
ret = ERROR_SYSTEM_FILE_RENAME;
srs_error("rename flv file failed, %s => %s. ret=%d",
tmp_flv_file.c_str(), segment->path.c_str(), ret);
return ret;
}
}
int64_t SrsDvrPlan::filter_timestamp(int64_t timestamp)
{
return timestamp;
}
#ifdef SRS_AUTO_HTTP_CALLBACK
SrsRequest* req = _req;
if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
// HTTP: on_dvr
SrsConfDirective* on_dvr = _srs_config->get_vhost_on_dvr(req->vhost);
if (!on_dvr) {
srs_info("ignore the empty http callback: on_dvr");
return ret;
}
int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
int connection_id = _srs_context->get_id();
std::string ip = req->ip;
std::string cwd = _srs_config->cwd();
std::string file = segment->path;
for (int i = 0; i < (int)on_dvr->args.size(); i++) {
std::string url = on_dvr->args.at(i);
if ((ret = SrsHttpHooks::on_dvr(url, connection_id, ip, req, cwd, file)) != ERROR_SUCCESS) {
srs_error("hook client on_dvr failed. url=%s, ret=%d", url.c_str(), ret);
if (!dvr_enabled) {
return ret;
}
}
}
#endif
return ret;
return segment->write_metadata(metadata);
}
int SrsDvrPlan::update_duration(SrsSharedPtrMessage* msg)
int SrsDvrPlan::on_audio(SrsSharedPtrMessage* __audio)
{
int ret = ERROR_SUCCESS;
// we must assumpt that the stream timestamp is monotonically increase,
// that is, always use time jitter to correct the timestamp.
// set the segment starttime at first time
if (segment->starttime < 0) {
segment->starttime = msg->timestamp;
if (!dvr_enabled) {
return ret;
}
// no previous packet or timestamp overflow.
if (segment->stream_previous_pkt_time < 0 || segment->stream_previous_pkt_time > msg->timestamp) {
segment->stream_previous_pkt_time = msg->timestamp;
if ((ret = segment->write_audio(__audio)) != ERROR_SUCCESS) {
return ret;
}
// collect segment and stream duration, timestamp overflow is ok.
segment->duration += msg->timestamp - segment->stream_previous_pkt_time;
segment->stream_duration += msg->timestamp - segment->stream_previous_pkt_time;
// update previous packet time
segment->stream_previous_pkt_time = msg->timestamp;
return ret;
}
int SrsDvrPlan::write_flv_header()
int SrsDvrPlan::on_video(SrsSharedPtrMessage* __video)
{
int ret = ERROR_SUCCESS;
if ((ret = enc->write_header()) != ERROR_SUCCESS) {
srs_error("write flv header failed. ret=%d", ret);
if (!dvr_enabled) {
return ret;
}
if ((ret = segment->write_video(__video)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -511,6 +559,32 @@ SrsDvrSessionPlan::~SrsDvrSessionPlan()
{
}
int SrsDvrSessionPlan::on_publish()
{
int ret = ERROR_SUCCESS;
// support multiple publish.
if (dvr_enabled) {
return ret;
}
if (!_srs_config->get_dvr_enabled(req->vhost)) {
return ret;
}
if ((ret = segment->close()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = segment->open()) != ERROR_SUCCESS) {
return ret;
}
dvr_enabled = true;
return ret;
}
void SrsDvrSessionPlan::on_unpublish()
{
// support multiple publish.
... ... @@ -519,7 +593,7 @@ void SrsDvrSessionPlan::on_unpublish()
}
// ignore error.
int ret = flv_close();
int ret = segment->close();
if (ret != ERROR_SUCCESS) {
srs_warn("ignore flv close error. ret=%d", ret);
}
... ... @@ -558,65 +632,86 @@ int SrsDvrSegmentPlan::on_publish()
{
int ret = ERROR_SUCCESS;
// if already opened, continue to dvr.
// the segment plan maybe keep running longer than the encoder.
// for example, segment running, encoder restart,
// the segment plan will just continue going and donot open new segment.
if (fs->is_open()) {
dvr_enabled = true;
// support multiple publish.
if (dvr_enabled) {
return ret;
}
if (!_srs_config->get_dvr_enabled(req->vhost)) {
return ret;
}
if ((ret = segment->close()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = segment->open()) != ERROR_SUCCESS) {
return ret;
}
return SrsDvrPlan::on_publish();
dvr_enabled = true;
return ret;
}
void SrsDvrSegmentPlan::on_unpublish()
{
// support multiple publish.
if (!dvr_enabled) {
return;
}
dvr_enabled = false;
}
int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* audio)
{
int ret = ERROR_SUCCESS;
if (SrsFlvCodec::audio_is_sequence_header(audio->payload, audio->size)) {
srs_freep(sh_audio);
sh_audio = audio->copy();
}
return SrsDvrPlan::on_audio(audio);
if ((ret = update_duration(audio)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* video)
{
int ret = ERROR_SUCCESS;
if (SrsFlvCodec::video_is_sequence_header(video->payload, video->size)) {
srs_freep(sh_video);
sh_video = video->copy();
}
return SrsDvrPlan::on_video(video);
if ((ret = update_duration(video)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
if ((ret = SrsDvrPlan::update_duration(msg)) != ERROR_SUCCESS) {
return ret;
}
srs_assert(segment);
// ignore if duration ok.
if (segment_duration <= 0 || segment->duration < segment_duration) {
if (segment_duration <= 0 || !segment->is_overflow(segment_duration)) {
return ret;
}
// when wait keyframe, ignore if no frame arrived.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/177
if (_srs_config->get_dvr_wait_keyframe(_req->vhost)) {
if (_srs_config->get_dvr_wait_keyframe(req->vhost)) {
if (!msg->is_video()) {
return ret;
}
... ... @@ -632,14 +727,12 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
}
// reap segment
if ((ret = flv_close()) != ERROR_SUCCESS) {
segment->reset();
if ((ret = segment->close()) != ERROR_SUCCESS) {
return ret;
}
on_unpublish();
// open new flv file
if ((ret = open_new_segment()) != ERROR_SUCCESS) {
if ((ret = segment->open()) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -654,9 +747,9 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
return ret;
}
SrsDvr::SrsDvr(SrsSource* source)
SrsDvr::SrsDvr(SrsSource* s)
{
_source = source;
source = s;
plan = NULL;
}
... ... @@ -665,21 +758,21 @@ SrsDvr::~SrsDvr()
srs_freep(plan);
}
int SrsDvr::initialize(SrsRequest* req)
int SrsDvr::initialize(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
srs_freep(plan);
plan = SrsDvrPlan::create_plan(req->vhost);
plan = SrsDvrPlan::create_plan(r->vhost);
if ((ret = plan->initialize(_source, req)) != ERROR_SUCCESS) {
if ((ret = plan->initialize(source, r)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvr::on_publish(SrsRequest* /*req*/)
int SrsDvr::on_publish(SrsRequest* /*r*/)
{
int ret = ERROR_SUCCESS;
... ... @@ -695,11 +788,11 @@ void SrsDvr::on_unpublish()
plan->on_unpublish();
}
int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata)
int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m)
{
int ret = ERROR_SUCCESS;
if ((ret = plan->on_meta_data(metadata)) != ERROR_SUCCESS) {
if ((ret = plan->on_meta_data(m)) != ERROR_SUCCESS) {
return ret;
}
... ...
... ... @@ -41,16 +41,34 @@ class SrsOnMetaDataPacket;
class SrsSharedPtrMessage;
class SrsFileWriter;
class SrsFlvEncoder;
class SrsDvrPlan;
#include <srs_app_source.hpp>
#include <srs_app_reload.hpp>
/**
* a piece of flv segment.
* when open segment, support start at 0 or not.
*/
class SrsFlvSegment
class SrsFlvSegment : public ISrsReloadHandler
{
public:
private:
SrsSource* source;
SrsRequest* req;
SrsDvrPlan* plan;
private:
/**
* the underlayer dvr stream.
* if close, the flv is reap and closed.
* if open, new flv file is crote.
*/
SrsFlvEncoder* enc;
SrsRtmpJitter* jitter;
SrsRtmpJitterAlgorithm jitter_algorithm;
SrsFileWriter* fs;
private:
std::string tmp_flv_file;
private:
/**
* current segment flv file path.
*/
... ... @@ -81,10 +99,56 @@ public:
*/
int64_t stream_previous_pkt_time;
public:
SrsFlvSegment();
SrsFlvSegment(SrsDvrPlan* p);
virtual ~SrsFlvSegment();
public:
virtual void reset();
/**
* initialize the segment.
*/
virtual int initialize(SrsSource* s, SrsRequest* r);
/**
* whether segment is overflow.
*/
virtual bool is_overflow(int64_t max_duration);
/**
* open new segment file, timestamp start at 0 for fresh flv file.
* @remark ignore when already open.
*/
virtual int open();
/**
* close current segment.
* @remark ignore when already closed.
*/
virtual int close();
/**
* write the metadata to segment.
*/
virtual int write_metadata(SrsOnMetaDataPacket* metadata);
/**
* @param __audio, directly ptr, copy it if need to save it.
*/
virtual int write_audio(SrsSharedPtrMessage* __audio);
/**
* @param __video, directly ptr, copy it if need to save it.
*/
virtual int write_video(SrsSharedPtrMessage* __video);
private:
/**
* generate the flv segment path.
*/
virtual std::string generate_path();
/**
* create flv jitter. load jitter when flv exists.
* @param loads_from_flv whether loads the jitter from exists flv file.
*/
virtual int create_jitter(bool loads_from_flv);
/**
* when update the duration of segment by rtmp msg.
*/
virtual int on_update_duration(SrsSharedPtrMessage* msg);
// interface ISrsReloadHandler
public:
virtual int on_reload_vhost_dvr(std::string vhost);
};
/**
... ... @@ -94,32 +158,25 @@ public:
* 2. reap flv: when to reap the flv and start new piece.
*/
// TODO: FIXME: the plan is too fat, refine me.
class SrsDvrPlan : public ISrsReloadHandler
class SrsDvrPlan
{
private:
/**
* the underlayer dvr stream.
* if close, the flv is reap and closed.
* if open, new flv file is crote.
*/
SrsFlvEncoder* enc;
SrsSource* _source;
SrsRtmpJitter* jitter;
SrsRtmpJitterAlgorithm jitter_algorithm;
public:
friend class SrsFlvSegment;
protected:
SrsSource* source;
SrsRequest* req;
SrsFlvSegment* segment;
SrsRequest* _req;
bool dvr_enabled;
SrsFileWriter* fs;
private:
std::string tmp_flv_file;
public:
SrsDvrPlan();
virtual ~SrsDvrPlan();
public:
virtual int initialize(SrsSource* source, SrsRequest* req);
virtual int on_publish();
virtual int initialize(SrsSource* s, SrsRequest* r);
virtual int on_publish() = 0;
virtual void on_unpublish() = 0;
/**
* when got metadata.
*/
virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
/**
* @param __audio, directly ptr, copy it if need to save it.
... ... @@ -129,15 +186,7 @@ public:
* @param __video, directly ptr, copy it if need to save it.
*/
virtual int on_video(SrsSharedPtrMessage* __video);
// interface ISrsReloadHandler
public:
virtual int on_reload_vhost_dvr(std::string vhost);
protected:
virtual int flv_open(std::string stream, std::string path);
virtual int flv_close();
virtual int open_new_segment();
virtual int update_duration(SrsSharedPtrMessage* msg);
virtual int write_flv_header();
virtual int on_dvr_request_sh();
virtual int on_video_keyframe();
virtual int64_t filter_timestamp(int64_t timestamp);
... ... @@ -154,6 +203,7 @@ public:
SrsDvrSessionPlan();
virtual ~SrsDvrSessionPlan();
public:
virtual int on_publish();
virtual void on_unpublish();
};
... ... @@ -193,11 +243,11 @@ private:
class SrsDvr
{
private:
SrsSource* _source;
SrsSource* source;
private:
SrsDvrPlan* plan;
public:
SrsDvr(SrsSource* source);
SrsDvr(SrsSource* s);
virtual ~SrsDvr();
public:
/**
... ... @@ -205,12 +255,12 @@ public:
* when system initialize(encoder publish at first time, or reload),
* initialize the dvr will reinitialize the plan, the whole dvr framework.
*/
virtual int initialize(SrsRequest* req);
virtual int initialize(SrsRequest* r);
/**
* publish stream event,
* when encoder start to publish RTMP stream.
*/
virtual int on_publish(SrsRequest* req);
virtual int on_publish(SrsRequest* r);
/**
* the unpublish event.,
* when encoder stop(unpublish) to publish RTMP stream.
... ... @@ -219,7 +269,7 @@ public:
/**
* get some information from metadata, it's optinal.
*/
virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
virtual int on_meta_data(SrsOnMetaDataPacket* m);
/**
* mux the audio packets to dvr.
* @param __audio, directly ptr, copy it if need to save it.
... ...
... ... @@ -106,7 +106,8 @@ int SrsGoApiV1::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
<< __SRS_JFIELD_STR("authors", "the primary authors and contributors") << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("requests", "the request itself, for http debug") << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("vhosts", "dumps vhost to json") << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("streams", "dumps streams to json")
<< __SRS_JFIELD_STR("streams", "dumps streams to json") << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("dvrs", "query or control the dvr plan")
<< __SRS_JOBJECT_END
<< __SRS_JOBJECT_END;
... ...