胡斌

add forward_peer,to support forward between origins.the streams pushed from peer

origins will not be forward any more.
remove forward_server_srs,which used to indicate the forward target server is or is not srs,
add forward_server_other, to list the target rtmp server other than srs in.
@@ -11,6 +11,9 @@ vhost __defaultVhost__ { @@ -11,6 +11,9 @@ vhost __defaultVhost__ {
11 forward 127.0.0.1:19350; 11 forward 127.0.0.1:19350;
12 #select on of the listed servers in turn to forward,not all of them 12 #select on of the listed servers in turn to forward,not all of them
13 forward_in_turn 127.0.0.1:19351 127.0.0.1:19352; 13 forward_in_turn 127.0.0.1:19351 127.0.0.1:19352;
14 -#the forward destination server type,default true. if the server is not srs server,connect app maybe fail,try config as false  
15 - forward_server_srs true; 14 +#if the forward destination server is not srs,such as some rtmp server in cdn,write the address of them below
  15 + forward_server_other 127.0.0.1:19350;
  16 +#if the forward server is same as this server , a origin,try use forward_peer,
  17 +#the stream pushed from other forward peer will not forward any more in this server
  18 + forward_peer 127.0.0.1:1936;
16 } 19 }
@@ -1823,7 +1823,7 @@ int SrsConfig::check_config() @@ -1823,7 +1823,7 @@ int SrsConfig::check_config()
1823 && n != "dvr" && n != "ingest" && n != "hls" && n != "http_hooks" 1823 && n != "dvr" && n != "ingest" && n != "hls" && n != "http_hooks"
1824 && n != "gop_cache" && n != "queue_length" 1824 && n != "gop_cache" && n != "queue_length"
1825 && n != "refer" && n != "refer_publish" && n != "refer_play" 1825 && n != "refer" && n != "refer_publish" && n != "refer_play"
1826 - && n != "forward" && n != "forward_server_srs" && n != "forward_in_turn" && n != "transcode" && n != "bandcheck" 1826 + && n != "forward" && n != "forward_server_other" && n != "forward_in_turn" && n != "forward_peer" && n != "transcode" && n != "bandcheck"
1827 && n != "time_jitter" && n != "mix_correct" 1827 && n != "time_jitter" && n != "mix_correct"
1828 && n != "atc" && n != "atc_auto" 1828 && n != "atc" && n != "atc_auto"
1829 && n != "debug_srs_upnode" 1829 && n != "debug_srs_upnode"
@@ -1939,15 +1939,6 @@ int SrsConfig::check_config() @@ -1939,15 +1939,6 @@ int SrsConfig::check_config()
1939 return ret; 1939 return ret;
1940 } 1940 }
1941 }*/ 1941 }*/
1942 - } else if (n == "forward_server_srs") {  
1943 - for (int j = 0; j < (int)conf->directives.size(); j++) {  
1944 - string m = conf->at(j)->name.c_str();  
1945 - if (m != "true" && m != "false") {  
1946 - ret = ERROR_SYSTEM_CONFIG_INVALID;  
1947 - srs_error("unsupported vhost forward_server_srs directive %s(only support true or flase), ret=%d", m.c_str(), ret);  
1948 - return ret;  
1949 - }  
1950 - }  
1951 } 1942 }
1952 else if (n == "security") { 1943 else if (n == "security") {
1953 for (int j = 0; j < (int)conf->directives.size(); j++) { 1944 for (int j = 0; j < (int)conf->directives.size(); j++) {
@@ -2756,20 +2747,15 @@ SrsConfDirective* SrsConfig::get_forward(string vhost) @@ -2756,20 +2747,15 @@ SrsConfDirective* SrsConfig::get_forward(string vhost)
2756 return conf->get("forward"); 2747 return conf->get("forward");
2757 } 2748 }
2758 2749
2759 -bool SrsConfig::get_forward_server_srs(string vhost) 2750 +SrsConfDirective* SrsConfig::get_forward_server_other(string vhost)
2760 { 2751 {
2761 SrsConfDirective* conf = get_vhost(vhost); 2752 SrsConfDirective* conf = get_vhost(vhost);
2762 2753
2763 if (!conf) { 2754 if (!conf) {
2764 - return true;  
2765 - }  
2766 -  
2767 - conf = conf->get("forward_server_srs");  
2768 - if (!conf || conf->arg0().empty()) {  
2769 - return true; 2755 + return NULL;
2770 } 2756 }
2771 2757
2772 - return conf->arg0()=="true"; 2758 + return conf->get("forward_server_other");
2773 } 2759 }
2774 2760
2775 SrsConfDirective* SrsConfig::get_forward_in_turn(string vhost) 2761 SrsConfDirective* SrsConfig::get_forward_in_turn(string vhost)
@@ -2783,6 +2769,17 @@ SrsConfDirective* SrsConfig::get_forward_in_turn(string vhost) @@ -2783,6 +2769,17 @@ SrsConfDirective* SrsConfig::get_forward_in_turn(string vhost)
2783 return conf->get("forward_in_turn"); 2769 return conf->get("forward_in_turn");
2784 } 2770 }
2785 2771
  2772 +SrsConfDirective* SrsConfig::get_forward_peer(string vhost)
  2773 +{
  2774 + SrsConfDirective* conf = get_vhost(vhost);
  2775 +
  2776 + if (!conf) {
  2777 + return NULL;
  2778 + }
  2779 +
  2780 + return conf->get("forward_peer");
  2781 +}
  2782 +
2786 SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost) 2783 SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost)
2787 { 2784 {
2788 SrsConfDirective* conf = get_vhost(vhost); 2785 SrsConfDirective* conf = get_vhost(vhost);
@@ -570,13 +570,17 @@ public: @@ -570,13 +570,17 @@ public:
570 */ 570 */
571 virtual SrsConfDirective* get_forward(std::string vhost); 571 virtual SrsConfDirective* get_forward(std::string vhost);
572 /** 572 /**
573 - * get the forward server type of vhost,if srs return true,else false. 573 + * get the forward_server_other directive of vhost
574 */ 574 */
575 - virtual bool get_forward_server_srs(std::string vhost); 575 + virtual SrsConfDirective* get_forward_server_other(std::string vhost);
576 /** 576 /**
577 * get the forward_in_turn directive of vhost. 577 * get the forward_in_turn directive of vhost.
578 */ 578 */
579 virtual SrsConfDirective* get_forward_in_turn(std::string vhost); 579 virtual SrsConfDirective* get_forward_in_turn(std::string vhost);
  580 + /**
  581 + * get the forward_peer directive of vhost.
  582 + */
  583 + virtual SrsConfDirective* get_forward_peer(std::string vhost);
580 // http_hooks section 584 // http_hooks section
581 private: 585 private:
582 /** 586 /**
@@ -52,6 +52,8 @@ SrsForwarder::SrsForwarder(SrsSource* _source) @@ -52,6 +52,8 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
52 { 52 {
53 source = _source; 53 source = _source;
54 54
  55 + server_type = 0;
  56 +
55 _req = NULL; 57 _req = NULL;
56 io = NULL; 58 io = NULL;
57 client = NULL; 59 client = NULL;
@@ -117,13 +119,7 @@ int SrsForwarder::check_dead_loop(SrsRequest* req, std::string ep_forward) @@ -117,13 +119,7 @@ int SrsForwarder::check_dead_loop(SrsRequest* req, std::string ep_forward)
117 119
118 if (source_ep == dest_ep) { 120 if (source_ep == dest_ep) {
119 ret = ERROR_SYSTEM_FORWARD_LOOP; 121 ret = ERROR_SYSTEM_FORWARD_LOOP;
120 - srs_warn("forward loop detected. src=%s, dest=%s, ret=%d",  
121 - source_ep.c_str(), dest_ep.c_str(), ret);  
122 - return ret;  
123 } 122 }
124 - srs_trace("start forward %s to %s, tcUrl=%s, stream=%s",  
125 - source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(),  
126 - req->stream.c_str());  
127 123
128 return ret; 124 return ret;
129 } 125 }
@@ -139,6 +135,15 @@ int SrsForwarder::initialize(SrsRequest* req, string ep_forward) @@ -139,6 +135,15 @@ int SrsForwarder::initialize(SrsRequest* req, string ep_forward)
139 // the ep(endpoint) to forward to 135 // the ep(endpoint) to forward to
140 _ep_forward = ep_forward; 136 _ep_forward = ep_forward;
141 137
  138 + SrsConfDirective* conf = _srs_config->get_forward_server_other(_req->vhost);
  139 + for (int i = 0; conf && i < (int)conf->args.size(); i++) {
  140 + std::string forward_server = conf->args.at(i);
  141 + if(ep_forward == forward_server){
  142 + server_type = 1;
  143 + break;
  144 + }
  145 + }
  146 +
142 return ret; 147 return ret;
143 } 148 }
144 149
@@ -428,13 +433,12 @@ int SrsForwarder::connect_app(string ep_server, string ep_port) @@ -428,13 +433,12 @@ int SrsForwarder::connect_app(string ep_server, string ep_port)
428 bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); 433 bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
429 434
430 //if forward_server is not srs,or srs compatible,don't send req when connect app,in case of be rejected 435 //if forward_server is not srs,or srs compatible,don't send req when connect app,in case of be rejected
431 - bool forward_server_srs = _srs_config->get_forward_server_srs(req->vhost);  
432 - if (!forward_server_srs) { 436 + if (server_type) {
433 srs_trace("forward to srs incompatible server, tcUrl=%s,vhost:%s", 437 srs_trace("forward to srs incompatible server, tcUrl=%s,vhost:%s",
434 tc_url.c_str(), req->vhost.c_str()); 438 tc_url.c_str(), req->vhost.c_str());
435 } 439 }
436 440
437 - if ((ret = client->connect_app(req->app, tc_url, forward_server_srs ? req : NULL, debug_srs_upnode)) != ERROR_SUCCESS) { 441 + if ((ret = client->connect_app(req->app, tc_url, server_type ? NULL : req, debug_srs_upnode)) != ERROR_SUCCESS) {
438 srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", 442 srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
439 tc_url.c_str(), debug_srs_upnode, ret); 443 tc_url.c_str(), debug_srs_upnode, ret);
440 return ret; 444 return ret;
@@ -71,6 +71,8 @@ private: @@ -71,6 +71,8 @@ private:
71 */ 71 */
72 SrsSharedPtrMessage* sh_audio; 72 SrsSharedPtrMessage* sh_audio;
73 SrsSharedPtrMessage* sh_video; 73 SrsSharedPtrMessage* sh_video;
  74 +
  75 + int server_type; //server_type 0, srs, 1,other
74 public: 76 public:
75 SrsForwarder(SrsSource* _source); 77 SrsForwarder(SrsSource* _source);
76 virtual ~SrsForwarder(); 78 virtual ~SrsForwarder();
@@ -2424,13 +2424,59 @@ int SrsSource::create_forwarders() @@ -2424,13 +2424,59 @@ int SrsSource::create_forwarders()
2424 { 2424 {
2425 int ret = ERROR_SUCCESS; 2425 int ret = ERROR_SUCCESS;
2426 2426
2427 - SrsConfDirective* conf = _srs_config->get_forward(_req->vhost); 2427 + //forward_peer
  2428 + SrsConfDirective* conf = _srs_config->get_forward_peer(_req->vhost);
2428 for (int i = 0; conf && i < (int)conf->args.size(); i++) { 2429 for (int i = 0; conf && i < (int)conf->args.size(); i++) {
2429 std::string forward_server = conf->args.at(i); 2430 std::string forward_server = conf->args.at(i);
2430 2431
2431 - if (SrsForwarder::check_dead_loop(_req, forward_server)) {  
2432 - continue; 2432 + if (SrsForwarder::check_dead_loop(_req, forward_server)) {//if the source is from any of the peer list,don't forward it
  2433 + srs_trace("the source is come from forward_peer,don't forward it any more: "
  2434 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  2435 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
  2436 + forward_server.c_str());
  2437 + return ret;
  2438 + }
2433 } 2439 }
  2440 +
  2441 + for (int i = 0; conf && i < (int)conf->args.size(); i++) {
  2442 + std::string forward_server = conf->args.at(i);
  2443 +
  2444 + SrsForwarder* forwarder = new SrsForwarder(this);
  2445 + forwarders.push_back(forwarder);
  2446 +
  2447 + // initialize the forwarder with request.
  2448 + if ((ret = forwarder->initialize(_req, forward_server)) != ERROR_SUCCESS) {
  2449 + return ret;
  2450 + }
  2451 +
  2452 + double queue_size = _srs_config->get_queue_length(_req->vhost);
  2453 + forwarder->set_queue_size(queue_size);
  2454 +
  2455 + srs_trace("start forward_peer: "
  2456 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  2457 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
  2458 + forward_server.c_str());
  2459 + if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) {
  2460 + srs_error("start forwarder failed. "
  2461 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  2462 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
  2463 + forward_server.c_str());
  2464 + return ret;
  2465 + }
  2466 + }
  2467 +
  2468 + //forward
  2469 + conf = _srs_config->get_forward(_req->vhost);
  2470 + for (int i = 0; conf && i < (int)conf->args.size(); i++) {
  2471 + std::string forward_server = conf->args.at(i);
  2472 +
  2473 + if (SrsForwarder::check_dead_loop(_req, forward_server)) {
  2474 + srs_trace("start forward,dead loop checked,don't forward it: "
  2475 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  2476 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
  2477 + forward_server.c_str());
  2478 + continue;
  2479 + }
2434 2480
2435 SrsForwarder* forwarder = new SrsForwarder(this); 2481 SrsForwarder* forwarder = new SrsForwarder(this);
2436 forwarders.push_back(forwarder); 2482 forwarders.push_back(forwarder);
@@ -2474,7 +2520,11 @@ int SrsSource::create_forwarders() @@ -2474,7 +2520,11 @@ int SrsSource::create_forwarders()
2474 server_index ++; 2520 server_index ++;
2475 2521
2476 if (SrsForwarder::check_dead_loop(_req, forward_server)) { 2522 if (SrsForwarder::check_dead_loop(_req, forward_server)) {
2477 - continue; 2523 + srs_trace("start forward_in_turn,dead loop checked,don't forward it: "
  2524 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  2525 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
  2526 + forward_server.c_str());
  2527 + continue;
2478 } 2528 }
2479 2529
2480 SrsForwarder* forwarder = new SrsForwarder(this); 2530 SrsForwarder* forwarder = new SrsForwarder(this);
@@ -2488,6 +2538,11 @@ int SrsSource::create_forwarders() @@ -2488,6 +2538,11 @@ int SrsSource::create_forwarders()
2488 double queue_size = _srs_config->get_queue_length(_req->vhost); 2538 double queue_size = _srs_config->get_queue_length(_req->vhost);
2489 forwarder->set_queue_size(queue_size); 2539 forwarder->set_queue_size(queue_size);
2490 2540
  2541 +
  2542 + srs_trace("start forward_in_turn: "
  2543 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  2544 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
  2545 + forward_server.c_str());
2491 if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) { 2546 if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) {
2492 srs_error("start forwarder failed. " 2547 srs_error("start forwarder failed. "
2493 "vhost=%s, app=%s, stream=%s, forward-to=%s", 2548 "vhost=%s, app=%s, stream=%s, forward-to=%s",