winlin

update dvr, support segment plan

@@ -108,10 +108,19 @@ vhost dvr.srs.com { @@ -108,10 +108,19 @@ vhost dvr.srs.com {
108 # http://127.0.0.1/live/livestream.m3u8 108 # http://127.0.0.1/live/livestream.m3u8
109 # where dvr_path is /dvr, srs will create the following files: 109 # where dvr_path is /dvr, srs will create the following files:
110 # /dvr/live the app dir for all streams. 110 # /dvr/live the app dir for all streams.
111 - # /dvr/live/livestream.flv the dvr flv file. 111 + # /dvr/live/livestream.{time}.flv the dvr flv file.
  112 + # @remark, the time use system timestamp in ms, user can use http callback to rename it.
112 # in a word, the dvr_path is for vhost. 113 # in a word, the dvr_path is for vhost.
113 # default: ./objs/nginx/html 114 # default: ./objs/nginx/html
114 dvr_path ./objs/nginx/html; 115 dvr_path ./objs/nginx/html;
  116 + # the dvr plan. canbe:
  117 + # session reap flv when session end(unpublish).
  118 + # segment reap flv when flv duration exceed the specified duration.
  119 + # default: session
  120 + dvr_plan session;
  121 + # the param for plan(segment), in seconds.
  122 + # default: 30
  123 + dvr_duration 30;
115 } 124 }
116 } 125 }
117 126
@@ -2354,6 +2354,40 @@ string SrsConfig::get_dvr_path(string vhost) @@ -2354,6 +2354,40 @@ string SrsConfig::get_dvr_path(string vhost)
2354 return conf->arg0(); 2354 return conf->arg0();
2355 } 2355 }
2356 2356
  2357 +string SrsConfig::get_dvr_plan(string vhost)
  2358 +{
  2359 + SrsConfDirective* dvr = get_dvr(vhost);
  2360 +
  2361 + if (!dvr) {
  2362 + return SRS_CONF_DEFAULT_DVR_PLAN;
  2363 + }
  2364 +
  2365 + SrsConfDirective* conf = dvr->get("dvr_plan");
  2366 +
  2367 + if (!conf) {
  2368 + return SRS_CONF_DEFAULT_DVR_PLAN;
  2369 + }
  2370 +
  2371 + return conf->arg0();
  2372 +}
  2373 +
  2374 +int SrsConfig::get_dvr_duration(string vhost)
  2375 +{
  2376 + SrsConfDirective* dvr = get_dvr(vhost);
  2377 +
  2378 + if (!dvr) {
  2379 + return SRS_CONF_DEFAULT_DVR_DURATION;
  2380 + }
  2381 +
  2382 + SrsConfDirective* conf = dvr->get("dvr_duration");
  2383 +
  2384 + if (!conf) {
  2385 + return SRS_CONF_DEFAULT_DVR_DURATION;
  2386 + }
  2387 +
  2388 + return ::atoi(conf->arg0().c_str());
  2389 +}
  2390 +
2357 SrsConfDirective* SrsConfig::get_http_api() 2391 SrsConfDirective* SrsConfig::get_http_api()
2358 { 2392 {
2359 return root->get("http_api"); 2393 return root->get("http_api");
@@ -45,6 +45,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -45,6 +45,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
45 #define SRS_CONF_DEFAULT_HLS_FRAGMENT 10 45 #define SRS_CONF_DEFAULT_HLS_FRAGMENT 10
46 #define SRS_CONF_DEFAULT_HLS_WINDOW 60 46 #define SRS_CONF_DEFAULT_HLS_WINDOW 60
47 #define SRS_CONF_DEFAULT_DVR_PATH "./objs/nginx/html" 47 #define SRS_CONF_DEFAULT_DVR_PATH "./objs/nginx/html"
  48 +#define SRS_CONF_DEFAULT_DVR_PLAN_SESSION "session"
  49 +#define SRS_CONF_DEFAULT_DVR_PLAN_SEGMENT "segment"
  50 +#define SRS_CONF_DEFAULT_DVR_PLAN SRS_CONF_DEFAULT_DVR_PLAN_SESSION
  51 +#define SRS_CONF_DEFAULT_DVR_DURATION 30
48 // in ms, for HLS aac sync time. 52 // in ms, for HLS aac sync time.
49 #define SRS_CONF_DEFAULT_AAC_SYNC 100 53 #define SRS_CONF_DEFAULT_AAC_SYNC 100
50 // in ms, for HLS aac flush the audio 54 // in ms, for HLS aac flush the audio
@@ -229,6 +233,8 @@ private: @@ -229,6 +233,8 @@ private:
229 public: 233 public:
230 virtual bool get_dvr_enabled(std::string vhost); 234 virtual bool get_dvr_enabled(std::string vhost);
231 virtual std::string get_dvr_path(std::string vhost); 235 virtual std::string get_dvr_path(std::string vhost);
  236 + virtual std::string get_dvr_plan(std::string vhost);
  237 + virtual int get_dvr_duration(std::string vhost);
232 // http api section 238 // http api section
233 private: 239 private:
234 virtual SrsConfDirective* get_http_api(); 240 virtual SrsConfDirective* get_http_api();
@@ -26,6 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -26,6 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #ifdef SRS_AUTO_DVR 26 #ifdef SRS_AUTO_DVR
27 27
28 #include <fcntl.h> 28 #include <fcntl.h>
  29 +#include <sstream>
29 using namespace std; 30 using namespace std;
30 31
31 #include <srs_app_config.hpp> 32 #include <srs_app_config.hpp>
@@ -35,6 +36,7 @@ using namespace std; @@ -35,6 +36,7 @@ using namespace std;
35 #include <srs_app_source.hpp> 36 #include <srs_app_source.hpp>
36 #include <srs_core_autofree.hpp> 37 #include <srs_core_autofree.hpp>
37 #include <srs_kernel_stream.hpp> 38 #include <srs_kernel_stream.hpp>
  39 +#include <srs_kernel_utility.hpp>
38 40
39 SrsFileStream::SrsFileStream() 41 SrsFileStream::SrsFileStream()
40 { 42 {
@@ -302,7 +304,7 @@ SrsDvrPlan::~SrsDvrPlan() @@ -302,7 +304,7 @@ SrsDvrPlan::~SrsDvrPlan()
302 srs_freep(enc); 304 srs_freep(enc);
303 } 305 }
304 306
305 -int SrsDvrPlan::initialize(SrsSource* source) 307 +int SrsDvrPlan::initialize(SrsSource* source, SrsRequest* /*req*/)
306 { 308 {
307 int ret = ERROR_SUCCESS; 309 int ret = ERROR_SUCCESS;
308 310
@@ -401,9 +403,16 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video) @@ -401,9 +403,16 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video)
401 return ret; 403 return ret;
402 } 404 }
403 405
404 -SrsDvrPlan* SrsDvrPlan::create_plan() 406 +SrsDvrPlan* SrsDvrPlan::create_plan(string vhost)
405 { 407 {
406 - return new SrsDvrSessionPlan(); 408 + std::string plan = _srs_config->get_dvr_plan(vhost);
  409 + if (plan == SRS_CONF_DEFAULT_DVR_PLAN_SEGMENT) {
  410 + return new SrsDvrSegmentPlan();
  411 + } else if (plan == SRS_CONF_DEFAULT_DVR_PLAN_SESSION) {
  412 + return new SrsDvrSessionPlan();
  413 + } else {
  414 + return new SrsDvrSessionPlan();
  415 + }
407 } 416 }
408 417
409 SrsDvrSessionPlan::SrsDvrSessionPlan() 418 SrsDvrSessionPlan::SrsDvrSessionPlan()
@@ -427,14 +436,13 @@ int SrsDvrSessionPlan::on_publish(SrsRequest* req) @@ -427,14 +436,13 @@ int SrsDvrSessionPlan::on_publish(SrsRequest* req)
427 return ret; 436 return ret;
428 } 437 }
429 438
430 - std::string path = _srs_config->get_dvr_path(req->vhost);  
431 - path += "/";  
432 - path += req->app;  
433 - path += "/";  
434 - path += req->stream;  
435 - path += ".flv"; 439 + std::stringstream path;
  440 +
  441 + path << _srs_config->get_dvr_path(req->vhost)
  442 + << "/" << req->app << "/"
  443 + << req->stream << "." << srs_get_system_time_ms() << ".flv";
436 444
437 - if ((ret = flv_open(req->get_stream_url(), path)) != ERROR_SUCCESS) { 445 + if ((ret = flv_open(req->get_stream_url(), path.str())) != ERROR_SUCCESS) {
438 return ret; 446 return ret;
439 } 447 }
440 dvr_enabled = true; 448 dvr_enabled = true;
@@ -458,6 +466,132 @@ void SrsDvrSessionPlan::on_unpublish() @@ -458,6 +466,132 @@ void SrsDvrSessionPlan::on_unpublish()
458 dvr_enabled = false; 466 dvr_enabled = false;
459 } 467 }
460 468
  469 +SrsDvrSegmentPlan::SrsDvrSegmentPlan()
  470 +{
  471 + starttime = -1;
  472 + duration = 0;
  473 + segment_duration = -1;
  474 +}
  475 +
  476 +SrsDvrSegmentPlan::~SrsDvrSegmentPlan()
  477 +{
  478 +}
  479 +
  480 +int SrsDvrSegmentPlan::initialize(SrsSource* source, SrsRequest* req)
  481 +{
  482 + int ret = ERROR_SUCCESS;
  483 +
  484 + if ((ret = SrsDvrPlan::initialize(source, req)) != ERROR_SUCCESS) {
  485 + return ret;
  486 + }
  487 +
  488 + segment_duration = _srs_config->get_dvr_duration(req->vhost);
  489 + // to ms
  490 + segment_duration *= 1000;
  491 +
  492 + return ret;
  493 +}
  494 +
  495 +int SrsDvrSegmentPlan::on_publish(SrsRequest* req)
  496 +{
  497 + int ret = ERROR_SUCCESS;
  498 +
  499 + // support multiple publish.
  500 + if (dvr_enabled) {
  501 + return ret;
  502 + }
  503 +
  504 + if (!_srs_config->get_dvr_enabled(req->vhost)) {
  505 + return ret;
  506 + }
  507 +
  508 + std::stringstream path;
  509 +
  510 + path << _srs_config->get_dvr_path(req->vhost)
  511 + << "/" << req->app << "/"
  512 + << req->stream << "." << srs_get_system_time_ms() << ".flv";
  513 +
  514 + if ((ret = flv_open(req->get_stream_url(), path.str())) != ERROR_SUCCESS) {
  515 + return ret;
  516 + }
  517 + dvr_enabled = true;
  518 +
  519 + return ret;
  520 +}
  521 +
  522 +void SrsDvrSegmentPlan::on_unpublish()
  523 +{
  524 + // support multiple publish.
  525 + if (!dvr_enabled) {
  526 + return;
  527 + }
  528 +
  529 + // ignore error.
  530 + int ret = flv_close();
  531 + if (ret != ERROR_SUCCESS) {
  532 + srs_warn("ignore flv close error. ret=%d", ret);
  533 + }
  534 +
  535 + dvr_enabled = false;
  536 +}
  537 +
  538 +int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* audio)
  539 +{
  540 + int ret = ERROR_SUCCESS;
  541 +
  542 + if (!dvr_enabled) {
  543 + return ret;
  544 + }
  545 +
  546 + if ((ret = update_duration(audio)) != ERROR_SUCCESS) {
  547 + return ret;
  548 + }
  549 +
  550 + if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) {
  551 + return ret;
  552 + }
  553 +
  554 + return ret;
  555 +}
  556 +
  557 +int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* video)
  558 +{
  559 + int ret = ERROR_SUCCESS;
  560 +
  561 + if (!dvr_enabled) {
  562 + return ret;
  563 + }
  564 +
  565 + if ((ret = update_duration(video)) != ERROR_SUCCESS) {
  566 + return ret;
  567 + }
  568 +
  569 + if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) {
  570 + return ret;
  571 + }
  572 +
  573 + return ret;
  574 +}
  575 +
  576 +int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
  577 +{
  578 + int ret = ERROR_SUCCESS;
  579 +
  580 + // foreach msg, collect the duration.
  581 + if (starttime < 0 || starttime > msg->header.timestamp) {
  582 + starttime = msg->header.timestamp;
  583 + }
  584 + duration += msg->header.timestamp - starttime;
  585 + starttime = msg->header.timestamp;
  586 +
  587 + // reap if exceed duration.
  588 + if (duration > 0 && segment_duration > 0 && duration > segment_duration) {
  589 + on_unpublish();
  590 + }
  591 +
  592 + return ret;
  593 +}
  594 +
461 SrsDvr::SrsDvr(SrsSource* source) 595 SrsDvr::SrsDvr(SrsSource* source)
462 { 596 {
463 _source = source; 597 _source = source;
@@ -469,14 +603,14 @@ SrsDvr::~SrsDvr() @@ -469,14 +603,14 @@ SrsDvr::~SrsDvr()
469 srs_freep(plan); 603 srs_freep(plan);
470 } 604 }
471 605
472 -int SrsDvr::initialize() 606 +int SrsDvr::initialize(SrsRequest* req)
473 { 607 {
474 int ret = ERROR_SUCCESS; 608 int ret = ERROR_SUCCESS;
475 609
476 srs_freep(plan); 610 srs_freep(plan);
477 - plan = SrsDvrPlan::create_plan(); 611 + plan = SrsDvrPlan::create_plan(req->vhost);
478 612
479 - if ((ret = plan->initialize(_source)) != ERROR_SUCCESS) { 613 + if ((ret = plan->initialize(_source, req)) != ERROR_SUCCESS) {
480 return ret; 614 return ret;
481 } 615 }
482 616
@@ -127,7 +127,7 @@ public: @@ -127,7 +127,7 @@ public:
127 SrsDvrPlan(); 127 SrsDvrPlan();
128 virtual ~SrsDvrPlan(); 128 virtual ~SrsDvrPlan();
129 public: 129 public:
130 - virtual int initialize(SrsSource* source); 130 + virtual int initialize(SrsSource* source, SrsRequest* req);
131 virtual int on_publish(SrsRequest* req) = 0; 131 virtual int on_publish(SrsRequest* req) = 0;
132 virtual void on_unpublish() = 0; 132 virtual void on_unpublish() = 0;
133 virtual int on_meta_data(SrsOnMetaDataPacket* metadata); 133 virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
@@ -137,14 +137,11 @@ protected: @@ -137,14 +137,11 @@ protected:
137 virtual int flv_open(std::string stream, std::string path); 137 virtual int flv_open(std::string stream, std::string path);
138 virtual int flv_close(); 138 virtual int flv_close();
139 public: 139 public:
140 - static SrsDvrPlan* create_plan(); 140 + static SrsDvrPlan* create_plan(std::string vhost);
141 }; 141 };
142 142
143 /** 143 /**
144 -* default session plan:  
145 -* 1. start dvr when session start(publish).  
146 -* 2. stop dvr when session stop(unpublish).  
147 -* 3. always dvr to file: dvr_path/app/stream.flv 144 +* session plan: reap flv when session complete(unpublish)
148 */ 145 */
149 class SrsDvrSessionPlan : public SrsDvrPlan 146 class SrsDvrSessionPlan : public SrsDvrPlan
150 { 147 {
@@ -157,6 +154,29 @@ public: @@ -157,6 +154,29 @@ public:
157 }; 154 };
158 155
159 /** 156 /**
  157 +* segment plan: reap flv when duration exceed.
  158 +*/
  159 +class SrsDvrSegmentPlan : public SrsDvrPlan
  160 +{
  161 +private:
  162 + int64_t duration;
  163 + int64_t starttime;
  164 + // in config, in ms
  165 + int segment_duration;
  166 +public:
  167 + SrsDvrSegmentPlan();
  168 + virtual ~SrsDvrSegmentPlan();
  169 +public:
  170 + virtual int initialize(SrsSource* source, SrsRequest* req);
  171 + virtual int on_publish(SrsRequest* req);
  172 + virtual void on_unpublish();
  173 + virtual int on_audio(SrsSharedPtrMessage* audio);
  174 + virtual int on_video(SrsSharedPtrMessage* video);
  175 +private:
  176 + virtual int update_duration(SrsSharedPtrMessage* msg);
  177 +};
  178 +
  179 +/**
160 * dvr(digital video recorder) to record RTMP stream to flv file. 180 * dvr(digital video recorder) to record RTMP stream to flv file.
161 * TODO: FIXME: add utest for it. 181 * TODO: FIXME: add utest for it.
162 */ 182 */
@@ -175,7 +195,7 @@ public: @@ -175,7 +195,7 @@ public:
175 * when system initialize(encoder publish at first time, or reload), 195 * when system initialize(encoder publish at first time, or reload),
176 * initialize the dvr will reinitialize the plan, the whole dvr framework. 196 * initialize the dvr will reinitialize the plan, the whole dvr framework.
177 */ 197 */
178 - virtual int initialize(); 198 + virtual int initialize(SrsRequest* req);
179 /** 199 /**
180 * publish stream event, 200 * publish stream event,
181 * when encoder start to publish RTMP stream. 201 * when encoder start to publish RTMP stream.
@@ -507,7 +507,7 @@ int SrsSource::initialize() @@ -507,7 +507,7 @@ int SrsSource::initialize()
507 int ret = ERROR_SUCCESS; 507 int ret = ERROR_SUCCESS;
508 508
509 #ifdef SRS_AUTO_DVR 509 #ifdef SRS_AUTO_DVR
510 - if ((ret = dvr->initialize()) != ERROR_SUCCESS) { 510 + if ((ret = dvr->initialize(req)) != ERROR_SUCCESS) {
511 return ret; 511 return ret;
512 } 512 }
513 #endif 513 #endif
@@ -641,7 +641,7 @@ int SrsSource::on_reload_vhost_dvr(string vhost) @@ -641,7 +641,7 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
641 dvr->on_unpublish(); 641 dvr->on_unpublish();
642 642
643 // reinitialize the dvr, update plan. 643 // reinitialize the dvr, update plan.
644 - if ((ret = dvr->initialize()) != ERROR_SUCCESS) { 644 + if ((ret = dvr->initialize(req)) != ERROR_SUCCESS) {
645 return ret; 645 return ret;
646 } 646 }
647 647
@@ -31,6 +31,10 @@ static int64_t _srs_system_time_us_cache = 0; @@ -31,6 +31,10 @@ static int64_t _srs_system_time_us_cache = 0;
31 31
32 int64_t srs_get_system_time_ms() 32 int64_t srs_get_system_time_ms()
33 { 33 {
  34 + if (_srs_system_time_us_cache <= 0) {
  35 + srs_update_system_time_ms();
  36 + }
  37 +
34 return _srs_system_time_us_cache / 1000; 38 return _srs_system_time_us_cache / 1000;
35 } 39 }
36 40