winlin

refine tcp client connect, extract to utility srs_socket_connect. 0.9.141

@@ -45,6 +45,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -45,6 +45,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
45 #include <srs_app_kbps.hpp> 45 #include <srs_app_kbps.hpp>
46 #include <srs_kernel_utility.hpp> 46 #include <srs_kernel_utility.hpp>
47 #include <srs_protocol_msg_array.hpp> 47 #include <srs_protocol_msg_array.hpp>
  48 +#include <srs_app_utility.hpp>
48 49
49 // when error, edge ingester sleep for a while and retry. 50 // when error, edge ingester sleep for a while and retry.
50 #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) 51 #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
@@ -292,50 +293,23 @@ int SrsEdgeIngester::connect_server() @@ -292,50 +293,23 @@ int SrsEdgeIngester::connect_server()
292 server = server.substr(0, pos); 293 server = server.substr(0, pos);
293 port = ::atoi(s_port.c_str()); 294 port = ::atoi(s_port.c_str());
294 } 295 }
295 -  
296 - // open socket.  
297 - // TODO: FIXME: extract utility method  
298 - int sock = socket(AF_INET, SOCK_STREAM, 0);  
299 - if(sock == -1){  
300 - ret = ERROR_SOCKET_CREATE;  
301 - srs_error("create socket error. ret=%d", ret);  
302 - return ret;  
303 - }  
304 296
305 - srs_assert(!stfd);  
306 - stfd = st_netfd_open_socket(sock);  
307 - if(stfd == NULL){  
308 - ret = ERROR_ST_OPEN_SOCKET;  
309 - srs_error("st_netfd_open_socket failed. ret=%d", ret); 297 + // open socket.
  298 + int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US;
  299 + if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) {
  300 + srs_warn("edge ingester failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
  301 + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
310 return ret; 302 return ret;
311 } 303 }
312 304
313 srs_freep(client); 305 srs_freep(client);
314 srs_freep(io); 306 srs_freep(io);
315 307
  308 + srs_assert(stfd);
316 io = new SrsSocket(stfd); 309 io = new SrsSocket(stfd);
317 client = new SrsRtmpClient(io); 310 client = new SrsRtmpClient(io);
318 - kbps->set_io(io, io);  
319 -  
320 - // connect to server.  
321 - std::string ip = srs_dns_resolve(server);  
322 - if (ip.empty()) {  
323 - ret = ERROR_SYSTEM_IP_INVALID;  
324 - srs_error("dns resolve server error, ip empty. ret=%d", ret);  
325 - return ret;  
326 - }  
327 311
328 - sockaddr_in addr;  
329 - addr.sin_family = AF_INET;  
330 - addr.sin_port = htons(port);  
331 - addr.sin_addr.s_addr = inet_addr(ip.c_str());  
332 -  
333 - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_INGESTER_TIMEOUT_US) == -1){  
334 - ret = ERROR_ST_CONNECT;  
335 - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);  
336 - return ret;  
337 - }  
338 - srs_info("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); 312 + kbps->set_io(io, io);
339 313
340 srs_trace("edge connected, can_publish=%d, url=%s/%s, server=%s:%d", 314 srs_trace("edge connected, can_publish=%d, url=%s/%s, server=%s:%d",
341 _source->can_publish(), _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port); 315 _source->can_publish(), _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port);
@@ -576,51 +550,16 @@ int SrsEdgeForwarder::connect_server() @@ -576,51 +550,16 @@ int SrsEdgeForwarder::connect_server()
576 } 550 }
577 551
578 // open socket. 552 // open socket.
579 - srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d",  
580 - _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);  
581 -  
582 - // TODO: FIXME: extract utility method  
583 - int sock = socket(AF_INET, SOCK_STREAM, 0);  
584 - if(sock == -1){  
585 - ret = ERROR_SOCKET_CREATE;  
586 - srs_error("create socket error. ret=%d", ret); 553 + int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US;
  554 + if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) {
  555 + srs_warn("edge forwarder failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
  556 + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
587 return ret; 557 return ret;
588 } 558 }
589 559
590 - srs_assert(!stfd);  
591 - stfd = st_netfd_open_socket(sock);  
592 - if(stfd == NULL){  
593 - ret = ERROR_ST_OPEN_SOCKET;  
594 - srs_error("st_netfd_open_socket failed. ret=%d", ret);  
595 - return ret;  
596 - }  
597 -  
598 - srs_freep(client);  
599 - srs_freep(io);  
600 -  
601 - io = new SrsSocket(stfd);  
602 - client = new SrsRtmpClient(io);  
603 - kbps->set_io(io, io);  
604 -  
605 - // connect to server.  
606 - std::string ip = srs_dns_resolve(server);  
607 - if (ip.empty()) {  
608 - ret = ERROR_SYSTEM_IP_INVALID;  
609 - srs_error("dns resolve server error, ip empty. ret=%d", ret);  
610 - return ret;  
611 - }  
612 -  
613 - sockaddr_in addr;  
614 - addr.sin_family = AF_INET;  
615 - addr.sin_port = htons(port);  
616 - addr.sin_addr.s_addr = inet_addr(ip.c_str());  
617 -  
618 - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_FORWARDER_TIMEOUT_US) == -1){  
619 - ret = ERROR_ST_CONNECT;  
620 - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);  
621 - return ret;  
622 - }  
623 - srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); 560 + // open socket.
  561 + srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d",
  562 + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);
624 563
625 return ret; 564 return ret;
626 } 565 }
@@ -42,6 +42,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -42,6 +42,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
42 #include <srs_app_kbps.hpp> 42 #include <srs_app_kbps.hpp>
43 #include <srs_kernel_utility.hpp> 43 #include <srs_kernel_utility.hpp>
44 #include <srs_protocol_msg_array.hpp> 44 #include <srs_protocol_msg_array.hpp>
  45 +#include <srs_app_utility.hpp>
45 46
46 // when error, forwarder sleep for a while and retry. 47 // when error, forwarder sleep for a while and retry.
47 #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) 48 #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
@@ -255,50 +256,24 @@ int SrsForwarder::connect_server() @@ -255,50 +256,24 @@ int SrsForwarder::connect_server()
255 close_underlayer_socket(); 256 close_underlayer_socket();
256 257
257 // open socket. 258 // open socket.
258 - srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",  
259 - stream_name.c_str(), tc_url.c_str(), server.c_str(), port);  
260 -  
261 - int sock = socket(AF_INET, SOCK_STREAM, 0);  
262 - if(sock == -1){  
263 - ret = ERROR_SOCKET_CREATE;  
264 - srs_error("create socket error. ret=%d", ret);  
265 - return ret;  
266 - }  
267 -  
268 - srs_assert(!stfd);  
269 - stfd = st_netfd_open_socket(sock);  
270 - if(stfd == NULL){  
271 - ret = ERROR_ST_OPEN_SOCKET;  
272 - srs_error("st_netfd_open_socket failed. ret=%d", ret); 259 + int64_t timeout = SRS_FORWARDER_SLEEP_US;
  260 + if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) {
  261 + srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
  262 + stream_name.c_str(), tc_url.c_str(), server.c_str(), port, timeout, ret);
273 return ret; 263 return ret;
274 } 264 }
275 265
276 srs_freep(client); 266 srs_freep(client);
277 srs_freep(io); 267 srs_freep(io);
278 268
  269 + srs_assert(stfd);
279 io = new SrsSocket(stfd); 270 io = new SrsSocket(stfd);
280 client = new SrsRtmpClient(io); 271 client = new SrsRtmpClient(io);
281 - kbps->set_io(io, io);  
282 272
283 - // connect to server.  
284 - std::string ip = srs_dns_resolve(server);  
285 - if (ip.empty()) {  
286 - ret = ERROR_SYSTEM_IP_INVALID;  
287 - srs_error("dns resolve server error, ip empty. ret=%d", ret);  
288 - return ret;  
289 - }  
290 -  
291 - sockaddr_in addr;  
292 - addr.sin_family = AF_INET;  
293 - addr.sin_port = htons(port);  
294 - addr.sin_addr.s_addr = inet_addr(ip.c_str()); 273 + kbps->set_io(io, io);
295 274
296 - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_FORWARDER_SLEEP_US) == -1){  
297 - ret = ERROR_ST_CONNECT;  
298 - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);  
299 - return ret;  
300 - }  
301 - srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); 275 + srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d",
  276 + stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
302 277
303 return ret; 278 return ret;
304 } 279 }
@@ -34,6 +34,10 @@ using namespace std; @@ -34,6 +34,10 @@ using namespace std;
34 #include <srs_kernel_log.hpp> 34 #include <srs_kernel_log.hpp>
35 #include <srs_app_socket.hpp> 35 #include <srs_app_socket.hpp>
36 #include <srs_kernel_utility.hpp> 36 #include <srs_kernel_utility.hpp>
  37 +#include <srs_app_utility.hpp>
  38 +
  39 +// when error, http client sleep for a while and retry.
  40 +#define SRS_HTTP_CLIENT_SLEEP_US (int64_t)(3*1000*1000LL)
37 41
38 SrsHttpClient::SrsHttpClient() 42 SrsHttpClient::SrsHttpClient()
39 { 43 {
@@ -127,36 +131,14 @@ int SrsHttpClient::connect(SrsHttpUri* uri) @@ -127,36 +131,14 @@ int SrsHttpClient::connect(SrsHttpUri* uri)
127 131
128 disconnect(); 132 disconnect();
129 133
130 - std::string ip = srs_dns_resolve(uri->get_host());  
131 - if (ip.empty()) {  
132 - ret = ERROR_SYSTEM_IP_INVALID;  
133 - srs_error("dns resolve server error, ip empty. ret=%d", ret);  
134 - return ret;  
135 - }  
136 -  
137 - int sock = socket(AF_INET, SOCK_STREAM, 0);  
138 - if(sock == -1){  
139 - ret = ERROR_SOCKET_CREATE;  
140 - srs_error("create socket error. ret=%d", ret);  
141 - return ret;  
142 - }  
143 -  
144 - stfd = st_netfd_open_socket(sock);  
145 - if(stfd == NULL){  
146 - ret = ERROR_ST_OPEN_SOCKET;  
147 - srs_error("st_netfd_open_socket failed. ret=%d", ret);  
148 - return ret;  
149 - }  
150 -  
151 - sockaddr_in addr;  
152 - addr.sin_family = AF_INET;  
153 - addr.sin_port = htons(uri->get_port());  
154 - addr.sin_addr.s_addr = inet_addr(ip.c_str()); 134 + std::string server = uri->get_host();
  135 + int port = uri->get_port();
155 136
156 - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){  
157 - ret = ERROR_ST_CONNECT;  
158 - srs_error("connect to server error. "  
159 - "ip=%s, port=%d, ret=%d", ip.c_str(), uri->get_port(), ret); 137 + // open socket.
  138 + int64_t timeout = SRS_HTTP_CLIENT_SLEEP_US;
  139 + if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) {
  140 + srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d",
  141 + server.c_str(), port, timeout, ret);
160 return ret; 142 return ret;
161 } 143 }
162 srs_info("connect to server success. " 144 srs_info("connect to server success. "
@@ -52,6 +52,7 @@ using namespace std; @@ -52,6 +52,7 @@ using namespace std;
52 #include <srs_kernel_utility.hpp> 52 #include <srs_kernel_utility.hpp>
53 #include <srs_protocol_msg_array.hpp> 53 #include <srs_protocol_msg_array.hpp>
54 #include <srs_protocol_amf0.hpp> 54 #include <srs_protocol_amf0.hpp>
  55 +#include <srs_app_utility.hpp>
55 56
56 // when stream is busy, for example, streaming is already 57 // when stream is busy, for example, streaming is already
57 // publishing, when a new client to request to publish, 58 // publishing, when a new client to request to publish,
@@ -921,7 +922,6 @@ int SrsRtmpConn::check_edge_token_traverse_auth() @@ -921,7 +922,6 @@ int SrsRtmpConn::check_edge_token_traverse_auth()
921 return ret; 922 return ret;
922 } 923 }
923 924
924 -// TODO: FIXME: refine the connect server serials functions.  
925 int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock) 925 int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock)
926 { 926 {
927 int ret = ERROR_SUCCESS; 927 int ret = ERROR_SUCCESS;
@@ -942,39 +942,12 @@ int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock) @@ -942,39 +942,12 @@ int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock)
942 port = ::atoi(s_port.c_str()); 942 port = ::atoi(s_port.c_str());
943 } 943 }
944 944
945 - // connect to server.  
946 - std::string ip = srs_dns_resolve(server);  
947 - if (ip.empty()) {  
948 - ret = ERROR_SYSTEM_IP_INVALID;  
949 - srs_error("dns resolve server error, ip empty. ret=%d", ret);  
950 - return ret;  
951 - }  
952 -  
953 // open socket. 945 // open socket.
954 - // TODO: FIXME: extract utility method  
955 - int sock = socket(AF_INET, SOCK_STREAM, 0);  
956 - if(sock == -1){  
957 - ret = ERROR_SOCKET_CREATE;  
958 - srs_error("create socket error. ret=%d", ret);  
959 - return ret;  
960 - }  
961 -  
962 - st_netfd_t stsock = st_netfd_open_socket(sock);  
963 - if(stsock == NULL){  
964 - ret = ERROR_ST_OPEN_SOCKET;  
965 - srs_error("st_netfd_open_socket failed. ret=%d", ret);  
966 - return ret;  
967 - }  
968 -  
969 - sockaddr_in addr;  
970 - addr.sin_family = AF_INET;  
971 - addr.sin_port = htons(port);  
972 - addr.sin_addr.s_addr = inet_addr(ip.c_str());  
973 -  
974 - if (st_connect(stsock, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US) == -1){  
975 - ret = ERROR_ST_CONNECT;  
976 - srs_close_stfd(stsock);  
977 - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); 946 + st_netfd_t stsock = NULL;
  947 + int64_t timeout = SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US;
  948 + if ((ret = srs_socket_connect(server, port, timeout, &stsock)) != ERROR_SUCCESS) {
  949 + srs_warn("edge token traverse failed, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
  950 + req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
978 return ret; 951 return ret;
979 } 952 }
980 srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port); 953 srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port);
@@ -39,6 +39,58 @@ using namespace std; @@ -39,6 +39,58 @@ using namespace std;
39 39
40 #define SRS_LOCAL_LOOP_IP "127.0.0.1" 40 #define SRS_LOCAL_LOOP_IP "127.0.0.1"
41 41
  42 +int srs_socket_connect(std::string server, int port, int64_t timeout, st_netfd_t* pstfd)
  43 +{
  44 + int ret = ERROR_SUCCESS;
  45 +
  46 + *pstfd = NULL;
  47 + st_netfd_t stfd = NULL;
  48 + sockaddr_in addr;
  49 +
  50 + int sock = socket(AF_INET, SOCK_STREAM, 0);
  51 + if(sock == -1){
  52 + ret = ERROR_SOCKET_CREATE;
  53 + srs_error("create socket error. ret=%d", ret);
  54 + return ret;
  55 + }
  56 +
  57 + srs_assert(!stfd);
  58 + stfd = st_netfd_open_socket(sock);
  59 + if(stfd == NULL){
  60 + ret = ERROR_ST_OPEN_SOCKET;
  61 + srs_error("st_netfd_open_socket failed. ret=%d", ret);
  62 + return ret;
  63 + }
  64 +
  65 + // connect to server.
  66 + std::string ip = srs_dns_resolve(server);
  67 + if (ip.empty()) {
  68 + ret = ERROR_SYSTEM_IP_INVALID;
  69 + srs_error("dns resolve server error, ip empty. ret=%d", ret);
  70 + goto failed;
  71 + }
  72 +
  73 + addr.sin_family = AF_INET;
  74 + addr.sin_port = htons(port);
  75 + addr.sin_addr.s_addr = inet_addr(ip.c_str());
  76 +
  77 + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){
  78 + ret = ERROR_ST_CONNECT;
  79 + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
  80 + goto failed;
  81 + }
  82 + srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
  83 +
  84 + *pstfd = stfd;
  85 + return ret;
  86 +
  87 +failed:
  88 + if (stfd) {
  89 + srs_close_stfd(stfd);
  90 + }
  91 + return ret;
  92 +}
  93 +
42 int srs_get_log_level(std::string level) 94 int srs_get_log_level(std::string level)
43 { 95 {
44 if ("verbose" == _srs_config->get_log_level()) { 96 if ("verbose" == _srs_config->get_log_level()) {
@@ -36,8 +36,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -36,8 +36,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 36
37 #include <sys/resource.h> 37 #include <sys/resource.h>
38 38
  39 +#include <srs_app_st.hpp>
  40 +
39 class SrsKbps; 41 class SrsKbps;
40 42
  43 +// client open socket and connect to server.
  44 +extern int srs_socket_connect(std::string server, int port, int64_t timeout, st_netfd_t* pstfd);
  45 +
41 /** 46 /**
42 * convert level in string to log level in int. 47 * convert level in string to log level in int.
43 * @return the log level defined in SrsLogLevel. 48 * @return the log level defined in SrsLogLevel.
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR "0" 32 #define VERSION_MAJOR "0"
33 #define VERSION_MINOR "9" 33 #define VERSION_MINOR "9"
34 -#define VERSION_REVISION "140" 34 +#define VERSION_REVISION "141"
35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION 35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"