winlin

use the right int type for port.

@@ -360,19 +360,17 @@ int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port) @@ -360,19 +360,17 @@ int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port)
360 } 360 }
361 361
362 // select the origin. 362 // select the origin.
  363 + if (true) {
363 std::string server = lb->select(conf->args); 364 std::string server = lb->select(conf->args);
364 - int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);  
365 - srs_parse_hostport(server, server, port);  
366 -  
367 - // output the connected server and port.  
368 - ep_server = server;  
369 - ep_port = port; 365 + ep_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  366 + srs_parse_hostport(server, ep_server, ep_port);
  367 + }
370 368
371 // open socket. 369 // open socket.
372 int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US; 370 int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US;
373 - if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) { 371 + if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) {
374 srs_warn("edge pull failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", 372 srs_warn("edge pull failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
375 - _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret); 373 + _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret);
376 return ret; 374 return ret;
377 } 375 }
378 376
@@ -386,7 +384,7 @@ int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port) @@ -386,7 +384,7 @@ int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port)
386 kbps->set_io(io, io); 384 kbps->set_io(io, io);
387 385
388 srs_trace("edge pull connected, url=%s/%s, server=%s:%d", 386 srs_trace("edge pull connected, url=%s/%s, server=%s:%d",
389 - _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port); 387 + _req->tcUrl.c_str(), _req->stream.c_str(), ep_server.c_str(), ep_port);
390 388
391 return ret; 389 return ret;
392 } 390 }
@@ -398,7 +396,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() @@ -398,7 +396,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
398 client = NULL; 396 client = NULL;
399 _edge = NULL; 397 _edge = NULL;
400 _req = NULL; 398 _req = NULL;
401 - origin_index = 0; 399 + lb = new SrsLbRoundRobin();
402 stream_id = 0; 400 stream_id = 0;
403 stfd = NULL; 401 stfd = NULL;
404 pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); 402 pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US);
@@ -410,6 +408,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder() @@ -410,6 +408,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
410 { 408 {
411 stop(); 409 stop();
412 410
  411 + srs_freep(lb);
413 srs_freep(pthread); 412 srs_freep(pthread);
414 srs_freep(queue); 413 srs_freep(queue);
415 srs_freep(kbps); 414 srs_freep(kbps);
@@ -437,7 +436,8 @@ int SrsEdgeForwarder::start() @@ -437,7 +436,8 @@ int SrsEdgeForwarder::start()
437 436
438 send_error_code = ERROR_SUCCESS; 437 send_error_code = ERROR_SUCCESS;
439 438
440 - std::string ep_server, ep_port; 439 + std::string ep_server;
  440 + int ep_port;
441 if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) { 441 if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
442 return ret; 442 return ret;
443 } 443 }
@@ -591,7 +591,7 @@ void SrsEdgeForwarder::close_underlayer_socket() @@ -591,7 +591,7 @@ void SrsEdgeForwarder::close_underlayer_socket()
591 srs_close_stfd(stfd); 591 srs_close_stfd(stfd);
592 } 592 }
593 593
594 -int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port) 594 +int SrsEdgeForwarder::connect_server(string& ep_server, int& ep_port)
595 { 595 {
596 int ret = ERROR_SUCCESS; 596 int ret = ERROR_SUCCESS;
597 597
@@ -602,27 +602,17 @@ int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port) @@ -602,27 +602,17 @@ int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port)
602 srs_assert(conf); 602 srs_assert(conf);
603 603
604 // select the origin. 604 // select the origin.
605 - std::string server = conf->args.at(origin_index % conf->args.size());  
606 - origin_index = (origin_index + 1) % conf->args.size();  
607 -  
608 - std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT;  
609 - int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);  
610 - size_t pos = server.find(":");  
611 - if (pos != std::string::npos) {  
612 - s_port = server.substr(pos + 1);  
613 - server = server.substr(0, pos);  
614 - port = ::atoi(s_port.c_str()); 605 + if (true) {
  606 + std::string server = lb->select(conf->args);
  607 + ep_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  608 + srs_parse_hostport(server, ep_server, ep_port);
615 } 609 }
616 610
617 - // output the connected server and port.  
618 - ep_server = server;  
619 - ep_port = s_port;  
620 -  
621 // open socket. 611 // open socket.
622 int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US; 612 int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US;
623 - if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) { 613 + if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) {
624 srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", 614 srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
625 - _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret); 615 + _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret);
626 return ret; 616 return ret;
627 } 617 }
628 618
@@ -637,13 +627,13 @@ int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port) @@ -637,13 +627,13 @@ int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port)
637 627
638 // open socket. 628 // open socket.
639 srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d", 629 srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d",
640 - _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port); 630 + _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port);
641 631
642 return ret; 632 return ret;
643 } 633 }
644 634
645 // TODO: FIXME: refine the connect_app. 635 // TODO: FIXME: refine the connect_app.
646 -int SrsEdgeForwarder::connect_app(string ep_server, string ep_port) 636 +int SrsEdgeForwarder::connect_app(string ep_server, int ep_port)
647 { 637 {
648 int ret = ERROR_SUCCESS; 638 int ret = ERROR_SUCCESS;
649 639
@@ -685,7 +675,7 @@ int SrsEdgeForwarder::connect_app(string ep_server, string ep_port) @@ -685,7 +675,7 @@ int SrsEdgeForwarder::connect_app(string ep_server, string ep_port)
685 // generate the tcUrl 675 // generate the tcUrl
686 std::string param = ""; 676 std::string param = "";
687 std::string tc_url = srs_generate_tc_url(ep_server, vhost, req->app, ep_port, param); 677 std::string tc_url = srs_generate_tc_url(ep_server, vhost, req->app, ep_port, param);
688 - srs_trace("edge forward to %s:%s at %s", ep_server.c_str(), ep_port.c_str(), tc_url.c_str()); 678 + srs_trace("edge forward to %s:%d at %s", ep_server.c_str(), ep_port, tc_url.c_str());
689 679
690 // replace the tcUrl in request, 680 // replace the tcUrl in request,
691 // which will replace the tc_url in client.connect_app(). 681 // which will replace the tc_url in client.connect_app().
@@ -125,7 +125,7 @@ private: @@ -125,7 +125,7 @@ private:
125 ISrsProtocolReaderWriter* io; 125 ISrsProtocolReaderWriter* io;
126 SrsKbps* kbps; 126 SrsKbps* kbps;
127 SrsRtmpClient* client; 127 SrsRtmpClient* client;
128 - int origin_index; 128 + SrsLbRoundRobin* lb;
129 /** 129 /**
130 * we must ensure one thread one fd principle, 130 * we must ensure one thread one fd principle,
131 * that is, a fd must be write/read by the one thread. 131 * that is, a fd must be write/read by the one thread.
@@ -153,8 +153,8 @@ public: @@ -153,8 +153,8 @@ public:
153 virtual int proxy(SrsCommonMessage* msg); 153 virtual int proxy(SrsCommonMessage* msg);
154 private: 154 private:
155 virtual void close_underlayer_socket(); 155 virtual void close_underlayer_socket();
156 - virtual int connect_server(std::string& ep_server, std::string& ep_port);  
157 - virtual int connect_app(std::string ep_server, std::string ep_port); 156 + virtual int connect_server(std::string& ep_server, int& ep_port);
  157 + virtual int connect_app(std::string ep_server, int ep_port);
158 }; 158 };
159 159
160 /** 160 /**
@@ -281,7 +281,7 @@ int SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDir @@ -281,7 +281,7 @@ int SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDir
281 // output stream, to other/self server 281 // output stream, to other/self server
282 // ie. rtmp://localhost:1935/live/livestream_sd 282 // ie. rtmp://localhost:1935/live/livestream_sd
283 output = srs_string_replace(output, "[vhost]", req->vhost); 283 output = srs_string_replace(output, "[vhost]", req->vhost);
284 - output = srs_string_replace(output, "[port]", req->port); 284 + output = srs_string_replace(output, "[port]", srs_int2str(req->port));
285 output = srs_string_replace(output, "[app]", req->app); 285 output = srs_string_replace(output, "[app]", req->app);
286 output = srs_string_replace(output, "[stream]", req->stream); 286 output = srs_string_replace(output, "[stream]", req->stream);
287 output = srs_string_replace(output, "[engine]", engine->arg0()); 287 output = srs_string_replace(output, "[engine]", engine->arg0());
@@ -44,6 +44,7 @@ using namespace std; @@ -44,6 +44,7 @@ using namespace std;
44 #include <srs_protocol_amf0.hpp> 44 #include <srs_protocol_amf0.hpp>
45 #include <srs_kernel_codec.hpp> 45 #include <srs_kernel_codec.hpp>
46 #include <srs_core_autofree.hpp> 46 #include <srs_core_autofree.hpp>
  47 +#include <srs_kernel_utility.hpp>
47 48
48 // when error, forwarder sleep for a while and retry. 49 // when error, forwarder sleep for a while and retry.
49 #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) 50 #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
@@ -105,7 +106,8 @@ int SrsForwarder::on_publish() @@ -105,7 +106,8 @@ int SrsForwarder::on_publish()
105 SrsRequest* req = _req; 106 SrsRequest* req = _req;
106 107
107 // discovery the server port and tcUrl from req and ep_forward. 108 // discovery the server port and tcUrl from req and ep_forward.
108 - std::string server, port, tc_url; 109 + int port;
  110 + std::string server, tc_url;
109 discovery_ep(server, port, tc_url); 111 discovery_ep(server, port, tc_url);
110 112
111 // dead loop check 113 // dead loop check
@@ -228,7 +230,8 @@ int SrsForwarder::cycle() @@ -228,7 +230,8 @@ int SrsForwarder::cycle()
228 { 230 {
229 int ret = ERROR_SUCCESS; 231 int ret = ERROR_SUCCESS;
230 232
231 - std::string ep_server, ep_port; 233 + std::string ep_server;
  234 + int ep_port;
232 if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) { 235 if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
233 return ret; 236 return ret;
234 } 237 }
@@ -273,25 +276,18 @@ void SrsForwarder::close_underlayer_socket() @@ -273,25 +276,18 @@ void SrsForwarder::close_underlayer_socket()
273 srs_close_stfd(stfd); 276 srs_close_stfd(stfd);
274 } 277 }
275 278
276 -void SrsForwarder::discovery_ep(string& server, string& port, string& tc_url) 279 +void SrsForwarder::discovery_ep(string& server, int& port, string& tc_url)
277 { 280 {
278 SrsRequest* req = _req; 281 SrsRequest* req = _req;
279 282
280 - server = _ep_forward;  
281 port = SRS_CONSTS_RTMP_DEFAULT_PORT; 283 port = SRS_CONSTS_RTMP_DEFAULT_PORT;
282 -  
283 - // TODO: FIXME: parse complex params  
284 - size_t pos = _ep_forward.find(":");  
285 - if (pos != std::string::npos) {  
286 - port = _ep_forward.substr(pos + 1);  
287 - server = _ep_forward.substr(0, pos);  
288 - } 284 + srs_parse_hostport(_ep_forward, server, port);
289 285
290 // generate tcUrl 286 // generate tcUrl
291 tc_url = srs_generate_tc_url(server, req->vhost, req->app, port, req->param); 287 tc_url = srs_generate_tc_url(server, req->vhost, req->app, port, req->param);
292 } 288 }
293 289
294 -int SrsForwarder::connect_server(string& ep_server, string& ep_port) 290 +int SrsForwarder::connect_server(string& ep_server, int& ep_port)
295 { 291 {
296 int ret = ERROR_SUCCESS; 292 int ret = ERROR_SUCCESS;
297 293
@@ -299,19 +295,14 @@ int SrsForwarder::connect_server(string& ep_server, string& ep_port) @@ -299,19 +295,14 @@ int SrsForwarder::connect_server(string& ep_server, string& ep_port)
299 close_underlayer_socket(); 295 close_underlayer_socket();
300 296
301 // discovery the server port and tcUrl from req and ep_forward. 297 // discovery the server port and tcUrl from req and ep_forward.
302 - std::string server, s_port, tc_url;  
303 - discovery_ep(server, s_port, tc_url);  
304 - int port = ::atoi(s_port.c_str());  
305 -  
306 - // output the connected server and port.  
307 - ep_server = server;  
308 - ep_port = s_port; 298 + string tc_url;
  299 + discovery_ep(ep_server, ep_port, tc_url);
309 300
310 // open socket. 301 // open socket.
311 int64_t timeout = SRS_FORWARDER_SLEEP_US; 302 int64_t timeout = SRS_FORWARDER_SLEEP_US;
312 - if ((ret = srs_socket_connect(ep_server, port, timeout, &stfd)) != ERROR_SUCCESS) { 303 + if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) {
313 srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", 304 srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
314 - _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret); 305 + _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret);
315 return ret; 306 return ret;
316 } 307 }
317 308
@@ -325,13 +316,13 @@ int SrsForwarder::connect_server(string& ep_server, string& ep_port) @@ -325,13 +316,13 @@ int SrsForwarder::connect_server(string& ep_server, string& ep_port)
325 kbps->set_io(io, io); 316 kbps->set_io(io, io);
326 317
327 srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d", 318 srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d",
328 - _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port); 319 + _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port);
329 320
330 return ret; 321 return ret;
331 } 322 }
332 323
333 // TODO: FIXME: refine the connect_app. 324 // TODO: FIXME: refine the connect_app.
334 -int SrsForwarder::connect_app(string ep_server, string ep_port) 325 +int SrsForwarder::connect_app(string ep_server, int ep_port)
335 { 326 {
336 int ret = ERROR_SUCCESS; 327 int ret = ERROR_SUCCESS;
337 328
@@ -100,9 +100,9 @@ public: @@ -100,9 +100,9 @@ public:
100 virtual int cycle(); 100 virtual int cycle();
101 private: 101 private:
102 virtual void close_underlayer_socket(); 102 virtual void close_underlayer_socket();
103 - virtual void discovery_ep(std::string& server, std::string& port, std::string& tc_url);  
104 - virtual int connect_server(std::string& ep_server, std::string& ep_port);  
105 - virtual int connect_app(std::string ep_server, std::string ep_port); 103 + virtual void discovery_ep(std::string& server, int& port, std::string& tc_url);
  104 + virtual int connect_server(std::string& ep_server, int& ep_port);
  105 + virtual int connect_app(std::string ep_server, int ep_port);
106 virtual int forward(); 106 virtual int forward();
107 }; 107 };
108 108
@@ -332,13 +332,13 @@ int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, S @@ -332,13 +332,13 @@ int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, S
332 { 332 {
333 int ret = ERROR_SUCCESS; 333 int ret = ERROR_SUCCESS;
334 334
335 - std::string port; 335 + int port;
336 if (true) { 336 if (true) {
337 std::vector<std::string> ip_ports = _srs_config->get_listens(); 337 std::vector<std::string> ip_ports = _srs_config->get_listens();
338 srs_assert(ip_ports.size() > 0); 338 srs_assert(ip_ports.size() > 0);
339 339
340 - std::string ep = ip_ports[0];  
341 std::string ip; 340 std::string ip;
  341 + std::string ep = ip_ports[0];
342 srs_parse_endpoint(ep, ip, port); 342 srs_parse_endpoint(ep, ip, port);
343 } 343 }
344 344
@@ -346,7 +346,7 @@ int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, S @@ -346,7 +346,7 @@ int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, S
346 // output stream, to other/self server 346 // output stream, to other/self server
347 // ie. rtmp://localhost:1935/live/livestream_sd 347 // ie. rtmp://localhost:1935/live/livestream_sd
348 output = srs_string_replace(output, "[vhost]", vhost->arg0()); 348 output = srs_string_replace(output, "[vhost]", vhost->arg0());
349 - output = srs_string_replace(output, "[port]", port); 349 + output = srs_string_replace(output, "[port]", srs_int2str(port));
350 if (output.empty()) { 350 if (output.empty()) {
351 ret = ERROR_ENCODER_NO_OUTPUT; 351 ret = ERROR_ENCODER_NO_OUTPUT;
352 srs_trace("empty output url, ingest=%s. ret=%d", ingest->arg0().c_str(), ret); 352 srs_trace("empty output url, ingest=%s. ret=%d", ingest->arg0().c_str(), ret);
@@ -198,7 +198,7 @@ string SrsNgExec::parse(SrsRequest* req, string tmpl) @@ -198,7 +198,7 @@ string SrsNgExec::parse(SrsRequest* req, string tmpl)
198 string output = tmpl; 198 string output = tmpl;
199 199
200 output = srs_string_replace(output, "[vhost]", req->vhost); 200 output = srs_string_replace(output, "[vhost]", req->vhost);
201 - output = srs_string_replace(output, "[port]", req->port); 201 + output = srs_string_replace(output, "[port]", srs_int2str(req->port));
202 output = srs_string_replace(output, "[app]", req->app); 202 output = srs_string_replace(output, "[app]", req->app);
203 output = srs_string_replace(output, "[stream]", req->stream); 203 output = srs_string_replace(output, "[stream]", req->stream);
204 204
@@ -207,7 +207,7 @@ string SrsNgExec::parse(SrsRequest* req, string tmpl) @@ -207,7 +207,7 @@ string SrsNgExec::parse(SrsRequest* req, string tmpl)
207 output = srs_string_replace(output, "[pageUrl]", req->pageUrl); 207 output = srs_string_replace(output, "[pageUrl]", req->pageUrl);
208 208
209 if (output.find("[url]") != string::npos) { 209 if (output.find("[url]") != string::npos) {
210 - string url = srs_generate_rtmp_url(req->host, ::atoi(req->port.c_str()), req->vhost, req->app, req->stream); 210 + string url = srs_generate_rtmp_url(req->host, req->port, req->vhost, req->app, req->stream);
211 output = srs_string_replace(output, "[url]", url); 211 output = srs_string_replace(output, "[url]", url);
212 } 212 }
213 213
@@ -159,11 +159,11 @@ int SrsRtmpConn::do_cycle() @@ -159,11 +159,11 @@ int SrsRtmpConn::do_cycle()
159 srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s", 159 srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
160 req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str()); 160 req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());
161 161
162 - if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) { 162 + if (req->schema.empty() || req->vhost.empty() || req->app.empty()) {
163 ret = ERROR_RTMP_REQ_TCURL; 163 ret = ERROR_RTMP_REQ_TCURL;
164 srs_error("discovery tcUrl failed. " 164 srs_error("discovery tcUrl failed. "
165 - "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",  
166 - req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret); 165 + "tcUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, ret=%d",
  166 + req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str(), ret);
167 return ret; 167 return ret;
168 } 168 }
169 169
@@ -175,9 +175,9 @@ int SrsRtmpConn::do_cycle() @@ -175,9 +175,9 @@ int SrsRtmpConn::do_cycle()
175 srs_verbose("check vhost success."); 175 srs_verbose("check vhost success.");
176 176
177 srs_trace("connect app, " 177 srs_trace("connect app, "
178 - "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", 178 + "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",
179 req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), 179 req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
180 - req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), 180 + req->schema.c_str(), req->vhost.c_str(), req->port,
181 req->app.c_str(), (req->args? "(obj)":"null")); 181 req->app.c_str(), (req->args? "(obj)":"null"));
182 182
183 // show client identity 183 // show client identity
@@ -1230,9 +1230,10 @@ int SrsRtmpConn::check_edge_token_traverse_auth() @@ -1230,9 +1230,10 @@ int SrsRtmpConn::check_edge_token_traverse_auth()
1230 srs_assert(req); 1230 srs_assert(req);
1231 1231
1232 st_netfd_t stsock = NULL; 1232 st_netfd_t stsock = NULL;
1233 - SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);  
1234 - for (int i = 0; i < (int)conf->args.size(); i++) {  
1235 - if ((ret = connect_server(i, &stsock)) == ERROR_SUCCESS) { 1233 + vector<string> args = _srs_config->get_vhost_edge_origin(req->vhost)->args;
  1234 + for (int i = 0; i < (int)args.size(); i++) {
  1235 + string hostport = args.at(i);
  1236 + if ((ret = connect_server(hostport, &stsock)) == ERROR_SUCCESS) {
1236 break; 1237 break;
1237 } 1238 }
1238 } 1239 }
@@ -1254,7 +1255,7 @@ int SrsRtmpConn::check_edge_token_traverse_auth() @@ -1254,7 +1255,7 @@ int SrsRtmpConn::check_edge_token_traverse_auth()
1254 return ret; 1255 return ret;
1255 } 1256 }
1256 1257
1257 -int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock) 1258 +int SrsRtmpConn::connect_server(string hostport, st_netfd_t* pstsock)
1258 { 1259 {
1259 int ret = ERROR_SUCCESS; 1260 int ret = ERROR_SUCCESS;
1260 1261
@@ -1262,17 +1263,9 @@ int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock) @@ -1262,17 +1263,9 @@ int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock)
1262 srs_assert(conf); 1263 srs_assert(conf);
1263 1264
1264 // select the origin. 1265 // select the origin.
1265 - std::string server = conf->args.at(origin_index % conf->args.size());  
1266 - origin_index = (origin_index + 1) % conf->args.size();  
1267 -  
1268 - std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT;  
1269 - int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);  
1270 - size_t pos = server.find(":");  
1271 - if (pos != std::string::npos) {  
1272 - s_port = server.substr(pos + 1);  
1273 - server = server.substr(0, pos);  
1274 - port = ::atoi(s_port.c_str());  
1275 - } 1266 + string server;
  1267 + int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  1268 + srs_parse_hostport(hostport, server, port);
1276 1269
1277 // open socket. 1270 // open socket.
1278 st_netfd_t stsock = NULL; 1271 st_netfd_t stsock = NULL;
@@ -30,6 +30,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,6 +30,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 30
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
  33 +#include <string>
  34 +
33 #include <srs_app_st.hpp> 35 #include <srs_app_st.hpp>
34 #include <srs_app_conn.hpp> 36 #include <srs_app_conn.hpp>
35 #include <srs_app_reload.hpp> 37 #include <srs_app_reload.hpp>
@@ -132,7 +134,7 @@ private: @@ -132,7 +134,7 @@ private:
132 virtual void set_sock_options(); 134 virtual void set_sock_options();
133 private: 135 private:
134 virtual int check_edge_token_traverse_auth(); 136 virtual int check_edge_token_traverse_auth();
135 - virtual int connect_server(int origin_index, st_netfd_t* pstsock); 137 + virtual int connect_server(std::string hostport, st_netfd_t* pstsock);
136 virtual int do_token_traverse_auth(SrsRtmpClient* client); 138 virtual int do_token_traverse_auth(SrsRtmpClient* client);
137 /** 139 /**
138 * when the connection disconnect, call this method. 140 * when the connection disconnect, call this method.
@@ -205,29 +205,6 @@ string srs_path_build_timestamp(string template_path) @@ -205,29 +205,6 @@ string srs_path_build_timestamp(string template_path)
205 return path; 205 return path;
206 } 206 }
207 207
208 -void srs_parse_endpoint(string ip_port, string& ip, string& port)  
209 -{  
210 - ip = "0.0.0.0";  
211 - port = ip_port;  
212 -  
213 - size_t pos = string::npos;  
214 - if ((pos = port.find(":")) != string::npos) {  
215 - ip = port.substr(0, pos);  
216 - port = port.substr(pos + 1);  
217 - }  
218 -}  
219 -  
220 -void srs_parse_endpoint(string ip_port, string& ip, int& port)  
221 -{  
222 - std::string the_port;  
223 - srs_parse_endpoint(ip_port, ip, the_port);  
224 - port = ::atoi(the_port.c_str());  
225 -}  
226 -  
227 -string srs_bool2switch(bool v) {  
228 - return v? "on" : "off";  
229 -}  
230 -  
231 int srs_kill_forced(int& pid) 208 int srs_kill_forced(int& pid)
232 { 209 {
233 int ret = ERROR_SUCCESS; 210 int ret = ERROR_SUCCESS;
@@ -76,18 +76,6 @@ extern std::string srs_path_build_stream(std::string template_path, std::string @@ -76,18 +76,6 @@ extern std::string srs_path_build_stream(std::string template_path, std::string
76 extern std::string srs_path_build_timestamp(std::string template_path); 76 extern std::string srs_path_build_timestamp(std::string template_path);
77 77
78 /** 78 /**
79 -* parse the endpoint to ip and port.  
80 -* @param ip_port the ip and port which formats in <[ip:]port>  
81 - */  
82 -extern void srs_parse_endpoint(std::string ip_port, std::string& ip, std::string& port);  
83 -extern void srs_parse_endpoint(std::string ip_port, std::string& ip, int& port);  
84 -  
85 -/**  
86 - * convert bool to switch value, true to "on", false to "off".  
87 - */  
88 -extern std::string srs_bool2switch(bool v);  
89 -  
90 -/**  
91 * kill the pid by SIGINT, then wait to quit, 79 * kill the pid by SIGINT, then wait to quit,
92 * kill the pid by SIGKILL again when exceed the timeout. 80 * kill the pid by SIGKILL again when exceed the timeout.
93 * @param pid the pid to kill. ignore for -1. set to -1 when killed. 81 * @param pid the pid to kill. ignore for -1. set to -1 when killed.
@@ -44,7 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -44,7 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
44 #define SRS_CONSTS_RTMP_DEFAULT_VHOST "__defaultVhost__" 44 #define SRS_CONSTS_RTMP_DEFAULT_VHOST "__defaultVhost__"
45 #define SRS_CONSTS_RTMP_DEFAULT_APP "__defaultApp__" 45 #define SRS_CONSTS_RTMP_DEFAULT_APP "__defaultApp__"
46 // default port of rtmp 46 // default port of rtmp
47 -#define SRS_CONSTS_RTMP_DEFAULT_PORT "1935" 47 +#define SRS_CONSTS_RTMP_DEFAULT_PORT 1935
48 48
49 // the default chunk size for system. 49 // the default chunk size for system.
50 #define SRS_CONSTS_RTMP_SRS_CHUNK_SIZE 60000 50 #define SRS_CONSTS_RTMP_SRS_CHUNK_SIZE 60000
@@ -180,13 +180,27 @@ string srs_dns_resolve(string host) @@ -180,13 +180,27 @@ string srs_dns_resolve(string host)
180 180
181 void srs_parse_hostport(const string& hostport, string& host, int& port) 181 void srs_parse_hostport(const string& hostport, string& host, int& port)
182 { 182 {
183 - host = hostport;  
184 -  
185 size_t pos = hostport.find(":"); 183 size_t pos = hostport.find(":");
186 if (pos != std::string::npos) { 184 if (pos != std::string::npos) {
187 string p = hostport.substr(pos + 1); 185 string p = hostport.substr(pos + 1);
188 host = hostport.substr(0, pos); 186 host = hostport.substr(0, pos);
189 port = ::atoi(p.c_str()); 187 port = ::atoi(p.c_str());
  188 + } else {
  189 + host = hostport;
  190 + }
  191 +}
  192 +
  193 +void srs_parse_endpoint(string hostport, string& ip, int& port)
  194 +{
  195 + ip = "0.0.0.0";
  196 +
  197 + size_t pos = string::npos;
  198 + if ((pos = hostport.find(":")) != string::npos) {
  199 + ip = hostport.substr(0, pos);
  200 + string sport = hostport.substr(pos + 1);
  201 + port = ::atoi(sport.c_str());
  202 + } else {
  203 + port = ::atoi(hostport.c_str());
190 } 204 }
191 } 205 }
192 206
@@ -206,6 +220,10 @@ string srs_float2str(double value) @@ -206,6 +220,10 @@ string srs_float2str(double value)
206 return tmp; 220 return tmp;
207 } 221 }
208 222
  223 +string srs_bool2switch(bool v) {
  224 + return v? "on" : "off";
  225 +}
  226 +
209 bool srs_is_little_endian() 227 bool srs_is_little_endian()
210 { 228 {
211 // convert to network(big-endian) order, if not equals, 229 // convert to network(big-endian) order, if not equals,
@@ -52,13 +52,21 @@ extern int64_t srs_update_system_time_ms(); @@ -52,13 +52,21 @@ extern int64_t srs_update_system_time_ms();
52 52
53 // dns resolve utility, return the resolved ip address. 53 // dns resolve utility, return the resolved ip address.
54 extern std::string srs_dns_resolve(std::string host); 54 extern std::string srs_dns_resolve(std::string host);
  55 +
55 // split the host:port to host and port. 56 // split the host:port to host and port.
  57 +// @remark the hostport format in <host[:port]>, where port is optional.
56 extern void srs_parse_hostport(const std::string& hostport, std::string& host, int& port); 58 extern void srs_parse_hostport(const std::string& hostport, std::string& host, int& port);
57 59
  60 +// parse the endpoint to ip and port.
  61 +// @remark hostport format in <[ip:]port>, where ip is default to "0.0.0.0".
  62 +extern void srs_parse_endpoint(std::string hostport, std::string& ip, int& port);
  63 +
58 // parse the int64 value to string. 64 // parse the int64 value to string.
59 extern std::string srs_int2str(int64_t value); 65 extern std::string srs_int2str(int64_t value);
60 // parse the float value to string, precise is 2. 66 // parse the float value to string, precise is 2.
61 extern std::string srs_float2str(double value); 67 extern std::string srs_float2str(double value);
  68 +// convert bool to switch value, true to "on", false to "off".
  69 +extern std::string srs_bool2switch(bool v);
62 70
63 // whether system is little endian 71 // whether system is little endian
64 extern bool srs_is_little_endian(); 72 extern bool srs_is_little_endian();
@@ -63,7 +63,7 @@ struct Context @@ -63,7 +63,7 @@ struct Context
63 std::string tcUrl; 63 std::string tcUrl;
64 std::string host; 64 std::string host;
65 std::string ip; 65 std::string ip;
66 - std::string port; 66 + int port;
67 std::string vhost; 67 std::string vhost;
68 std::string app; 68 std::string app;
69 std::string stream; 69 std::string stream;
@@ -509,9 +509,7 @@ int srs_librtmp_context_connect(Context* context) @@ -509,9 +509,7 @@ int srs_librtmp_context_connect(Context* context)
509 srs_assert(context->skt); 509 srs_assert(context->skt);
510 510
511 std::string ip = context->ip; 511 std::string ip = context->ip;
512 - int port = ::atoi(context->port.c_str());  
513 -  
514 - if ((ret = context->skt->connect(ip.c_str(), port)) != ERROR_SUCCESS) { 512 + if ((ret = context->skt->connect(ip.c_str(), context->port)) != ERROR_SUCCESS) {
515 return ret; 513 return ret;
516 } 514 }
517 515
@@ -708,7 +708,7 @@ public: @@ -708,7 +708,7 @@ public:
708 */ 708 */
709 virtual int flush_message_queue(); 709 virtual int flush_message_queue();
710 private: 710 private:
711 - virtual int connect_app(std::string ep_server, std::string ep_port); 711 + virtual int connect_app(std::string ep_server, int ep_port);
712 // close the connected io and rtmp to ready to be re-connect. 712 // close the connected io and rtmp to ready to be re-connect.
713 virtual void close(); 713 virtual void close();
714 }; 714 };
@@ -1235,8 +1235,8 @@ int SrsIngestSrsOutput::connect() @@ -1235,8 +1235,8 @@ int SrsIngestSrsOutput::connect()
1235 } 1235 }
1236 1236
1237 // connect host. 1237 // connect host.
1238 - if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) {  
1239 - srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret); 1238 + if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) {
  1239 + srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret);
1240 return ret; 1240 return ret;
1241 } 1241 }
1242 io = new SrsStSocket(stfd); 1242 io = new SrsStSocket(stfd);
@@ -1270,7 +1270,7 @@ int SrsIngestSrsOutput::connect() @@ -1270,7 +1270,7 @@ int SrsIngestSrsOutput::connect()
1270 } 1270 }
1271 1271
1272 // TODO: FIXME: refine the connect_app. 1272 // TODO: FIXME: refine the connect_app.
1273 -int SrsIngestSrsOutput::connect_app(string ep_server, string ep_port) 1273 +int SrsIngestSrsOutput::connect_app(string ep_server, int ep_port)
1274 { 1274 {
1275 int ret = ERROR_SUCCESS; 1275 int ret = ERROR_SUCCESS;
1276 1276
@@ -44,7 +44,7 @@ using namespace std; @@ -44,7 +44,7 @@ using namespace std;
44 void srs_discovery_tc_url( 44 void srs_discovery_tc_url(
45 string tcUrl, 45 string tcUrl,
46 string& schema, string& host, string& vhost, 46 string& schema, string& host, string& vhost,
47 - string& app, string& port, std::string& param 47 + string& app, int& port, std::string& param
48 ) { 48 ) {
49 size_t pos = std::string::npos; 49 size_t pos = std::string::npos;
50 std::string url = tcUrl; 50 std::string url = tcUrl;
@@ -63,8 +63,7 @@ void srs_discovery_tc_url( @@ -63,8 +63,7 @@ void srs_discovery_tc_url(
63 63
64 port = SRS_CONSTS_RTMP_DEFAULT_PORT; 64 port = SRS_CONSTS_RTMP_DEFAULT_PORT;
65 if ((pos = host.find(":")) != std::string::npos) { 65 if ((pos = host.find(":")) != std::string::npos) {
66 - port = host.substr(pos + 1);  
67 - host = host.substr(0, pos); 66 + srs_parse_hostport(host, host, port);
68 srs_info("discovery host=%s, port=%s", host.c_str(), port.c_str()); 67 srs_info("discovery host=%s, port=%s", host.c_str(), port.c_str());
69 } 68 }
70 69
@@ -127,11 +126,6 @@ void srs_random_generate(char* bytes, int size) @@ -127,11 +126,6 @@ void srs_random_generate(char* bytes, int size)
127 126
128 string srs_generate_tc_url(string ip, string vhost, string app, int port, string param) 127 string srs_generate_tc_url(string ip, string vhost, string app, int port, string param)
129 { 128 {
130 - return srs_generate_tc_url(ip, vhost, app, srs_int2str(port), param);  
131 -}  
132 -  
133 -string srs_generate_tc_url(string ip, string vhost, string app, string port, string param)  
134 -{  
135 string tcUrl = "rtmp://"; 129 string tcUrl = "rtmp://";
136 130
137 if (vhost == SRS_CONSTS_RTMP_DEFAULT_VHOST) { 131 if (vhost == SRS_CONSTS_RTMP_DEFAULT_VHOST) {
@@ -142,7 +136,7 @@ string srs_generate_tc_url(string ip, string vhost, string app, string port, str @@ -142,7 +136,7 @@ string srs_generate_tc_url(string ip, string vhost, string app, string port, str
142 136
143 if (port != SRS_CONSTS_RTMP_DEFAULT_PORT) { 137 if (port != SRS_CONSTS_RTMP_DEFAULT_PORT) {
144 tcUrl += ":"; 138 tcUrl += ":";
145 - tcUrl += port; 139 + tcUrl += srs_int2str(port);
146 } 140 }
147 141
148 tcUrl += "/"; 142 tcUrl += "/";
@@ -58,7 +58,7 @@ class ISrsProtocolReaderWriter; @@ -58,7 +58,7 @@ class ISrsProtocolReaderWriter;
58 extern void srs_discovery_tc_url( 58 extern void srs_discovery_tc_url(
59 std::string tcUrl, 59 std::string tcUrl,
60 std::string& schema, std::string& host, std::string& vhost, 60 std::string& schema, std::string& host, std::string& vhost,
61 - std::string& app, std::string& port, std::string& param 61 + std::string& app, int& port, std::string& param
62 ); 62 );
63 63
64 /** 64 /**
@@ -86,10 +86,6 @@ extern void srs_random_generate(char* bytes, int size); @@ -86,10 +86,6 @@ extern void srs_random_generate(char* bytes, int size);
86 * @remark ignore port if port equals to default port 1935. 86 * @remark ignore port if port equals to default port 1935.
87 */ 87 */
88 extern std::string srs_generate_tc_url( 88 extern std::string srs_generate_tc_url(
89 - std::string ip, std::string vhost, std::string app, std::string port,  
90 - std::string param  
91 -);  
92 -extern std::string srs_generate_tc_url(  
93 std::string ip, std::string vhost, std::string app, int port, 89 std::string ip, std::string vhost, std::string app, int port,
94 std::string param 90 std::string param
95 ); 91 );
@@ -1664,6 +1664,7 @@ SrsRequest::SrsRequest() @@ -1664,6 +1664,7 @@ SrsRequest::SrsRequest()
1664 { 1664 {
1665 objectEncoding = RTMP_SIG_AMF0_VER; 1665 objectEncoding = RTMP_SIG_AMF0_VER;
1666 duration = -1; 1666 duration = -1;
  1667 + port = SRS_CONSTS_RTMP_DEFAULT_PORT;
1667 args = NULL; 1668 args = NULL;
1668 } 1669 }
1669 1670
@@ -554,7 +554,7 @@ public: @@ -554,7 +554,7 @@ public:
554 // the host in tcUrl. 554 // the host in tcUrl.
555 std::string host; 555 std::string host;
556 // the port in tcUrl. 556 // the port in tcUrl.
557 - std::string port; 557 + int port;
558 // the app in tcUrl, without param. 558 // the app in tcUrl, without param.
559 std::string app; 559 std::string app;
560 // the param in tcUrl(app). 560 // the param in tcUrl(app).