winlin

add forward framework

@@ -573,6 +573,17 @@ SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost) @@ -573,6 +573,17 @@ SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost)
573 return conf->get("gop_cache"); 573 return conf->get("gop_cache");
574 } 574 }
575 575
  576 +SrsConfDirective* SrsConfig::get_forward(std::string vhost)
  577 +{
  578 + SrsConfDirective* conf = get_vhost(vhost);
  579 +
  580 + if (!conf) {
  581 + return NULL;
  582 + }
  583 +
  584 + return conf->get("forward");
  585 +}
  586 +
576 SrsConfDirective* SrsConfig::get_hls(std::string vhost) 587 SrsConfDirective* SrsConfig::get_hls(std::string vhost)
577 { 588 {
578 SrsConfDirective* conf = get_vhost(vhost); 589 SrsConfDirective* conf = get_vhost(vhost);
@@ -115,6 +115,7 @@ public: @@ -115,6 +115,7 @@ public:
115 virtual SrsConfDirective* get_vhost(std::string vhost); 115 virtual SrsConfDirective* get_vhost(std::string vhost);
116 virtual SrsConfDirective* get_vhost_enabled(std::string vhost); 116 virtual SrsConfDirective* get_vhost_enabled(std::string vhost);
117 virtual SrsConfDirective* get_gop_cache(std::string vhost); 117 virtual SrsConfDirective* get_gop_cache(std::string vhost);
  118 + virtual SrsConfDirective* get_forward(std::string vhost);
118 virtual SrsConfDirective* get_hls(std::string vhost); 119 virtual SrsConfDirective* get_hls(std::string vhost);
119 virtual SrsConfDirective* get_hls_path(std::string vhost); 120 virtual SrsConfDirective* get_hls_path(std::string vhost);
120 virtual SrsConfDirective* get_hls_fragment(std::string vhost); 121 virtual SrsConfDirective* get_hls_fragment(std::string vhost);
@@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 23
24 #include <srs_core_forward.hpp> 24 #include <srs_core_forward.hpp>
25 25
  26 +#include <srs_core_error.hpp>
  27 +
26 SrsForwarder::SrsForwarder() 28 SrsForwarder::SrsForwarder()
27 { 29 {
28 } 30 }
@@ -31,3 +33,31 @@ SrsForwarder::~SrsForwarder() @@ -31,3 +33,31 @@ SrsForwarder::~SrsForwarder()
31 { 33 {
32 } 34 }
33 35
  36 +int SrsForwarder::on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server)
  37 +{
  38 + int ret = ERROR_SUCCESS;
  39 + return ret;
  40 +}
  41 +
  42 +void SrsForwarder::on_unpublish()
  43 +{
  44 +}
  45 +
  46 +int SrsForwarder::on_meta_data(SrsOnMetaDataPacket* metadata)
  47 +{
  48 + int ret = ERROR_SUCCESS;
  49 + return ret;
  50 +}
  51 +
  52 +int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
  53 +{
  54 + int ret = ERROR_SUCCESS;
  55 + return ret;
  56 +}
  57 +
  58 +int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
  59 +{
  60 + int ret = ERROR_SUCCESS;
  61 + return ret;
  62 +}
  63 +
@@ -29,6 +29,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -29,6 +29,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 */ 29 */
30 #include <srs_core.hpp> 30 #include <srs_core.hpp>
31 31
  32 +#include <string>
  33 +
  34 +class SrsSharedPtrMessage;
  35 +class SrsOnMetaDataPacket;
  36 +
32 /** 37 /**
33 * forward the stream to other servers. 38 * forward the stream to other servers.
34 */ 39 */
@@ -38,6 +43,11 @@ public: @@ -38,6 +43,11 @@ public:
38 SrsForwarder(); 43 SrsForwarder();
39 virtual ~SrsForwarder(); 44 virtual ~SrsForwarder();
40 public: 45 public:
  46 + virtual int on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server);
  47 + virtual void on_unpublish();
  48 + virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
  49 + virtual int on_audio(SrsSharedPtrMessage* msg);
  50 + virtual int on_video(SrsSharedPtrMessage* msg);
41 }; 51 };
42 52
43 #endif 53 #endif
@@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 #include <srs_core_codec.hpp> 32 #include <srs_core_codec.hpp>
33 #include <srs_core_hls.hpp> 33 #include <srs_core_hls.hpp>
34 #include <srs_core_forward.hpp> 34 #include <srs_core_forward.hpp>
  35 +#include <srs_core_config.hpp>
35 36
36 #define CONST_MAX_JITTER_MS 500 37 #define CONST_MAX_JITTER_MS 500
37 #define DEFAULT_FRAME_TIME_MS 10 38 #define DEFAULT_FRAME_TIME_MS 10
@@ -408,6 +409,17 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -408,6 +409,17 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
408 return ret; 409 return ret;
409 } 410 }
410 #endif 411 #endif
  412 +
  413 + if (true) {
  414 + std::vector<SrsForwarder*>::iterator it;
  415 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  416 + SrsForwarder* forwarder = *it;
  417 + if ((ret = forwarder->on_meta_data(metadata)) != ERROR_SUCCESS) {
  418 + srs_error("forwarder process onMetaData message failed. ret=%d", ret);
  419 + return ret;
  420 + }
  421 + }
  422 + }
411 423
412 metadata->metadata->set("server", new SrsAmf0String( 424 metadata->metadata->set("server", new SrsAmf0String(
413 RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); 425 RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
@@ -453,15 +465,17 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -453,15 +465,17 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
453 srs_verbose("initialize shared ptr metadata success."); 465 srs_verbose("initialize shared ptr metadata success.");
454 466
455 // copy to all consumer 467 // copy to all consumer
456 - std::vector<SrsConsumer*>::iterator it;  
457 - for (it = consumers.begin(); it != consumers.end(); ++it) {  
458 - SrsConsumer* consumer = *it;  
459 - if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
460 - srs_error("dispatch the metadata failed. ret=%d", ret);  
461 - return ret; 468 + if (true) {
  469 + std::vector<SrsConsumer*>::iterator it;
  470 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  471 + SrsConsumer* consumer = *it;
  472 + if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  473 + srs_error("dispatch the metadata failed. ret=%d", ret);
  474 + return ret;
  475 + }
462 } 476 }
  477 + srs_trace("dispatch metadata success.");
463 } 478 }
464 - srs_trace("dispatch metadata success.");  
465 479
466 return ret; 480 return ret;
467 } 481 }
@@ -484,17 +498,30 @@ int SrsSource::on_audio(SrsCommonMessage* audio) @@ -484,17 +498,30 @@ int SrsSource::on_audio(SrsCommonMessage* audio)
484 return ret; 498 return ret;
485 } 499 }
486 #endif 500 #endif
  501 +
  502 + if (true) {
  503 + std::vector<SrsForwarder*>::iterator it;
  504 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  505 + SrsForwarder* forwarder = *it;
  506 + if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) {
  507 + srs_error("forwarder process audio message failed. ret=%d", ret);
  508 + return ret;
  509 + }
  510 + }
  511 + }
487 512
488 // copy to all consumer 513 // copy to all consumer
489 - std::vector<SrsConsumer*>::iterator it;  
490 - for (it = consumers.begin(); it != consumers.end(); ++it) {  
491 - SrsConsumer* consumer = *it;  
492 - if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
493 - srs_error("dispatch the audio failed. ret=%d", ret);  
494 - return ret; 514 + if (true) {
  515 + std::vector<SrsConsumer*>::iterator it;
  516 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  517 + SrsConsumer* consumer = *it;
  518 + if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  519 + srs_error("dispatch the audio failed. ret=%d", ret);
  520 + return ret;
  521 + }
495 } 522 }
  523 + srs_info("dispatch audio success.");
496 } 524 }
497 - srs_info("dispatch audio success.");  
498 525
499 // cache the sequence header if h264 526 // cache the sequence header if h264
500 if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) { 527 if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) {
@@ -532,17 +559,30 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -532,17 +559,30 @@ int SrsSource::on_video(SrsCommonMessage* video)
532 return ret; 559 return ret;
533 } 560 }
534 #endif 561 #endif
  562 +
  563 + if (true) {
  564 + std::vector<SrsForwarder*>::iterator it;
  565 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  566 + SrsForwarder* forwarder = *it;
  567 + if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) {
  568 + srs_error("forwarder process video message failed. ret=%d", ret);
  569 + return ret;
  570 + }
  571 + }
  572 + }
535 573
536 // copy to all consumer 574 // copy to all consumer
537 - std::vector<SrsConsumer*>::iterator it;  
538 - for (it = consumers.begin(); it != consumers.end(); ++it) {  
539 - SrsConsumer* consumer = *it;  
540 - if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
541 - srs_error("dispatch the video failed. ret=%d", ret);  
542 - return ret; 575 + if (true) {
  576 + std::vector<SrsConsumer*>::iterator it;
  577 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  578 + SrsConsumer* consumer = *it;
  579 + if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  580 + srs_error("dispatch the video failed. ret=%d", ret);
  581 + return ret;
  582 + }
543 } 583 }
  584 + srs_info("dispatch video success.");
544 } 585 }
545 - srs_info("dispatch video success.");  
546 586
547 // cache the sequence header if h264 587 // cache the sequence header if h264
548 if (SrsCodec::video_is_sequence_header(msg->payload, msg->size)) { 588 if (SrsCodec::video_is_sequence_header(msg->payload, msg->size)) {
@@ -562,26 +602,55 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -562,26 +602,55 @@ int SrsSource::on_video(SrsCommonMessage* video)
562 return ret; 602 return ret;
563 } 603 }
564 604
565 -#ifdef SRS_HLS  
566 int SrsSource::on_publish(std::string vhost, std::string app, std::string stream) 605 int SrsSource::on_publish(std::string vhost, std::string app, std::string stream)
567 { 606 {
  607 + int ret = ERROR_SUCCESS;
  608 +
568 _can_publish = false; 609 _can_publish = false;
569 - return hls->on_publish(vhost, app, stream);  
570 -}  
571 -#else  
572 -int SrsSource::on_publish(std::string /*vhost*/, std::string /*app*/, std::string /*stream*/)  
573 -{  
574 - _can_publish = false;  
575 - return ERROR_SUCCESS;  
576 -} 610 +
  611 +#ifdef SRS_HLS
  612 + if ((ret = hls->on_publish(vhost, app, stream)) != ERROR_SUCCESS) {
  613 + return ret;
  614 + }
577 #endif 615 #endif
578 616
  617 + // TODO: support reload.
  618 +
  619 + // create forwarders
  620 + SrsConfDirective* conf = config->get_forward(vhost);
  621 + for (int i = 0; conf && i < conf->args.size(); i++) {
  622 + std::string forward_server = conf->args.at(i);
  623 +
  624 + SrsForwarder* forwarder = new SrsForwarder();
  625 + forwarders.push_back(forwarder);
  626 +
  627 + if ((ret = forwarder->on_publish(vhost, app, stream, forward_server)) != ERROR_SUCCESS) {
  628 + srs_error("start forwarder failed. "
  629 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  630 + vhost.c_str(), app.c_str(), stream.c_str(),
  631 + forward_server.c_str());
  632 + return ret;
  633 + }
  634 + }
  635 +
  636 + return ret;
  637 +}
  638 +
579 void SrsSource::on_unpublish() 639 void SrsSource::on_unpublish()
580 { 640 {
581 #ifdef SRS_HLS 641 #ifdef SRS_HLS
582 hls->on_unpublish(); 642 hls->on_unpublish();
583 #endif 643 #endif
584 644
  645 + // close all forwarders
  646 + std::vector<SrsForwarder*>::iterator it;
  647 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  648 + SrsForwarder* forwarder = *it;
  649 + forwarder->on_unpublish();
  650 + srs_freep(forwarder);
  651 + }
  652 + forwarders.clear();
  653 +
585 gop_cache->clear(); 654 gop_cache->clear();
586 655
587 srs_freep(cache_metadata); 656 srs_freep(cache_metadata);