胡斌

add config directive forward_in_turn to support forward streams to one of the server

in forward_in_turn list in turn
@@ -9,6 +9,8 @@ daemon off; @@ -9,6 +9,8 @@ daemon off;
9 srs_log_tank console; 9 srs_log_tank console;
10 vhost __defaultVhost__ { 10 vhost __defaultVhost__ {
11 forward 127.0.0.1:19350; 11 forward 127.0.0.1:19350;
  12 +#select on of the listed servers in turn to forward,not all of them
  13 + forward_in_turn 127.0.0.1:19351 127.0.0.1:19352;
12 #the forward destination server type,default true. if the server is not srs server,connect app maybe fail,try config as false 14 #the forward destination server type,default true. if the server is not srs server,connect app maybe fail,try config as false
13 forward_server_srs true; 15 forward_server_srs true;
14 } 16 }
@@ -1823,7 +1823,7 @@ int SrsConfig::check_config() @@ -1823,7 +1823,7 @@ int SrsConfig::check_config()
1823 && n != "dvr" && n != "ingest" && n != "hls" && n != "http_hooks" 1823 && n != "dvr" && n != "ingest" && n != "hls" && n != "http_hooks"
1824 && n != "gop_cache" && n != "queue_length" 1824 && n != "gop_cache" && n != "queue_length"
1825 && n != "refer" && n != "refer_publish" && n != "refer_play" 1825 && n != "refer" && n != "refer_publish" && n != "refer_play"
1826 - && n != "forward" && n != "forward_server_srs" && n != "transcode" && n != "bandcheck" 1826 + && n != "forward" && n != "forward_server_srs" && n != "forward_in_turn" && n != "transcode" && n != "bandcheck"
1827 && n != "time_jitter" && n != "mix_correct" 1827 && n != "time_jitter" && n != "mix_correct"
1828 && n != "atc" && n != "atc_auto" 1828 && n != "atc" && n != "atc_auto"
1829 && n != "debug_srs_upnode" 1829 && n != "debug_srs_upnode"
@@ -2772,6 +2772,17 @@ bool SrsConfig::get_forward_server_srs(string vhost) @@ -2772,6 +2772,17 @@ bool SrsConfig::get_forward_server_srs(string vhost)
2772 return conf->arg0()=="true"; 2772 return conf->arg0()=="true";
2773 } 2773 }
2774 2774
  2775 +SrsConfDirective* SrsConfig::get_forward_in_turn(string vhost)
  2776 +{
  2777 + SrsConfDirective* conf = get_vhost(vhost);
  2778 +
  2779 + if (!conf) {
  2780 + return NULL;
  2781 + }
  2782 +
  2783 + return conf->get("forward_in_turn");
  2784 +}
  2785 +
2775 SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost) 2786 SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost)
2776 { 2787 {
2777 SrsConfDirective* conf = get_vhost(vhost); 2788 SrsConfDirective* conf = get_vhost(vhost);
@@ -573,6 +573,10 @@ public: @@ -573,6 +573,10 @@ public:
573 * get the forward server type of vhost,if srs return true,else false. 573 * get the forward server type of vhost,if srs return true,else false.
574 */ 574 */
575 virtual bool get_forward_server_srs(std::string vhost); 575 virtual bool get_forward_server_srs(std::string vhost);
  576 + /**
  577 + * get the forward_in_turn directive of vhost.
  578 + */
  579 + virtual SrsConfDirective* get_forward_in_turn(std::string vhost);
576 // http_hooks section 580 // http_hooks section
577 private: 581 private:
578 /** 582 /**
@@ -2448,6 +2448,44 @@ int SrsSource::create_forwarders() @@ -2448,6 +2448,44 @@ int SrsSource::create_forwarders()
2448 } 2448 }
2449 } 2449 }
2450 2450
  2451 + //forward a server in forward_in_turn
  2452 + static int server_index = 0;
  2453 + conf = _srs_config->get_forward_in_turn(_req->vhost);
  2454 + if(!conf){
  2455 + return ret;
  2456 + }
  2457 +
  2458 + int servers_size = (int)conf->args.size();
  2459 + if(servers_size <= 0){
  2460 + return ret;
  2461 + }
  2462 +
  2463 + if(server_index >= servers_size){
  2464 + server_index = 0;
  2465 + }
  2466 +
  2467 + std::string forward_server = conf->args.at(server_index);
  2468 + server_index ++;
  2469 +
  2470 + SrsForwarder* forwarder = new SrsForwarder(this);
  2471 + forwarders.push_back(forwarder);
  2472 +
  2473 + // initialize the forwarder with request.
  2474 + if ((ret = forwarder->initialize(_req, forward_server)) != ERROR_SUCCESS) {
  2475 + return ret;
  2476 + }
  2477 +
  2478 + double queue_size = _srs_config->get_queue_length(_req->vhost);
  2479 + forwarder->set_queue_size(queue_size);
  2480 +
  2481 + if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) {
  2482 + srs_error("start forwarder failed. "
  2483 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  2484 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
  2485 + forward_server.c_str());
  2486 + return ret;
  2487 + }
  2488 +
2451 return ret; 2489 return ret;
2452 } 2490 }
2453 2491