winlin

use tcp client for raw connect.

@@ -119,9 +119,8 @@ SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, Sr @@ -119,9 +119,8 @@ SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, Sr
119 { 119 {
120 120
121 req = NULL; 121 req = NULL;
122 - io = NULL; 122 + transport = new SrsTcpClient();
123 client = NULL; 123 client = NULL;
124 - stfd = NULL;  
125 stream_id = 0; 124 stream_id = 0;
126 125
127 pprint = SrsPithyPrint::create_caster(); 126 pprint = SrsPithyPrint::create_caster();
@@ -131,6 +130,7 @@ SrsDynamicHttpConn::~SrsDynamicHttpConn() @@ -131,6 +130,7 @@ SrsDynamicHttpConn::~SrsDynamicHttpConn()
131 { 130 {
132 close(); 131 close();
133 132
  133 + srs_freep(transport);
134 srs_freep(pprint); 134 srs_freep(pprint);
135 } 135 }
136 136
@@ -261,7 +261,7 @@ int SrsDynamicHttpConn::connect() @@ -261,7 +261,7 @@ int SrsDynamicHttpConn::connect()
261 261
262 // when ok, ignore. 262 // when ok, ignore.
263 // TODO: FIXME: should reconnect when disconnected. 263 // TODO: FIXME: should reconnect when disconnected.
264 - if (io || client) { 264 + if (transport->connected()) {
265 return ret; 265 return ret;
266 } 266 }
267 267
@@ -273,12 +273,10 @@ int SrsDynamicHttpConn::connect() @@ -273,12 +273,10 @@ int SrsDynamicHttpConn::connect()
273 } 273 }
274 274
275 // connect host. 275 // connect host.
276 - if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) {  
277 - srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); 276 + if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) {
278 return ret; 277 return ret;
279 } 278 }
280 - io = new SrsStSocket(stfd);  
281 - client = new SrsRtmpClient(io); 279 + client = new SrsRtmpClient(transport);
282 280
283 client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); 281 client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
284 client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); 282 client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
@@ -360,10 +358,10 @@ int SrsDynamicHttpConn::connect_app(string ep_server, int ep_port) @@ -360,10 +358,10 @@ int SrsDynamicHttpConn::connect_app(string ep_server, int ep_port)
360 358
361 void SrsDynamicHttpConn::close() 359 void SrsDynamicHttpConn::close()
362 { 360 {
  361 + transport->close();
  362 +
363 srs_freep(client); 363 srs_freep(client);
364 - srs_freep(io);  
365 srs_freep(req); 364 srs_freep(req);
366 - srs_close_stfd(stfd);  
367 } 365 }
368 366
369 SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) 367 SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h)
@@ -43,6 +43,7 @@ class SrsRequest; @@ -43,6 +43,7 @@ class SrsRequest;
43 class SrsPithyPrint; 43 class SrsPithyPrint;
44 class ISrsHttpResponseReader; 44 class ISrsHttpResponseReader;
45 class SrsFlvDecoder; 45 class SrsFlvDecoder;
  46 +class SrsTcpClient;
46 47
47 #include <srs_app_st.hpp> 48 #include <srs_app_st.hpp>
48 #include <srs_app_listener.hpp> 49 #include <srs_app_listener.hpp>
@@ -86,8 +87,7 @@ private: @@ -86,8 +87,7 @@ private:
86 SrsPithyPrint* pprint; 87 SrsPithyPrint* pprint;
87 private: 88 private:
88 SrsRequest* req; 89 SrsRequest* req;
89 - st_netfd_t stfd;  
90 - SrsStSocket* io; 90 + SrsTcpClient* transport;
91 SrsRtmpClient* client; 91 SrsRtmpClient* client;
92 int stream_id; 92 int stream_id;
93 public: 93 public:
@@ -28,6 +28,7 @@ using namespace std; @@ -28,6 +28,7 @@ using namespace std;
28 28
29 #include <srs_kernel_error.hpp> 29 #include <srs_kernel_error.hpp>
30 #include <srs_kernel_log.hpp> 30 #include <srs_kernel_log.hpp>
  31 +#include <srs_app_utility.hpp>
31 32
32 namespace internal 33 namespace internal
33 { 34 {
@@ -411,18 +412,106 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) @@ -411,18 +412,106 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
411 412
412 SrsTcpClient::SrsTcpClient() 413 SrsTcpClient::SrsTcpClient()
413 { 414 {
  415 + io = NULL;
  416 + stfd = NULL;
414 } 417 }
415 418
416 SrsTcpClient::~SrsTcpClient() 419 SrsTcpClient::~SrsTcpClient()
417 { 420 {
  421 + close();
  422 +}
  423 +
  424 +bool SrsTcpClient::connected()
  425 +{
  426 + return io;
418 } 427 }
419 428
420 int SrsTcpClient::connect(string host, int port, int64_t timeout) 429 int SrsTcpClient::connect(string host, int port, int64_t timeout)
421 { 430 {
422 int ret = ERROR_SUCCESS; 431 int ret = ERROR_SUCCESS;
  432 +
  433 + // when connected, ignore.
  434 + if (io) {
  435 + return ret;
  436 + }
  437 +
  438 + // connect host.
  439 + if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) {
  440 + srs_error("mpegts: connect server %s:%d failed. ret=%d", host.c_str(), port, ret);
  441 + return ret;
  442 + }
  443 +
  444 + io = new SrsStSocket(stfd);
  445 +
423 return ret; 446 return ret;
424 } 447 }
425 448
  449 +void SrsTcpClient::close()
  450 +{
  451 + // when closed, ignore.
  452 + if (!io) {
  453 + return;
  454 + }
  455 +
  456 + srs_freep(io);
  457 + srs_close_stfd(stfd);
  458 +}
  459 +
  460 +bool SrsTcpClient::is_never_timeout(int64_t timeout_us)
  461 +{
  462 + return io->is_never_timeout(timeout_us);
  463 +}
  464 +
  465 +void SrsTcpClient::set_recv_timeout(int64_t timeout_us)
  466 +{
  467 + io->set_recv_timeout(timeout_us);
  468 +}
  469 +
  470 +int64_t SrsTcpClient::get_recv_timeout()
  471 +{
  472 + return io->get_recv_timeout();
  473 +}
  474 +
  475 +void SrsTcpClient::set_send_timeout(int64_t timeout_us)
  476 +{
  477 + io->set_send_timeout(timeout_us);
  478 +}
  479 +
  480 +int64_t SrsTcpClient::get_send_timeout()
  481 +{
  482 + return io->get_send_timeout();
  483 +}
  484 +
  485 +int64_t SrsTcpClient::get_recv_bytes()
  486 +{
  487 + return io->get_recv_bytes();
  488 +}
  489 +
  490 +int64_t SrsTcpClient::get_send_bytes()
  491 +{
  492 + return io->get_send_bytes();
  493 +}
  494 +
  495 +int SrsTcpClient::read(void* buf, size_t size, ssize_t* nread)
  496 +{
  497 + return io->read(buf, size, nread);
  498 +}
  499 +
  500 +int SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread)
  501 +{
  502 + return io->read_fully(buf, size, nread);
  503 +}
  504 +
  505 +int SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite)
  506 +{
  507 + return io->write(buf, size, nwrite);
  508 +}
  509 +
  510 +int SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
  511 +{
  512 + return io->writev(iov, iov_size, nwrite);
  513 +}
  514 +
426 #ifdef __linux__ 515 #ifdef __linux__
427 #include <sys/epoll.h> 516 #include <sys/epoll.h>
428 517
@@ -208,19 +208,46 @@ public: @@ -208,19 +208,46 @@ public:
208 * the common tcp client, to connect to specified TCP server, 208 * the common tcp client, to connect to specified TCP server,
209 * reconnect and close the connection. 209 * reconnect and close the connection.
210 */ 210 */
211 -class SrsTcpClient 211 +class SrsTcpClient : public ISrsProtocolReaderWriter
212 { 212 {
  213 +private:
  214 + st_netfd_t stfd;
  215 + SrsStSocket* io;
213 public: 216 public:
214 SrsTcpClient(); 217 SrsTcpClient();
215 virtual ~SrsTcpClient(); 218 virtual ~SrsTcpClient();
216 public: 219 public:
217 /** 220 /**
  221 + * whether connected to server.
  222 + */
  223 + virtual bool connected();
  224 +public:
  225 + /**
218 * connect to server over TCP. 226 * connect to server over TCP.
219 * @param host the ip or hostname of server. 227 * @param host the ip or hostname of server.
220 * @param port the port to connect to. 228 * @param port the port to connect to.
221 * @param timeout the timeout in us. 229 * @param timeout the timeout in us.
  230 + * @remark ignore when connected.
222 */ 231 */
223 virtual int connect(std::string host, int port, int64_t timeout); 232 virtual int connect(std::string host, int port, int64_t timeout);
  233 + /**
  234 + * close the connection.
  235 + * @remark ignore when closed.
  236 + */
  237 + virtual void close();
  238 +// interface ISrsProtocolReaderWriter
  239 +public:
  240 + virtual bool is_never_timeout(int64_t timeout_us);
  241 + virtual void set_recv_timeout(int64_t timeout_us);
  242 + virtual int64_t get_recv_timeout();
  243 + virtual void set_send_timeout(int64_t timeout_us);
  244 + virtual int64_t get_send_timeout();
  245 + virtual int64_t get_recv_bytes();
  246 + virtual int64_t get_send_bytes();
  247 + virtual int read(void* buf, size_t size, ssize_t* nread);
  248 + virtual int read_fully(void* buf, size_t size, ssize_t* nread);
  249 + virtual int write(void* buf, size_t size, ssize_t* nwrite);
  250 + virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite);
224 }; 251 };
225 252
226 // initialize st, requires epoll. 253 // initialize st, requires epoll.