winlin

dvr support segment plan

@@ -90,6 +90,11 @@ int SrsFileStream::close() @@ -90,6 +90,11 @@ int SrsFileStream::close()
90 return ret; 90 return ret;
91 } 91 }
92 92
  93 +bool SrsFileStream::is_open()
  94 +{
  95 + return fd > 0;
  96 +}
  97 +
93 int SrsFileStream::read(void* buf, size_t count, ssize_t* pnread) 98 int SrsFileStream::read(void* buf, size_t count, ssize_t* pnread)
94 { 99 {
95 int ret = ERROR_SUCCESS; 100 int ret = ERROR_SUCCESS;
@@ -293,6 +298,8 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s @@ -293,6 +298,8 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s
293 SrsDvrPlan::SrsDvrPlan() 298 SrsDvrPlan::SrsDvrPlan()
294 { 299 {
295 _source = NULL; 300 _source = NULL;
  301 + _req = NULL;
  302 + jitter = NULL;
296 dvr_enabled = false; 303 dvr_enabled = false;
297 fs = new SrsFileStream(); 304 fs = new SrsFileStream();
298 enc = new SrsFlvEncoder(); 305 enc = new SrsFlvEncoder();
@@ -300,15 +307,56 @@ SrsDvrPlan::SrsDvrPlan() @@ -300,15 +307,56 @@ SrsDvrPlan::SrsDvrPlan()
300 307
301 SrsDvrPlan::~SrsDvrPlan() 308 SrsDvrPlan::~SrsDvrPlan()
302 { 309 {
  310 + srs_freep(jitter);
303 srs_freep(fs); 311 srs_freep(fs);
304 srs_freep(enc); 312 srs_freep(enc);
305 } 313 }
306 314
307 -int SrsDvrPlan::initialize(SrsSource* source, SrsRequest* /*req*/) 315 +int SrsDvrPlan::initialize(SrsSource* source, SrsRequest* req)
308 { 316 {
309 int ret = ERROR_SUCCESS; 317 int ret = ERROR_SUCCESS;
310 318
311 _source = source; 319 _source = source;
  320 + _req = req;
  321 +
  322 + return ret;
  323 +}
  324 +
  325 +int SrsDvrPlan::on_publish()
  326 +{
  327 + int ret = ERROR_SUCCESS;
  328 +
  329 + // support multiple publish.
  330 + if (dvr_enabled) {
  331 + return ret;
  332 + }
  333 +
  334 + SrsRequest* req = _req;
  335 +
  336 + if (!_srs_config->get_dvr_enabled(req->vhost)) {
  337 + return ret;
  338 + }
  339 +
  340 + // jitter.
  341 + srs_freep(jitter);
  342 + jitter = new SrsRtmpJitter();
  343 +
  344 + // new flv file
  345 + std::stringstream path;
  346 +
  347 + path << _srs_config->get_dvr_path(req->vhost)
  348 + << "/" << req->app << "/"
  349 + << req->stream << "." << srs_get_system_time_ms() << ".flv";
  350 +
  351 + if ((ret = flv_open(req->get_stream_url(), path.str())) != ERROR_SUCCESS) {
  352 + return ret;
  353 + }
  354 + dvr_enabled = true;
  355 +
  356 + // the dvr is enabled, notice the source to push the data.
  357 + if ((ret = _source->on_dvr_start()) != ERROR_SUCCESS) {
  358 + return ret;
  359 + }
312 360
313 return ret; 361 return ret;
314 } 362 }
@@ -332,10 +380,6 @@ int SrsDvrPlan::flv_open(string stream, string path) @@ -332,10 +380,6 @@ int SrsDvrPlan::flv_open(string stream, string path)
332 return ret; 380 return ret;
333 } 381 }
334 382
335 - if ((ret = _source->on_dvr_start()) != ERROR_SUCCESS) {  
336 - return ret;  
337 - }  
338 -  
339 srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str()); 383 srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str());
340 return ret; 384 return ret;
341 } 385 }
@@ -375,6 +419,10 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) @@ -375,6 +419,10 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio)
375 return ret; 419 return ret;
376 } 420 }
377 421
  422 + if ((jitter->correct(audio, 0, 0)) != ERROR_SUCCESS) {
  423 + return ret;
  424 + }
  425 +
378 int32_t timestamp = audio->header.timestamp; 426 int32_t timestamp = audio->header.timestamp;
379 char* payload = (char*)audio->payload; 427 char* payload = (char*)audio->payload;
380 int size = (int)audio->size; 428 int size = (int)audio->size;
@@ -382,6 +430,10 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) @@ -382,6 +430,10 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio)
382 return ret; 430 return ret;
383 } 431 }
384 432
  433 + if ((ret = on_audio_msg(audio)) != ERROR_SUCCESS) {
  434 + return ret;
  435 + }
  436 +
385 return ret; 437 return ret;
386 } 438 }
387 439
@@ -393,6 +445,10 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video) @@ -393,6 +445,10 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video)
393 return ret; 445 return ret;
394 } 446 }
395 447
  448 + if ((jitter->correct(video, 0, 0)) != ERROR_SUCCESS) {
  449 + return ret;
  450 + }
  451 +
396 int32_t timestamp = video->header.timestamp; 452 int32_t timestamp = video->header.timestamp;
397 char* payload = (char*)video->payload; 453 char* payload = (char*)video->payload;
398 int size = (int)video->size; 454 int size = (int)video->size;
@@ -400,6 +456,22 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video) @@ -400,6 +456,22 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video)
400 return ret; 456 return ret;
401 } 457 }
402 458
  459 + if ((ret = on_video_msg(video)) != ERROR_SUCCESS) {
  460 + return ret;
  461 + }
  462 +
  463 + return ret;
  464 +}
  465 +
  466 +int SrsDvrPlan::on_audio_msg(SrsSharedPtrMessage* /*audio*/)
  467 +{
  468 + int ret = ERROR_SUCCESS;
  469 + return ret;
  470 +}
  471 +
  472 +int SrsDvrPlan::on_video_msg(SrsSharedPtrMessage* /*video*/)
  473 +{
  474 + int ret = ERROR_SUCCESS;
403 return ret; 475 return ret;
404 } 476 }
405 477
@@ -423,33 +495,6 @@ SrsDvrSessionPlan::~SrsDvrSessionPlan() @@ -423,33 +495,6 @@ SrsDvrSessionPlan::~SrsDvrSessionPlan()
423 { 495 {
424 } 496 }
425 497
426 -int SrsDvrSessionPlan::on_publish(SrsRequest* req)  
427 -{  
428 - int ret = ERROR_SUCCESS;  
429 -  
430 - // support multiple publish.  
431 - if (dvr_enabled) {  
432 - return ret;  
433 - }  
434 -  
435 - if (!_srs_config->get_dvr_enabled(req->vhost)) {  
436 - return ret;  
437 - }  
438 -  
439 - std::stringstream path;  
440 -  
441 - path << _srs_config->get_dvr_path(req->vhost)  
442 - << "/" << req->app << "/"  
443 - << req->stream << "." << srs_get_system_time_ms() << ".flv";  
444 -  
445 - if ((ret = flv_open(req->get_stream_url(), path.str())) != ERROR_SUCCESS) {  
446 - return ret;  
447 - }  
448 - dvr_enabled = true;  
449 -  
450 - return ret;  
451 -}  
452 -  
453 void SrsDvrSessionPlan::on_unpublish() 498 void SrsDvrSessionPlan::on_unpublish()
454 { 499 {
455 // support multiple publish. 500 // support multiple publish.
@@ -492,31 +537,17 @@ int SrsDvrSegmentPlan::initialize(SrsSource* source, SrsRequest* req) @@ -492,31 +537,17 @@ int SrsDvrSegmentPlan::initialize(SrsSource* source, SrsRequest* req)
492 return ret; 537 return ret;
493 } 538 }
494 539
495 -int SrsDvrSegmentPlan::on_publish(SrsRequest* req) 540 +int SrsDvrSegmentPlan::on_publish()
496 { 541 {
497 int ret = ERROR_SUCCESS; 542 int ret = ERROR_SUCCESS;
498 543
499 - // support multiple publish.  
500 - if (dvr_enabled) {  
501 - return ret;  
502 - }  
503 -  
504 - if (!_srs_config->get_dvr_enabled(req->vhost)) {  
505 - return ret;  
506 - }  
507 -  
508 - std::stringstream path;  
509 -  
510 - path << _srs_config->get_dvr_path(req->vhost)  
511 - << "/" << req->app << "/"  
512 - << req->stream << "." << srs_get_system_time_ms() << ".flv";  
513 -  
514 - if ((ret = flv_open(req->get_stream_url(), path.str())) != ERROR_SUCCESS) { 544 + // if already opened, continue to dvr.
  545 + if (fs->is_open()) {
  546 + dvr_enabled = true;
515 return ret; 547 return ret;
516 } 548 }
517 - dvr_enabled = true;  
518 549
519 - return ret; 550 + return SrsDvrPlan::on_publish();
520 } 551 }
521 552
522 void SrsDvrSegmentPlan::on_unpublish() 553 void SrsDvrSegmentPlan::on_unpublish()
@@ -525,51 +556,28 @@ void SrsDvrSegmentPlan::on_unpublish() @@ -525,51 +556,28 @@ void SrsDvrSegmentPlan::on_unpublish()
525 if (!dvr_enabled) { 556 if (!dvr_enabled) {
526 return; 557 return;
527 } 558 }
528 -  
529 - // ignore error.  
530 - int ret = flv_close();  
531 - if (ret != ERROR_SUCCESS) {  
532 - srs_warn("ignore flv close error. ret=%d", ret);  
533 - }  
534 -  
535 dvr_enabled = false; 559 dvr_enabled = false;
536 } 560 }
537 561
538 -int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* audio) 562 +int SrsDvrSegmentPlan::on_audio_msg(SrsSharedPtrMessage* audio)
539 { 563 {
540 int ret = ERROR_SUCCESS; 564 int ret = ERROR_SUCCESS;
541 565
542 - if (!dvr_enabled) {  
543 - return ret;  
544 - }  
545 -  
546 if ((ret = update_duration(audio)) != ERROR_SUCCESS) { 566 if ((ret = update_duration(audio)) != ERROR_SUCCESS) {
547 return ret; 567 return ret;
548 } 568 }
549 569
550 - if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) {  
551 - return ret;  
552 - }  
553 -  
554 return ret; 570 return ret;
555 } 571 }
556 572
557 -int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* video) 573 +int SrsDvrSegmentPlan::on_video_msg(SrsSharedPtrMessage* video)
558 { 574 {
559 int ret = ERROR_SUCCESS; 575 int ret = ERROR_SUCCESS;
560 576
561 - if (!dvr_enabled) {  
562 - return ret;  
563 - }  
564 -  
565 if ((ret = update_duration(video)) != ERROR_SUCCESS) { 577 if ((ret = update_duration(video)) != ERROR_SUCCESS) {
566 return ret; 578 return ret;
567 } 579 }
568 580
569 - if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) {  
570 - return ret;  
571 - }  
572 -  
573 return ret; 581 return ret;
574 } 582 }
575 583
@@ -586,7 +594,17 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg) @@ -586,7 +594,17 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
586 594
587 // reap if exceed duration. 595 // reap if exceed duration.
588 if (duration > 0 && segment_duration > 0 && duration > segment_duration) { 596 if (duration > 0 && segment_duration > 0 && duration > segment_duration) {
  597 + duration = 0;
  598 + starttime = -1;
  599 +
  600 + if ((ret = flv_close()) != ERROR_SUCCESS) {
  601 + return ret;
  602 + }
589 on_unpublish(); 603 on_unpublish();
  604 +
  605 + if ((ret = on_publish()) != ERROR_SUCCESS) {
  606 + return ret;
  607 + }
590 } 608 }
591 609
592 return ret; 610 return ret;
@@ -617,11 +635,11 @@ int SrsDvr::initialize(SrsRequest* req) @@ -617,11 +635,11 @@ int SrsDvr::initialize(SrsRequest* req)
617 return ret; 635 return ret;
618 } 636 }
619 637
620 -int SrsDvr::on_publish(SrsRequest* req) 638 +int SrsDvr::on_publish(SrsRequest* /*req*/)
621 { 639 {
622 int ret = ERROR_SUCCESS; 640 int ret = ERROR_SUCCESS;
623 641
624 - if ((ret = plan->on_publish(req)) != ERROR_SUCCESS) { 642 + if ((ret = plan->on_publish()) != ERROR_SUCCESS) {
625 return ret; 643 return ret;
626 } 644 }
627 645
@@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 class SrsSource; 34 class SrsSource;
35 class SrsRequest; 35 class SrsRequest;
36 class SrsStream; 36 class SrsStream;
  37 +class SrsRtmpJitter;
37 class SrsOnMetaDataPacket; 38 class SrsOnMetaDataPacket;
38 class SrsSharedPtrMessage; 39 class SrsSharedPtrMessage;
39 40
@@ -51,6 +52,7 @@ public: @@ -51,6 +52,7 @@ public:
51 public: 52 public:
52 virtual int open(std::string file); 53 virtual int open(std::string file);
53 virtual int close(); 54 virtual int close();
  55 + virtual bool is_open();
54 public: 56 public:
55 /** 57 /**
56 * @param pnread, return the read size. NULL to ignore. 58 * @param pnread, return the read size. NULL to ignore.
@@ -123,18 +125,26 @@ protected: @@ -123,18 +125,26 @@ protected:
123 SrsFlvEncoder* enc; 125 SrsFlvEncoder* enc;
124 bool dvr_enabled; 126 bool dvr_enabled;
125 SrsSource* _source; 127 SrsSource* _source;
  128 + SrsRequest* _req;
  129 + SrsRtmpJitter* jitter;
126 public: 130 public:
127 SrsDvrPlan(); 131 SrsDvrPlan();
128 virtual ~SrsDvrPlan(); 132 virtual ~SrsDvrPlan();
129 public: 133 public:
130 virtual int initialize(SrsSource* source, SrsRequest* req); 134 virtual int initialize(SrsSource* source, SrsRequest* req);
131 - virtual int on_publish(SrsRequest* req) = 0; 135 + virtual int on_publish();
132 virtual void on_unpublish() = 0; 136 virtual void on_unpublish() = 0;
133 virtual int on_meta_data(SrsOnMetaDataPacket* metadata); 137 virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
134 virtual int on_audio(SrsSharedPtrMessage* audio); 138 virtual int on_audio(SrsSharedPtrMessage* audio);
135 virtual int on_video(SrsSharedPtrMessage* video); 139 virtual int on_video(SrsSharedPtrMessage* video);
136 protected: 140 protected:
137 virtual int flv_open(std::string stream, std::string path); 141 virtual int flv_open(std::string stream, std::string path);
  142 + /**
  143 + * user should override this method.
  144 + * for the audio/video is corrected by jitter.
  145 + */
  146 + virtual int on_audio_msg(SrsSharedPtrMessage* audio);
  147 + virtual int on_video_msg(SrsSharedPtrMessage* video);
138 virtual int flv_close(); 148 virtual int flv_close();
139 public: 149 public:
140 static SrsDvrPlan* create_plan(std::string vhost); 150 static SrsDvrPlan* create_plan(std::string vhost);
@@ -149,7 +159,6 @@ public: @@ -149,7 +159,6 @@ public:
149 SrsDvrSessionPlan(); 159 SrsDvrSessionPlan();
150 virtual ~SrsDvrSessionPlan(); 160 virtual ~SrsDvrSessionPlan();
151 public: 161 public:
152 - virtual int on_publish(SrsRequest* req);  
153 virtual void on_unpublish(); 162 virtual void on_unpublish();
154 }; 163 };
155 164
@@ -168,10 +177,10 @@ public: @@ -168,10 +177,10 @@ public:
168 virtual ~SrsDvrSegmentPlan(); 177 virtual ~SrsDvrSegmentPlan();
169 public: 178 public:
170 virtual int initialize(SrsSource* source, SrsRequest* req); 179 virtual int initialize(SrsSource* source, SrsRequest* req);
171 - virtual int on_publish(SrsRequest* req); 180 + virtual int on_publish();
172 virtual void on_unpublish(); 181 virtual void on_unpublish();
173 - virtual int on_audio(SrsSharedPtrMessage* audio);  
174 - virtual int on_video(SrsSharedPtrMessage* video); 182 + virtual int on_audio_msg(SrsSharedPtrMessage* audio);
  183 + virtual int on_video_msg(SrsSharedPtrMessage* video);
175 private: 184 private:
176 virtual int update_duration(SrsSharedPtrMessage* msg); 185 virtual int update_duration(SrsSharedPtrMessage* msg);
177 }; 186 };
@@ -1297,6 +1297,9 @@ int SrsHls::on_publish(SrsRequest* req) @@ -1297,6 +1297,9 @@ int SrsHls::on_publish(SrsRequest* req)
1297 return ret; 1297 return ret;
1298 } 1298 }
1299 1299
  1300 + // if enabled, open the muxer.
  1301 + hls_enabled = true;
  1302 +
1300 // notice the source to get the cached sequence header. 1303 // notice the source to get the cached sequence header.
1301 // when reload to start hls, hls will never get the sequence header in stream, 1304 // when reload to start hls, hls will never get the sequence header in stream,
1302 // use the SrsSource.on_hls_start to push the sequence header to HLS. 1305 // use the SrsSource.on_hls_start to push the sequence header to HLS.
@@ -1305,9 +1308,6 @@ int SrsHls::on_publish(SrsRequest* req) @@ -1305,9 +1308,6 @@ int SrsHls::on_publish(SrsRequest* req)
1305 return ret; 1308 return ret;
1306 } 1309 }
1307 1310
1308 - // if enabled, open the muxer.  
1309 - hls_enabled = true;  
1310 -  
1311 return ret; 1311 return ret;
1312 } 1312 }
1313 1313