winlin

dvr support plan and default session plan

@@ -288,39 +288,32 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s @@ -288,39 +288,32 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s
288 return ret; 288 return ret;
289 } 289 }
290 290
291 -SrsDvr::SrsDvr(SrsSource* source) 291 +SrsDvrPlan::SrsDvrPlan()
292 { 292 {
293 - _source = source; 293 + _source = NULL;
294 dvr_enabled = false; 294 dvr_enabled = false;
295 fs = new SrsFileStream(); 295 fs = new SrsFileStream();
296 enc = new SrsFlvEncoder(); 296 enc = new SrsFlvEncoder();
297 } 297 }
298 298
299 -SrsDvr::~SrsDvr() 299 +SrsDvrPlan::~SrsDvrPlan()
300 { 300 {
301 srs_freep(fs); 301 srs_freep(fs);
302 srs_freep(enc); 302 srs_freep(enc);
303 } 303 }
304 304
305 -int SrsDvr::on_publish(SrsRequest* req) 305 +int SrsDvrPlan::initialize(SrsSource* source)
306 { 306 {
307 int ret = ERROR_SUCCESS; 307 int ret = ERROR_SUCCESS;
308 308
309 - // support multiple publish.  
310 - if (dvr_enabled) {  
311 - return ret;  
312 - } 309 + _source = source;
313 310
314 - if (!_srs_config->get_dvr_enabled(req->vhost)) {  
315 return ret; 311 return ret;
316 - } 312 +}
317 313
318 - std::string path = _srs_config->get_dvr_path(req->vhost);  
319 - path += "/";  
320 - path += req->app;  
321 - path += "/";  
322 - path += req->stream;  
323 - path += ".flv"; 314 +int SrsDvrPlan::flv_open(string stream, string path)
  315 +{
  316 + int ret = ERROR_SUCCESS;
324 317
325 if ((ret = fs->open(path)) != ERROR_SUCCESS) { 318 if ((ret = fs->open(path)) != ERROR_SUCCESS) {
326 srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret); 319 srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret);
@@ -341,26 +334,16 @@ int SrsDvr::on_publish(SrsRequest* req) @@ -341,26 +334,16 @@ int SrsDvr::on_publish(SrsRequest* req)
341 return ret; 334 return ret;
342 } 335 }
343 336
344 - srs_trace("dvr stream %s to file %s", req->get_stream_url().c_str(), path.c_str());  
345 - dvr_enabled = true;  
346 - 337 + srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str());
347 return ret; 338 return ret;
348 } 339 }
349 340
350 -void SrsDvr::on_unpublish() 341 +int SrsDvrPlan::flv_close()
351 { 342 {
352 - // support multiple publish.  
353 - if (!dvr_enabled) {  
354 - return;  
355 - }  
356 -  
357 - // ignore error.  
358 - fs->close();  
359 -  
360 - dvr_enabled = false; 343 + return fs->close();
361 } 344 }
362 345
363 -int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata) 346 +int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata)
364 { 347 {
365 int ret = ERROR_SUCCESS; 348 int ret = ERROR_SUCCESS;
366 349
@@ -382,12 +365,10 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata) @@ -382,12 +365,10 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata)
382 return ret; 365 return ret;
383 } 366 }
384 367
385 -int SrsDvr::on_audio(SrsSharedPtrMessage* audio) 368 +int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio)
386 { 369 {
387 int ret = ERROR_SUCCESS; 370 int ret = ERROR_SUCCESS;
388 371
389 - SrsAutoFree(SrsSharedPtrMessage, audio, false);  
390 -  
391 if (!dvr_enabled) { 372 if (!dvr_enabled) {
392 return ret; 373 return ret;
393 } 374 }
@@ -402,12 +383,10 @@ int SrsDvr::on_audio(SrsSharedPtrMessage* audio) @@ -402,12 +383,10 @@ int SrsDvr::on_audio(SrsSharedPtrMessage* audio)
402 return ret; 383 return ret;
403 } 384 }
404 385
405 -int SrsDvr::on_video(SrsSharedPtrMessage* video) 386 +int SrsDvrPlan::on_video(SrsSharedPtrMessage* video)
406 { 387 {
407 int ret = ERROR_SUCCESS; 388 int ret = ERROR_SUCCESS;
408 389
409 - SrsAutoFree(SrsSharedPtrMessage, video, false);  
410 -  
411 if (!dvr_enabled) { 390 if (!dvr_enabled) {
412 return ret; 391 return ret;
413 } 392 }
@@ -422,5 +401,140 @@ int SrsDvr::on_video(SrsSharedPtrMessage* video) @@ -422,5 +401,140 @@ int SrsDvr::on_video(SrsSharedPtrMessage* video)
422 return ret; 401 return ret;
423 } 402 }
424 403
  404 +SrsDvrPlan* SrsDvrPlan::create_plan()
  405 +{
  406 + return new SrsDvrSessionPlan();
  407 +}
  408 +
  409 +SrsDvrSessionPlan::SrsDvrSessionPlan()
  410 +{
  411 +}
  412 +
  413 +SrsDvrSessionPlan::~SrsDvrSessionPlan()
  414 +{
  415 +}
  416 +
  417 +int SrsDvrSessionPlan::on_publish(SrsRequest* req)
  418 +{
  419 + int ret = ERROR_SUCCESS;
  420 +
  421 + // support multiple publish.
  422 + if (dvr_enabled) {
  423 + return ret;
  424 + }
  425 +
  426 + if (!_srs_config->get_dvr_enabled(req->vhost)) {
  427 + return ret;
  428 + }
  429 +
  430 + std::string path = _srs_config->get_dvr_path(req->vhost);
  431 + path += "/";
  432 + path += req->app;
  433 + path += "/";
  434 + path += req->stream;
  435 + path += ".flv";
  436 +
  437 + if ((ret = flv_open(req->get_stream_url(), path)) != ERROR_SUCCESS) {
  438 + return ret;
  439 + }
  440 + dvr_enabled = true;
  441 +
  442 + return ret;
  443 +}
  444 +
  445 +void SrsDvrSessionPlan::on_unpublish()
  446 +{
  447 + // support multiple publish.
  448 + if (!dvr_enabled) {
  449 + return;
  450 + }
  451 +
  452 + // ignore error.
  453 + int ret = flv_close();
  454 + if (ret != ERROR_SUCCESS) {
  455 + srs_warn("ignore flv close error. ret=%d", ret);
  456 + }
  457 +
  458 + dvr_enabled = false;
  459 +}
  460 +
  461 +SrsDvr::SrsDvr(SrsSource* source)
  462 +{
  463 + _source = source;
  464 + plan = NULL;
  465 +}
  466 +
  467 +SrsDvr::~SrsDvr()
  468 +{
  469 + srs_freep(plan);
  470 +}
  471 +
  472 +int SrsDvr::initialize()
  473 +{
  474 + int ret = ERROR_SUCCESS;
  475 +
  476 + srs_freep(plan);
  477 + plan = SrsDvrPlan::create_plan();
  478 +
  479 + if ((ret = plan->initialize(_source)) != ERROR_SUCCESS) {
  480 + return ret;
  481 + }
  482 +
  483 + return ret;
  484 +}
  485 +
  486 +int SrsDvr::on_publish(SrsRequest* req)
  487 +{
  488 + int ret = ERROR_SUCCESS;
  489 +
  490 + if ((ret = plan->on_publish(req)) != ERROR_SUCCESS) {
  491 + return ret;
  492 + }
  493 +
  494 + return ret;
  495 +}
  496 +
  497 +void SrsDvr::on_unpublish()
  498 +{
  499 + plan->on_unpublish();
  500 +}
  501 +
  502 +int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata)
  503 +{
  504 + int ret = ERROR_SUCCESS;
  505 +
  506 + if ((ret = plan->on_meta_data(metadata)) != ERROR_SUCCESS) {
  507 + return ret;
  508 + }
  509 +
  510 + return ret;
  511 +}
  512 +
  513 +int SrsDvr::on_audio(SrsSharedPtrMessage* audio)
  514 +{
  515 + int ret = ERROR_SUCCESS;
  516 +
  517 + SrsAutoFree(SrsSharedPtrMessage, audio, false);
  518 +
  519 + if ((ret = plan->on_audio(audio)) != ERROR_SUCCESS) {
  520 + return ret;
  521 + }
  522 +
  523 + return ret;
  524 +}
  525 +
  526 +int SrsDvr::on_video(SrsSharedPtrMessage* video)
  527 +{
  528 + int ret = ERROR_SUCCESS;
  529 +
  530 + SrsAutoFree(SrsSharedPtrMessage, video, false);
  531 +
  532 + if ((ret = plan->on_video(video)) != ERROR_SUCCESS) {
  533 + return ret;
  534 + }
  535 +
  536 + return ret;
  537 +}
  538 +
425 #endif 539 #endif
426 540
@@ -106,6 +106,57 @@ private: @@ -106,6 +106,57 @@ private:
106 }; 106 };
107 107
108 /** 108 /**
  109 +* the plan for dvr.
  110 +* use to control the following dvr params:
  111 +* 1. filename: the filename for record file.
  112 +* 2. reap flv: when to reap the flv and start new piece.
  113 +*/
  114 +class SrsDvrPlan
  115 +{
  116 +protected:
  117 + /**
  118 + * the underlayer dvr stream.
  119 + * if close, the flv is reap and closed.
  120 + * if open, new flv file is crote.
  121 + */
  122 + SrsFileStream* fs;
  123 + SrsFlvEncoder* enc;
  124 + bool dvr_enabled;
  125 + SrsSource* _source;
  126 +public:
  127 + SrsDvrPlan();
  128 + virtual ~SrsDvrPlan();
  129 +public:
  130 + virtual int initialize(SrsSource* source);
  131 + virtual int on_publish(SrsRequest* req) = 0;
  132 + virtual void on_unpublish() = 0;
  133 + virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
  134 + virtual int on_audio(SrsSharedPtrMessage* audio);
  135 + virtual int on_video(SrsSharedPtrMessage* video);
  136 +protected:
  137 + virtual int flv_open(std::string stream, std::string path);
  138 + virtual int flv_close();
  139 +public:
  140 + static SrsDvrPlan* create_plan();
  141 +};
  142 +
  143 +/**
  144 +* default session plan:
  145 +* 1. start dvr when session start(publish).
  146 +* 2. stop dvr when session stop(unpublish).
  147 +* 3. always dvr to file: dvr_path/app/stream.flv
  148 +*/
  149 +class SrsDvrSessionPlan : public SrsDvrPlan
  150 +{
  151 +public:
  152 + SrsDvrSessionPlan();
  153 + virtual ~SrsDvrSessionPlan();
  154 +public:
  155 + virtual int on_publish(SrsRequest* req);
  156 + virtual void on_unpublish();
  157 +};
  158 +
  159 +/**
109 * dvr(digital video recorder) to record RTMP stream to flv file. 160 * dvr(digital video recorder) to record RTMP stream to flv file.
110 * TODO: FIXME: add utest for it. 161 * TODO: FIXME: add utest for it.
111 */ 162 */
@@ -114,14 +165,18 @@ class SrsDvr @@ -114,14 +165,18 @@ class SrsDvr
114 private: 165 private:
115 SrsSource* _source; 166 SrsSource* _source;
116 private: 167 private:
117 - bool dvr_enabled;  
118 - SrsFileStream* fs;  
119 - SrsFlvEncoder* enc; 168 + SrsDvrPlan* plan;
120 public: 169 public:
121 SrsDvr(SrsSource* source); 170 SrsDvr(SrsSource* source);
122 virtual ~SrsDvr(); 171 virtual ~SrsDvr();
123 public: 172 public:
124 /** 173 /**
  174 + * initialize dvr, create dvr plan.
  175 + * when system initialize(encoder publish at first time, or reload),
  176 + * initialize the dvr will reinitialize the plan, the whole dvr framework.
  177 + */
  178 + virtual int initialize();
  179 + /**
125 * publish stream event, 180 * publish stream event,
126 * when encoder start to publish RTMP stream. 181 * when encoder start to publish RTMP stream.
127 */ 182 */
@@ -276,7 +276,10 @@ int SrsRtmpConn::stream_service_cycle() @@ -276,7 +276,10 @@ int SrsRtmpConn::stream_service_cycle()
276 srs_trace("set chunk_size=%d success", chunk_size); 276 srs_trace("set chunk_size=%d success", chunk_size);
277 277
278 // find a source to serve. 278 // find a source to serve.
279 - SrsSource* source = SrsSource::find(req); 279 + SrsSource* source = NULL;
  280 + if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) {
  281 + return ret;
  282 + }
280 srs_assert(source != NULL); 283 srs_assert(source != NULL);
281 284
282 // check publish available. 285 // check publish available.
@@ -413,17 +413,27 @@ int64_t SrsGopCache::get_start_time() @@ -413,17 +413,27 @@ int64_t SrsGopCache::get_start_time()
413 413
414 std::map<std::string, SrsSource*> SrsSource::pool; 414 std::map<std::string, SrsSource*> SrsSource::pool;
415 415
416 -SrsSource* SrsSource::find(SrsRequest* req) 416 +int SrsSource::find(SrsRequest* req, SrsSource** ppsource)
417 { 417 {
  418 + int ret = ERROR_SUCCESS;
  419 +
418 string stream_url = req->get_stream_url(); 420 string stream_url = req->get_stream_url();
419 string vhost = req->vhost; 421 string vhost = req->vhost;
420 422
421 if (pool.find(stream_url) == pool.end()) { 423 if (pool.find(stream_url) == pool.end()) {
422 - pool[stream_url] = new SrsSource(req); 424 + SrsSource* source = new SrsSource(req);
  425 + if ((ret = source->initialize()) != ERROR_SUCCESS) {
  426 + srs_freep(source);
  427 + return ret;
  428 + }
  429 +
  430 + pool[stream_url] = source;
423 srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); 431 srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
424 } 432 }
425 433
426 - return pool[stream_url]; 434 + *ppsource = pool[stream_url];
  435 +
  436 + return ret;
427 } 437 }
428 438
429 SrsSource::SrsSource(SrsRequest* _req) 439 SrsSource::SrsSource(SrsRequest* _req)
@@ -492,6 +502,19 @@ SrsSource::~SrsSource() @@ -492,6 +502,19 @@ SrsSource::~SrsSource()
492 srs_freep(req); 502 srs_freep(req);
493 } 503 }
494 504
  505 +int SrsSource::initialize()
  506 +{
  507 + int ret = ERROR_SUCCESS;
  508 +
  509 +#ifdef SRS_AUTO_DVR
  510 + if ((ret = dvr->initialize()) != ERROR_SUCCESS) {
  511 + return ret;
  512 + }
  513 +#endif
  514 +
  515 + return ret;
  516 +}
  517 +
495 int SrsSource::on_reload_vhost_atc(string vhost) 518 int SrsSource::on_reload_vhost_atc(string vhost)
496 { 519 {
497 int ret = ERROR_SUCCESS; 520 int ret = ERROR_SUCCESS;
@@ -614,11 +637,20 @@ int SrsSource::on_reload_vhost_dvr(string vhost) @@ -614,11 +637,20 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
614 } 637 }
615 638
616 #ifdef SRS_AUTO_DVR 639 #ifdef SRS_AUTO_DVR
  640 + // cleanup dvr
617 dvr->on_unpublish(); 641 dvr->on_unpublish();
  642 +
  643 + // reinitialize the dvr, update plan.
  644 + if ((ret = dvr->initialize()) != ERROR_SUCCESS) {
  645 + return ret;
  646 + }
  647 +
  648 + // start to publish by new plan.
618 if ((ret = dvr->on_publish(req)) != ERROR_SUCCESS) { 649 if ((ret = dvr->on_publish(req)) != ERROR_SUCCESS) {
619 srs_error("dvr publish failed. ret=%d", ret); 650 srs_error("dvr publish failed. ret=%d", ret);
620 return ret; 651 return ret;
621 } 652 }
  653 +
622 srs_trace("vhost %s dvr reload success", vhost.c_str()); 654 srs_trace("vhost %s dvr reload success", vhost.c_str());
623 #endif 655 #endif
624 656
@@ -213,10 +213,10 @@ public: @@ -213,10 +213,10 @@ public:
213 /** 213 /**
214 * find stream by vhost/app/stream. 214 * find stream by vhost/app/stream.
215 * @param req the client request. 215 * @param req the client request.
216 - * @return the matched source, never be NULL. 216 + * @param ppsource the matched source, if success never be NULL.
217 * @remark stream_url should without port and schema. 217 * @remark stream_url should without port and schema.
218 */ 218 */
219 - static SrsSource* find(SrsRequest* req); 219 + static int find(SrsRequest* req, SrsSource** ppsource);
220 private: 220 private:
221 // deep copy of client request. 221 // deep copy of client request.
222 SrsRequest* req; 222 SrsRequest* req;
@@ -271,6 +271,8 @@ public: @@ -271,6 +271,8 @@ public:
271 */ 271 */
272 SrsSource(SrsRequest* _req); 272 SrsSource(SrsRequest* _req);
273 virtual ~SrsSource(); 273 virtual ~SrsSource();
  274 +public:
  275 + virtual int initialize();
274 // interface ISrsReloadHandler 276 // interface ISrsReloadHandler
275 public: 277 public:
276 virtual int on_reload_vhost_atc(std::string vhost); 278 virtual int on_reload_vhost_atc(std::string vhost);