胡斌

support forward between two origin,to make a full backup of top server

@@ -77,6 +77,57 @@ SrsForwarder::~SrsForwarder() @@ -77,6 +77,57 @@ SrsForwarder::~SrsForwarder()
77 srs_freep(sh_audio); 77 srs_freep(sh_audio);
78 } 78 }
79 79
  80 +int SrsForwarder::check_dead_loop(SrsRequest* req, std::string ep_forward)
  81 +{
  82 + int ret = ERROR_SUCCESS;
  83 + std::string server, port, tc_url;
  84 +
  85 + server = ep_forward;
  86 + port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  87 +
  88 + // TODO: FIXME: parse complex params
  89 + size_t pos = ep_forward.find(":");
  90 + if (pos != std::string::npos) {
  91 + port = ep_forward.substr(pos + 1);
  92 + server = ep_forward.substr(0, pos);
  93 + }
  94 +
  95 + // generate tcUrl
  96 + tc_url = srs_generate_tc_url(server, req->vhost, req->app, port, req->param);
  97 +
  98 + // dead loop check
  99 + std::string source_ep = "rtmp://";
  100 + source_ep += req->host;
  101 + source_ep += ":";
  102 + source_ep += req->port;
  103 + source_ep += "?vhost=";
  104 + source_ep += req->vhost;
  105 +
  106 + std::string dest_ep = "rtmp://";
  107 + if (ep_forward == SRS_CONSTS_LOCALHOST) {
  108 + dest_ep += req->host;
  109 + }
  110 + else {
  111 + dest_ep += server;
  112 + }
  113 + dest_ep += ":";
  114 + dest_ep += port;
  115 + dest_ep += "?vhost=";
  116 + dest_ep += req->vhost;
  117 +
  118 + if (source_ep == dest_ep) {
  119 + ret = ERROR_SYSTEM_FORWARD_LOOP;
  120 + srs_warn("forward loop detected. src=%s, dest=%s, ret=%d",
  121 + source_ep.c_str(), dest_ep.c_str(), ret);
  122 + return ret;
  123 + }
  124 + srs_trace("start forward %s to %s, tcUrl=%s, stream=%s",
  125 + source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(),
  126 + req->stream.c_str());
  127 +
  128 + return ret;
  129 +}
  130 +
80 int SrsForwarder::initialize(SrsRequest* req, string ep_forward) 131 int SrsForwarder::initialize(SrsRequest* req, string ep_forward)
81 { 132 {
82 int ret = ERROR_SUCCESS; 133 int ret = ERROR_SUCCESS;
@@ -99,7 +150,7 @@ void SrsForwarder::set_queue_size(double queue_size) @@ -99,7 +150,7 @@ void SrsForwarder::set_queue_size(double queue_size)
99 int SrsForwarder::on_publish() 150 int SrsForwarder::on_publish()
100 { 151 {
101 int ret = ERROR_SUCCESS; 152 int ret = ERROR_SUCCESS;
102 - 153 +#if 0
103 SrsRequest* req = _req; 154 SrsRequest* req = _req;
104 155
105 // discovery the server port and tcUrl from req and ep_forward. 156 // discovery the server port and tcUrl from req and ep_forward.
@@ -134,7 +185,7 @@ int SrsForwarder::on_publish() @@ -134,7 +185,7 @@ int SrsForwarder::on_publish()
134 srs_trace("start forward %s to %s, tcUrl=%s, stream=%s", 185 srs_trace("start forward %s to %s, tcUrl=%s, stream=%s",
135 source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), 186 source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(),
136 req->stream.c_str()); 187 req->stream.c_str());
137 - 188 +#endif
138 if ((ret = pthread->start()) != ERROR_SUCCESS) { 189 if ((ret = pthread->start()) != ERROR_SUCCESS) {
139 srs_error("start srs thread failed. ret=%d", ret); 190 srs_error("start srs thread failed. ret=%d", ret);
140 return ret; 191 return ret;
@@ -75,6 +75,7 @@ public: @@ -75,6 +75,7 @@ public:
75 SrsForwarder(SrsSource* _source); 75 SrsForwarder(SrsSource* _source);
76 virtual ~SrsForwarder(); 76 virtual ~SrsForwarder();
77 public: 77 public:
  78 + static int check_dead_loop(SrsRequest* req, std::string ep_forward);
78 virtual int initialize(SrsRequest* req, std::string ep_forward); 79 virtual int initialize(SrsRequest* req, std::string ep_forward);
79 virtual void set_queue_size(double queue_size); 80 virtual void set_queue_size(double queue_size);
80 public: 81 public:
@@ -2427,6 +2427,10 @@ int SrsSource::create_forwarders() @@ -2427,6 +2427,10 @@ int SrsSource::create_forwarders()
2427 SrsConfDirective* conf = _srs_config->get_forward(_req->vhost); 2427 SrsConfDirective* conf = _srs_config->get_forward(_req->vhost);
2428 for (int i = 0; conf && i < (int)conf->args.size(); i++) { 2428 for (int i = 0; conf && i < (int)conf->args.size(); i++) {
2429 std::string forward_server = conf->args.at(i); 2429 std::string forward_server = conf->args.at(i);
  2430 +
  2431 + if (SrsForwarder::check_dead_loop(_req, forward_server)) {
  2432 + continue;
  2433 + }
2430 2434
2431 SrsForwarder* forwarder = new SrsForwarder(this); 2435 SrsForwarder* forwarder = new SrsForwarder(this);
2432 forwarders.push_back(forwarder); 2436 forwarders.push_back(forwarder);
@@ -2459,31 +2463,39 @@ int SrsSource::create_forwarders() @@ -2459,31 +2463,39 @@ int SrsSource::create_forwarders()
2459 if(servers_size <= 0){ 2463 if(servers_size <= 0){
2460 return ret; 2464 return ret;
2461 } 2465 }
  2466 +
  2467 + for(int i = 0; i < servers_size; i++){
2462 2468
2463 - if(server_index >= servers_size){  
2464 - server_index = 0;  
2465 - } 2469 + if(server_index >= servers_size){
  2470 + server_index = 0;
  2471 + }
2466 2472
2467 - std::string forward_server = conf->args.at(server_index);  
2468 - server_index ++; 2473 + std::string forward_server = conf->args.at(server_index);
  2474 + server_index ++;
2469 2475
2470 - SrsForwarder* forwarder = new SrsForwarder(this);  
2471 - forwarders.push_back(forwarder); 2476 + if (SrsForwarder::check_dead_loop(_req, forward_server)) {
  2477 + continue;
  2478 + }
2472 2479
2473 - // initialize the forwarder with request.  
2474 - if ((ret = forwarder->initialize(_req, forward_server)) != ERROR_SUCCESS) {  
2475 - return ret;  
2476 - } 2480 + SrsForwarder* forwarder = new SrsForwarder(this);
  2481 + forwarders.push_back(forwarder);
2477 2482
2478 - double queue_size = _srs_config->get_queue_length(_req->vhost);  
2479 - forwarder->set_queue_size(queue_size); 2483 + // initialize the forwarder with request.
  2484 + if ((ret = forwarder->initialize(_req, forward_server)) != ERROR_SUCCESS) {
  2485 + return ret;
  2486 + }
2480 2487
2481 - if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) {  
2482 - srs_error("start forwarder failed. "  
2483 - "vhost=%s, app=%s, stream=%s, forward-to=%s",  
2484 - _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),  
2485 - forward_server.c_str());  
2486 - return ret; 2488 + double queue_size = _srs_config->get_queue_length(_req->vhost);
  2489 + forwarder->set_queue_size(queue_size);
  2490 +
  2491 + if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) {
  2492 + srs_error("start forwarder failed. "
  2493 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  2494 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
  2495 + forward_server.c_str());
  2496 + return ret;
  2497 + }
  2498 + break;
2487 } 2499 }
2488 2500
2489 return ret; 2501 return ret;
@@ -1764,6 +1764,11 @@ void SrsRequest::update_auth(SrsRequest* req) @@ -1764,6 +1764,11 @@ void SrsRequest::update_auth(SrsRequest* req)
1764 pageUrl = req->pageUrl; 1764 pageUrl = req->pageUrl;
1765 swfUrl = req->swfUrl; 1765 swfUrl = req->swfUrl;
1766 tcUrl = req->tcUrl; 1766 tcUrl = req->tcUrl;
  1767 +
  1768 + vhost = req->vhost;
  1769 + app = req->app;
  1770 + param = req->param;
  1771 + port = req->port;
1767 1772
1768 if (args) { 1773 if (args) {
1769 srs_freep(args); 1774 srs_freep(args);