winlin

for #742, refine the object live cycle. 3.0.15

@@ -183,6 +183,7 @@ Please select your language: @@ -183,6 +183,7 @@ Please select your language:
183 183
184 ### V3 changes 184 ### V3 changes
185 185
  186 +* v3.0, 2017-01-17, for [#742][bug #742] refine source, timeout, live cycle. 3.0.15
186 * v3.0, 2017-01-11, fix [#735][bug #735] config transform refer_publish invalid. 3.0.14 187 * v3.0, 2017-01-11, fix [#735][bug #735] config transform refer_publish invalid. 3.0.14
187 * v3.0, 2017-01-06, for [#730][bug #730] support config in/out ack size. 3.0.13 188 * v3.0, 2017-01-06, for [#730][bug #730] support config in/out ack size. 3.0.13
188 * v3.0, 2017-01-06, for [#711][bug #711] support perfile for transcode. 3.0.12 189 * v3.0, 2017-01-06, for [#711][bug #711] support perfile for transcode. 3.0.12
@@ -1365,7 +1366,8 @@ Winlin @@ -1365,7 +1366,8 @@ Winlin
1365 [bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx 1366 [bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx
1366 1367
1367 [bug #735]: https://github.com/ossrs/srs/issues/735 1368 [bug #735]: https://github.com/ossrs/srs/issues/735
1368 -[bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx 1369 +[bug #742]: https://github.com/ossrs/srs/issues/742
  1370 +[bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx
1369 1371
1370 [exo #828]: https://github.com/google/ExoPlayer/pull/828 1372 [exo #828]: https://github.com/google/ExoPlayer/pull/828
1371 1373
@@ -120,7 +120,7 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -120,7 +120,7 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
120 SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip) 120 SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip)
121 : SrsHttpConn(cm, fd, m, cip) 121 : SrsHttpConn(cm, fd, m, cip)
122 { 122 {
123 - sdk = new SrsSimpleRtmpClient(); 123 + sdk = NULL;
124 pprint = SrsPithyPrint::create_caster(); 124 pprint = SrsPithyPrint::create_caster();
125 } 125 }
126 126
@@ -181,9 +181,13 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) @@ -181,9 +181,13 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
181 { 181 {
182 int ret = ERROR_SUCCESS; 182 int ret = ERROR_SUCCESS;
183 183
  184 + srs_freep(sdk);
  185 +
184 int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; 186 int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
185 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; 187 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
186 - if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) { 188 + sdk = new SrsSimpleRtmpClient(output, cto / 1000, sto / 1000);
  189 +
  190 + if ((ret = sdk->connect()) != ERROR_SUCCESS) {
187 srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret); 191 srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret);
188 return ret; 192 return ret;
189 } 193 }
@@ -73,14 +73,12 @@ SrsEdgeUpstream::~SrsEdgeUpstream() @@ -73,14 +73,12 @@ SrsEdgeUpstream::~SrsEdgeUpstream()
73 SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r) 73 SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r)
74 { 74 {
75 redirect = r; 75 redirect = r;
76 - sdk = new SrsSimpleRtmpClient(); 76 + sdk = NULL;
77 } 77 }
78 78
79 SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream() 79 SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream()
80 { 80 {
81 close(); 81 close();
82 -  
83 - srs_freep(sdk);  
84 } 82 }
85 83
86 int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) 84 int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
@@ -126,9 +124,12 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) @@ -126,9 +124,12 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
126 url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream); 124 url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream);
127 } 125 }
128 126
  127 + srs_freep(sdk);
129 int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US; 128 int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US;
130 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; 129 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
131 - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { 130 + sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
  131 +
  132 + if ((ret = sdk->connect()) != ERROR_SUCCESS) {
132 srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); 133 srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
133 return ret; 134 return ret;
134 } 135 }
@@ -153,7 +154,7 @@ int SrsEdgeRtmpUpstream::decode_message(SrsCommonMessage* msg, SrsPacket** ppack @@ -153,7 +154,7 @@ int SrsEdgeRtmpUpstream::decode_message(SrsCommonMessage* msg, SrsPacket** ppack
153 154
154 void SrsEdgeRtmpUpstream::close() 155 void SrsEdgeRtmpUpstream::close()
155 { 156 {
156 - sdk->close(); 157 + srs_freep(sdk);
157 } 158 }
158 159
159 void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t timeout) 160 void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t timeout)
@@ -406,7 +407,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() @@ -406,7 +407,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
406 req = NULL; 407 req = NULL;
407 send_error_code = ERROR_SUCCESS; 408 send_error_code = ERROR_SUCCESS;
408 409
409 - sdk = new SrsSimpleRtmpClient(); 410 + sdk = NULL;
410 lb = new SrsLbRoundRobin(); 411 lb = new SrsLbRoundRobin();
411 pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); 412 pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US);
412 queue = new SrsMessageQueue(); 413 queue = new SrsMessageQueue();
@@ -416,7 +417,6 @@ SrsEdgeForwarder::~SrsEdgeForwarder() @@ -416,7 +417,6 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
416 { 417 {
417 stop(); 418 stop();
418 419
419 - srs_freep(sdk);  
420 srs_freep(lb); 420 srs_freep(lb);
421 srs_freep(pthread); 421 srs_freep(pthread);
422 srs_freep(queue); 422 srs_freep(queue);
@@ -464,9 +464,12 @@ int SrsEdgeForwarder::start() @@ -464,9 +464,12 @@ int SrsEdgeForwarder::start()
464 } 464 }
465 465
466 // open socket. 466 // open socket.
  467 + srs_freep(sdk);
467 int64_t cto = SRS_EDGE_FORWARDER_TIMEOUT_US; 468 int64_t cto = SRS_EDGE_FORWARDER_TIMEOUT_US;
468 int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US; 469 int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
469 - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { 470 + sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
  471 +
  472 + if ((ret = sdk->connect()) != ERROR_SUCCESS) {
470 srs_warn("edge push %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); 473 srs_warn("edge push %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
471 return ret; 474 return ret;
472 } 475 }
@@ -482,8 +485,9 @@ int SrsEdgeForwarder::start() @@ -482,8 +485,9 @@ int SrsEdgeForwarder::start()
482 void SrsEdgeForwarder::stop() 485 void SrsEdgeForwarder::stop()
483 { 486 {
484 pthread->stop(); 487 pthread->stop();
485 - sdk->close();  
486 queue->clear(); 488 queue->clear();
  489 +
  490 + srs_freep(sdk);
487 } 491 }
488 492
489 #define SYS_MAX_EDGE_SEND_MSGS 128 493 #define SYS_MAX_EDGE_SEND_MSGS 128
@@ -57,7 +57,7 @@ SrsForwarder::SrsForwarder(SrsSource* s) @@ -57,7 +57,7 @@ SrsForwarder::SrsForwarder(SrsSource* s)
57 req = NULL; 57 req = NULL;
58 sh_video = sh_audio = NULL; 58 sh_video = sh_audio = NULL;
59 59
60 - sdk = new SrsSimpleRtmpClient(); 60 + sdk = NULL;
61 pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US); 61 pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US);
62 queue = new SrsMessageQueue(); 62 queue = new SrsMessageQueue();
63 jitter = new SrsRtmpJitter(); 63 jitter = new SrsRtmpJitter();
@@ -236,9 +236,12 @@ int SrsForwarder::cycle() @@ -236,9 +236,12 @@ int SrsForwarder::cycle()
236 url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream); 236 url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream);
237 } 237 }
238 238
  239 + srs_freep(sdk);
239 int64_t cto = SRS_FORWARDER_SLEEP_US; 240 int64_t cto = SRS_FORWARDER_SLEEP_US;
240 int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US; 241 int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
241 - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { 242 + sdk = new SrsSimpleRtmpClient(url, cto, sto);
  243 +
  244 + if ((ret = sdk->connect()) != ERROR_SUCCESS) {
242 srs_warn("forward failed, url=%s, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); 245 srs_warn("forward failed, url=%s, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
243 return ret; 246 return ret;
244 } 247 }
@@ -40,7 +40,7 @@ using namespace std; @@ -40,7 +40,7 @@ using namespace std;
40 40
41 SrsHttpClient::SrsHttpClient() 41 SrsHttpClient::SrsHttpClient()
42 { 42 {
43 - transport = new SrsTcpClient(); 43 + transport = NULL;
44 kbps = new SrsKbps(); 44 kbps = new SrsKbps();
45 parser = NULL; 45 parser = NULL;
46 timeout_us = 0; 46 timeout_us = 0;
@@ -52,19 +52,14 @@ SrsHttpClient::~SrsHttpClient() @@ -52,19 +52,14 @@ SrsHttpClient::~SrsHttpClient()
52 disconnect(); 52 disconnect();
53 53
54 srs_freep(kbps); 54 srs_freep(kbps);
55 - srs_freep(transport);  
56 srs_freep(parser); 55 srs_freep(parser);
57 } 56 }
58 57
  58 +// TODO: FIXME: use ms for timeout.
59 int SrsHttpClient::initialize(string h, int p, int64_t t_us) 59 int SrsHttpClient::initialize(string h, int p, int64_t t_us)
60 { 60 {
61 int ret = ERROR_SUCCESS; 61 int ret = ERROR_SUCCESS;
62 62
63 - // disconnect first when h:p changed.  
64 - if ((!host.empty() && host != h) || (port != 0 && port != p)) {  
65 - disconnect();  
66 - }  
67 -  
68 srs_freep(parser); 63 srs_freep(parser);
69 parser = new SrsHttpParser(); 64 parser = new SrsHttpParser();
70 65
@@ -73,9 +68,11 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us) @@ -73,9 +68,11 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us)
73 return ret; 68 return ret;
74 } 69 }
75 70
  71 + // Always disconnect the transport.
76 host = h; 72 host = h;
77 port = p; 73 port = p;
78 timeout_us = t_us; 74 timeout_us = t_us;
  75 + disconnect();
79 76
80 // ep used for host in header. 77 // ep used for host in header.
81 string ep = host; 78 string ep = host;
@@ -83,7 +80,7 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us) @@ -83,7 +80,7 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us)
83 ep += ":" + srs_int2str(port); 80 ep += ":" + srs_int2str(port);
84 } 81 }
85 82
86 - // set default value for headers. 83 + // Set default value for headers.
87 headers["Host"] = ep; 84 headers["Host"] = ep;
88 headers["Connection"] = "Keep-Alive"; 85 headers["Connection"] = "Keep-Alive";
89 headers["User-Agent"] = RTMP_SIG_SRS_SERVER; 86 headers["User-Agent"] = RTMP_SIG_SRS_SERVER;
@@ -126,9 +123,8 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) @@ -126,9 +123,8 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)
126 123
127 std::string data = ss.str(); 124 std::string data = ss.str();
128 if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { 125 if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
129 - // disconnect when error. 126 + // Disconnect the transport when channel error, reconnect for next operation.
130 disconnect(); 127 disconnect();
131 -  
132 srs_error("write http post failed. ret=%d", ret); 128 srs_error("write http post failed. ret=%d", ret);
133 return ret; 129 return ret;
134 } 130 }
@@ -138,9 +134,13 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) @@ -138,9 +134,13 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)
138 srs_error("parse http post response failed. ret=%d", ret); 134 srs_error("parse http post response failed. ret=%d", ret);
139 return ret; 135 return ret;
140 } 136 }
141 -  
142 srs_assert(msg); 137 srs_assert(msg);
143 - *ppmsg = msg; 138 +
  139 + if (ppmsg) {
  140 + *ppmsg = msg;
  141 + } else {
  142 + srs_freep(msg);
  143 + }
144 srs_info("parse http post response success."); 144 srs_info("parse http post response success.");
145 145
146 return ret; 146 return ret;
@@ -173,9 +173,8 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg) @@ -173,9 +173,8 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
173 173
174 std::string data = ss.str(); 174 std::string data = ss.str();
175 if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { 175 if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
176 - // disconnect when error. 176 + // Disconnect the transport when channel error, reconnect for next operation.
177 disconnect(); 177 disconnect();
178 -  
179 srs_error("write http get failed. ret=%d", ret); 178 srs_error("write http get failed. ret=%d", ret);
180 return ret; 179 return ret;
181 } 180 }
@@ -187,7 +186,11 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg) @@ -187,7 +186,11 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
187 } 186 }
188 srs_assert(msg); 187 srs_assert(msg);
189 188
190 - *ppmsg = msg; 189 + if (ppmsg) {
  190 + *ppmsg = msg;
  191 + } else {
  192 + srs_freep(msg);
  193 + }
191 srs_info("parse http get response success."); 194 srs_info("parse http get response success.");
192 195
193 return ret; 196 return ret;
@@ -215,23 +218,24 @@ void SrsHttpClient::kbps_sample(const char* label, int64_t age) @@ -215,23 +218,24 @@ void SrsHttpClient::kbps_sample(const char* label, int64_t age)
215 void SrsHttpClient::disconnect() 218 void SrsHttpClient::disconnect()
216 { 219 {
217 kbps->set_io(NULL, NULL); 220 kbps->set_io(NULL, NULL);
  221 +
218 transport->close(); 222 transport->close();
  223 + srs_freep(transport);
219 } 224 }
220 225
221 int SrsHttpClient::connect() 226 int SrsHttpClient::connect()
222 { 227 {
223 int ret = ERROR_SUCCESS; 228 int ret = ERROR_SUCCESS;
224 229
225 - if (transport->connected()) { 230 + // When transport connected, ignore.
  231 + if (transport) {
226 return ret; 232 return ret;
227 } 233 }
228 234
229 - disconnect();  
230 -  
231 - // open socket.  
232 - if ((ret = transport->connect(host, port, timeout_us)) != ERROR_SUCCESS) {  
233 - srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d",  
234 - host.c_str(), port, timeout_us, ret); 235 + transport = new SrsTcpClient(host, port, timeout_us / 1000);
  236 + if ((ret = transport->connect()) != ERROR_SUCCESS) {
  237 + disconnect();
  238 + srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout_us, ret);
235 return ret; 239 return ret;
236 } 240 }
237 srs_info("connect to server success. server=%s, port=%d", host.c_str(), port); 241 srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);
@@ -46,11 +46,19 @@ class SrsKbps; @@ -46,11 +46,19 @@ class SrsKbps;
46 #define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL) 46 #define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL)
47 47
48 /** 48 /**
49 -* http client to GET/POST/PUT/DELETE uri  
50 -*/ 49 + * The client to GET/POST/PUT/DELETE over HTTP.
  50 + * @remark We will reuse the TCP transport until initialize or channel error,
  51 + * such as send/recv failed.
  52 + * Usage:
  53 + * SrsHttpClient hc;
  54 + * hc.initialize("127.0.0.1", 80, 9000);
  55 + * hc.post("/api/v1/version", "Hello world!", NULL);
  56 + */
51 class SrsHttpClient 57 class SrsHttpClient
52 { 58 {
53 private: 59 private:
  60 + // The underlayer TCP transport, set to NULL when disconnect, or never not NULL when connected.
  61 + // We will disconnect transport when initialize or channel error, such as send/recv error.
54 SrsTcpClient* transport; 62 SrsTcpClient* transport;
55 SrsHttpParser* parser; 63 SrsHttpParser* parser;
56 std::map<std::string, std::string> headers; 64 std::map<std::string, std::string> headers;
@@ -65,12 +73,13 @@ public: @@ -65,12 +73,13 @@ public:
65 virtual ~SrsHttpClient(); 73 virtual ~SrsHttpClient();
66 public: 74 public:
67 /** 75 /**
68 - * initialize the client, connect to host and port. 76 + * Initliaze the client, disconnect the transport, renew the HTTP parser.
69 * @remark we will set default values in headers, which can be override by set_header. 77 * @remark we will set default values in headers, which can be override by set_header.
70 */ 78 */
71 virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US); 79 virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US);
72 /** 80 /**
73 - * set the header[k]=v and return the client itself. 81 + * Set HTTP request header in header[k]=v.
  82 + * @return the HTTP client itself.
74 */ 83 */
75 virtual SrsHttpClient* set_header(std::string k, std::string v); 84 virtual SrsHttpClient* set_header(std::string k, std::string v);
76 public: 85 public:
@@ -135,14 +135,13 @@ SrsKafkaPartition::SrsKafkaPartition() @@ -135,14 +135,13 @@ SrsKafkaPartition::SrsKafkaPartition()
135 id = broker = 0; 135 id = broker = 0;
136 port = SRS_CONSTS_KAFKA_DEFAULT_PORT; 136 port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
137 137
138 - transport = new SrsTcpClient();  
139 - kafka = new SrsKafkaClient(transport); 138 + transport = NULL;
  139 + kafka = NULL;
140 } 140 }
141 141
142 SrsKafkaPartition::~SrsKafkaPartition() 142 SrsKafkaPartition::~SrsKafkaPartition()
143 { 143 {
144 - srs_freep(kafka);  
145 - srs_freep(transport); 144 + disconnect();
146 } 145 }
147 146
148 string SrsKafkaPartition::hostport() 147 string SrsKafkaPartition::hostport()
@@ -158,13 +157,15 @@ int SrsKafkaPartition::connect() @@ -158,13 +157,15 @@ int SrsKafkaPartition::connect()
158 { 157 {
159 int ret = ERROR_SUCCESS; 158 int ret = ERROR_SUCCESS;
160 159
161 - if (transport->connected()) { 160 + if (transport) {
162 return ret; 161 return ret;
163 } 162 }
  163 + transport = new SrsTcpClient(host, port, SRS_KAFKA_PRODUCER_TIMEOUT);
  164 + kafka = new SrsKafkaClient(transport);
164 165
165 - int64_t timeout = SRS_KAFKA_PRODUCER_TIMEOUT * 1000;  
166 - if ((ret = transport->connect(host, port, timeout)) != ERROR_SUCCESS) {  
167 - srs_error("connect to %s partition=%d failed, timeout=%"PRId64". ret=%d", hostport().c_str(), id, timeout, ret); 166 + if ((ret = transport->connect()) != ERROR_SUCCESS) {
  167 + disconnect();
  168 + srs_error("connect to %s partition=%d failed. ret=%d", hostport().c_str(), id, ret);
168 return ret; 169 return ret;
169 } 170 }
170 171
@@ -178,6 +179,12 @@ int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc) @@ -178,6 +179,12 @@ int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc)
178 return kafka->write_messages(topic, id, *pc); 179 return kafka->write_messages(topic, id, *pc);
179 } 180 }
180 181
  182 +void SrsKafkaPartition::disconnect()
  183 +{
  184 + srs_freep(kafka);
  185 + srs_freep(transport);
  186 +}
  187 +
181 SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j) 188 SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j)
182 { 189 {
183 producer = p; 190 producer = p;
@@ -562,12 +569,6 @@ int SrsKafkaProducer::request_metadata() @@ -562,12 +569,6 @@ int SrsKafkaProducer::request_metadata()
562 return ret; 569 return ret;
563 } 570 }
564 571
565 - SrsTcpClient* transport = new SrsTcpClient();  
566 - SrsAutoFree(SrsTcpClient, transport);  
567 -  
568 - SrsKafkaClient* kafka = new SrsKafkaClient(transport);  
569 - SrsAutoFree(SrsKafkaClient, kafka);  
570 -  
571 std::string server; 572 std::string server;
572 int port = SRS_CONSTS_KAFKA_DEFAULT_PORT; 573 int port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
573 if (true) { 574 if (true) {
@@ -584,8 +585,14 @@ int SrsKafkaProducer::request_metadata() @@ -584,8 +585,14 @@ int SrsKafkaProducer::request_metadata()
584 senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); 585 senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
585 } 586 }
586 587
  588 + SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US / 1000);
  589 + SrsAutoFree(SrsTcpClient, transport);
  590 +
  591 + SrsKafkaClient* kafka = new SrsKafkaClient(transport);
  592 + SrsAutoFree(SrsKafkaClient, kafka);
  593 +
587 // reconnect to kafka server. 594 // reconnect to kafka server.
588 - if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) { 595 + if ((ret = transport->connect()) != ERROR_SUCCESS) {
589 srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret); 596 srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret);
590 return ret; 597 return ret;
591 } 598 }
@@ -57,6 +57,7 @@ struct SrsKafkaPartition @@ -57,6 +57,7 @@ struct SrsKafkaPartition
57 { 57 {
58 private: 58 private:
59 std::string ep; 59 std::string ep;
  60 + // Not NULL when connected.
60 SrsTcpClient* transport; 61 SrsTcpClient* transport;
61 SrsKafkaClient* kafka; 62 SrsKafkaClient* kafka;
62 public: 63 public:
@@ -73,6 +74,8 @@ public: @@ -73,6 +74,8 @@ public:
73 virtual std::string hostport(); 74 virtual std::string hostport();
74 virtual int connect(); 75 virtual int connect();
75 virtual int flush(SrsKafkaPartitionCache* pc); 76 virtual int flush(SrsKafkaPartitionCache* pc);
  77 +private:
  78 + virtual void disconnect();
76 }; 79 };
77 80
78 /** 81 /**
@@ -133,8 +133,7 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) @@ -133,8 +133,7 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
133 buffer = new SrsSimpleStream(); 133 buffer = new SrsSimpleStream();
134 output = _srs_config->get_stream_caster_output(c); 134 output = _srs_config->get_stream_caster_output(c);
135 135
136 - req = NULL;  
137 - sdk = new SrsSimpleRtmpClient(); 136 + sdk = NULL;
138 137
139 avc = new SrsRawH264Stream(); 138 avc = new SrsRawH264Stream();
140 aac = new SrsRawAacStream(); 139 aac = new SrsRawAacStream();
@@ -149,7 +148,6 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp() @@ -149,7 +148,6 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
149 { 148 {
150 close(); 149 close();
151 150
152 - srs_freep(sdk);  
153 srs_freep(buffer); 151 srs_freep(buffer);
154 srs_freep(stream); 152 srs_freep(stream);
155 srs_freep(context); 153 srs_freep(context);
@@ -570,6 +568,10 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da @@ -570,6 +568,10 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da
570 { 568 {
571 int ret = ERROR_SUCCESS; 569 int ret = ERROR_SUCCESS;
572 570
  571 + if ((ret = connect()) != ERROR_SUCCESS) {
  572 + return ret;
  573 + }
  574 +
573 SrsSharedPtrMessage* msg = NULL; 575 SrsSharedPtrMessage* msg = NULL;
574 576
575 if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { 577 if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) {
@@ -597,6 +599,7 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da @@ -597,6 +599,7 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da
597 599
598 // send out encoded msg. 600 // send out encoded msg.
599 if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { 601 if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) {
  602 + close();
600 return ret; 603 return ret;
601 } 604 }
602 } 605 }
@@ -608,20 +611,23 @@ int SrsMpegtsOverUdp::connect() @@ -608,20 +611,23 @@ int SrsMpegtsOverUdp::connect()
608 { 611 {
609 int ret = ERROR_SUCCESS; 612 int ret = ERROR_SUCCESS;
610 613
611 - // when ok, ignore.  
612 - // TODO: FIXME: should reconnect when disconnected.  
613 - if (sdk->connected()) { 614 + // Ignore when connected.
  615 + if (sdk) {
614 return ret; 616 return ret;
615 } 617 }
616 618
617 int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; 619 int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
618 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; 620 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
619 - if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) { 621 + sdk = new SrsSimpleRtmpClient(output, cto/1000, sto/1000);
  622 +
  623 + if ((ret = sdk->connect()) != ERROR_SUCCESS) {
  624 + close();
620 srs_error("mpegts: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret); 625 srs_error("mpegts: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret);
621 return ret; 626 return ret;
622 } 627 }
623 628
624 if ((ret = sdk->publish()) != ERROR_SUCCESS) { 629 if ((ret = sdk->publish()) != ERROR_SUCCESS) {
  630 + close();
625 srs_error("mpegts: publish failed. ret=%d", ret); 631 srs_error("mpegts: publish failed. ret=%d", ret);
626 return ret; 632 return ret;
627 } 633 }
@@ -631,8 +637,7 @@ int SrsMpegtsOverUdp::connect() @@ -631,8 +637,7 @@ int SrsMpegtsOverUdp::connect()
631 637
632 void SrsMpegtsOverUdp::close() 638 void SrsMpegtsOverUdp::close()
633 { 639 {
634 - srs_freep(req);  
635 - sdk->close(); 640 + srs_freep(sdk);
636 } 641 }
637 642
638 #endif 643 #endif
@@ -86,7 +86,6 @@ private: @@ -86,7 +86,6 @@ private:
86 SrsSimpleStream* buffer; 86 SrsSimpleStream* buffer;
87 std::string output; 87 std::string output;
88 private: 88 private:
89 - SrsRequest* req;  
90 SrsSimpleRtmpClient* sdk; 89 SrsSimpleRtmpClient* sdk;
91 private: 90 private:
92 SrsRawH264Stream* avc; 91 SrsRawH264Stream* avc;
@@ -121,10 +120,9 @@ private: @@ -121,10 +120,9 @@ private:
121 private: 120 private:
122 virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); 121 virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
123 private: 122 private:
124 - // connect to rtmp output url.  
125 - // @remark ignore when not connected, reconnect when disconnected. 123 + // Connect to RTMP server.
126 virtual int connect(); 124 virtual int connect();
127 - // close the connected io and rtmp to ready to be re-connect. 125 + // Close the connection to RTMP server.
128 virtual void close(); 126 virtual void close();
129 }; 127 };
130 128
@@ -77,51 +77,45 @@ using namespace std; @@ -77,51 +77,45 @@ using namespace std;
77 // when edge timeout, retry next. 77 // when edge timeout, retry next.
78 #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL) 78 #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL)
79 79
80 -SrsSimpleRtmpClient::SrsSimpleRtmpClient() 80 +SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, int64_t ctm, int64_t stm)
81 { 81 {
82 - req = NULL;  
83 - client = NULL;  
84 kbps = new SrsKbps(); 82 kbps = new SrsKbps();
85 83
86 - transport = new SrsTcpClient(); 84 + url = u;
  85 + connect_timeout = ctm;
  86 + stream_timeout = stm;
  87 +
  88 + req = new SrsRequest();
  89 + srs_parse_rtmp_url(url, req->tcUrl, req->stream);
  90 + srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param);
  91 +
  92 + transport = NULL;
  93 + client = NULL;
  94 +
87 stream_id = 0; 95 stream_id = 0;
88 } 96 }
89 97
90 SrsSimpleRtmpClient::~SrsSimpleRtmpClient() 98 SrsSimpleRtmpClient::~SrsSimpleRtmpClient()
91 { 99 {
92 close(); 100 close();
93 -  
94 srs_freep(kbps); 101 srs_freep(kbps);
95 - srs_freep(transport);  
96 - srs_freep(client);  
97 } 102 }
98 103
99 -int SrsSimpleRtmpClient::connect(string url, int64_t connect_timeout, int64_t stream_timeout) 104 +int SrsSimpleRtmpClient::connect()
100 { 105 {
101 int ret = ERROR_SUCCESS; 106 int ret = ERROR_SUCCESS;
102 107
103 - // when ok, ignore.  
104 - // TODO: FIXME: should reconnect when disconnected.  
105 - if (transport->connected()) {  
106 - return ret;  
107 - } 108 + close();
108 109
109 - // parse uri  
110 - srs_freep(req);  
111 - req = new SrsRequest();  
112 - srs_parse_rtmp_url(url, req->tcUrl, req->stream);  
113 - srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); 110 + transport = new SrsTcpClient(req->host, req->port, connect_timeout);
  111 + client = new SrsRtmpClient(transport);
  112 + kbps->set_io(transport, transport);
114 113
115 - // connect host.  
116 - if ((ret = transport->connect(req->host, req->port, connect_timeout)) != ERROR_SUCCESS) { 114 + if ((ret = transport->connect()) != ERROR_SUCCESS) {
  115 + close();
117 return ret; 116 return ret;
118 } 117 }
119 118
120 - srs_freep(client);  
121 - client = new SrsRtmpClient(transport);  
122 -  
123 - kbps->set_io(transport, transport);  
124 -  
125 client->set_recv_timeout(stream_timeout); 119 client->set_recv_timeout(stream_timeout);
126 client->set_send_timeout(stream_timeout); 120 client->set_send_timeout(stream_timeout);
127 121
@@ -142,6 +136,13 @@ int SrsSimpleRtmpClient::connect(string url, int64_t connect_timeout, int64_t st @@ -142,6 +136,13 @@ int SrsSimpleRtmpClient::connect(string url, int64_t connect_timeout, int64_t st
142 return ret; 136 return ret;
143 } 137 }
144 138
  139 +void SrsSimpleRtmpClient::close()
  140 +{
  141 + srs_freep(client);
  142 + srs_freep(transport);
  143 + kbps->set_io(NULL, NULL);
  144 +}
  145 +
145 int SrsSimpleRtmpClient::connect_app() 146 int SrsSimpleRtmpClient::connect_app()
146 { 147 {
147 int ret = ERROR_SUCCESS; 148 int ret = ERROR_SUCCESS;
@@ -197,19 +198,6 @@ int SrsSimpleRtmpClient::connect_app() @@ -197,19 +198,6 @@ int SrsSimpleRtmpClient::connect_app()
197 return ret; 198 return ret;
198 } 199 }
199 200
200 -bool SrsSimpleRtmpClient::connected()  
201 -{  
202 - return transport->connected();  
203 -}  
204 -  
205 -void SrsSimpleRtmpClient::close()  
206 -{  
207 - transport->close();  
208 -  
209 - srs_freep(client);  
210 - srs_freep(req);  
211 -}  
212 -  
213 int SrsSimpleRtmpClient::publish() 201 int SrsSimpleRtmpClient::publish()
214 { 202 {
215 int ret = ERROR_SUCCESS; 203 int ret = ERROR_SUCCESS;
@@ -1464,48 +1452,35 @@ int SrsRtmpConn::check_edge_token_traverse_auth() @@ -1464,48 +1452,35 @@ int SrsRtmpConn::check_edge_token_traverse_auth()
1464 1452
1465 srs_assert(req); 1453 srs_assert(req);
1466 1454
1467 - SrsTcpClient* transport = new SrsTcpClient();  
1468 - SrsAutoFree(SrsTcpClient, transport);  
1469 -  
1470 vector<string> args = _srs_config->get_vhost_edge_origin(req->vhost)->args; 1455 vector<string> args = _srs_config->get_vhost_edge_origin(req->vhost)->args;
1471 - for (int i = 0; i < (int)args.size(); i++) {  
1472 - string hostport = args.at(i);  
1473 - if ((ret = connect_server(hostport, transport)) == ERROR_SUCCESS) {  
1474 - break;  
1475 - }  
1476 - }  
1477 - if (ret != ERROR_SUCCESS) {  
1478 - srs_warn("token traverse connect failed. ret=%d", ret); 1456 + if (args.empty()) {
1479 return ret; 1457 return ret;
1480 } 1458 }
1481 1459
1482 - SrsRtmpClient* client = new SrsRtmpClient(transport);  
1483 - SrsAutoFree(SrsRtmpClient, client);  
1484 -  
1485 - return do_token_traverse_auth(client);  
1486 -}  
1487 -  
1488 -int SrsRtmpConn::connect_server(string hostport, SrsTcpClient* transport)  
1489 -{  
1490 - int ret = ERROR_SUCCESS;  
1491 -  
1492 - SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);  
1493 - srs_assert(conf);  
1494 -  
1495 - // select the origin.  
1496 - string server;  
1497 - int port = SRS_CONSTS_RTMP_DEFAULT_PORT;  
1498 - srs_parse_hostport(hostport, server, port); 1460 + for (int i = 0; i < (int)args.size(); i++) {
  1461 + string hostport = args.at(i);
  1462 +
  1463 + // select the origin.
  1464 + string server;
  1465 + int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  1466 + srs_parse_hostport(hostport, server, port);
1499 1467
1500 - // open socket.  
1501 - int64_t timeout = SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US;  
1502 - if ((ret = transport->connect(server, port, timeout)) != ERROR_SUCCESS) {  
1503 - srs_warn("edge token traverse failed, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",  
1504 - req->tcUrl.c_str(), server.c_str(), port, timeout, ret);  
1505 - return ret; 1468 + SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US / 1000);
  1469 + SrsAutoFree(SrsTcpClient, transport);
  1470 +
  1471 + if ((ret = transport->connect()) != ERROR_SUCCESS) {
  1472 + srs_warn("Illegal edge token, tcUrl=%s to server=%s, port=%d. ret=%d", req->tcUrl.c_str(), server.c_str(), port, ret);
  1473 + continue;
  1474 + }
  1475 +
  1476 + SrsRtmpClient* client = new SrsRtmpClient(transport);
  1477 + SrsAutoFree(SrsRtmpClient, client);
  1478 +
  1479 + return do_token_traverse_auth(client);
1506 } 1480 }
1507 - srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port);  
1508 1481
  1482 + ret = ERROR_EDGE_PORT_INVALID;
  1483 + srs_error("Illegal edge token, server=%d. ret=%d", (int)args.size(), ret);
1509 return ret; 1484 return ret;
1510 } 1485 }
1511 1486
@@ -63,27 +63,41 @@ class ISrsKafkaCluster; @@ -63,27 +63,41 @@ class ISrsKafkaCluster;
63 #endif 63 #endif
64 64
65 /** 65 /**
66 - * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs. 66 + * The simple RTMP client, provides friendly APIs.
  67 + * @remark Should never use client when closed.
  68 + * Usage:
  69 + * SrsSimpleRtmpClient client("rtmp://127.0.0.1:1935/live/livestream", 3000, 9000);
  70 + * client.connect();
  71 + * client.play();
  72 + * client.close();
67 */ 73 */
68 class SrsSimpleRtmpClient 74 class SrsSimpleRtmpClient
69 { 75 {
70 private: 76 private:
  77 + std::string url;
  78 + int64_t connect_timeout;
  79 + int64_t stream_timeout;
  80 +private:
71 SrsRequest* req; 81 SrsRequest* req;
72 SrsTcpClient* transport; 82 SrsTcpClient* transport;
73 SrsRtmpClient* client; 83 SrsRtmpClient* client;
74 SrsKbps* kbps; 84 SrsKbps* kbps;
75 int stream_id; 85 int stream_id;
76 public: 86 public:
77 - SrsSimpleRtmpClient(); 87 + // Constructor.
  88 + // @param u The RTMP url, for example, rtmp://ip:port/app/stream?domain=vhost
  89 + // @param ctm The timeout in ms to connect to server.
  90 + // @param stm The timeout in ms to delivery A/V stream.
  91 + SrsSimpleRtmpClient(std::string u, int64_t ctm, int64_t stm);
78 virtual ~SrsSimpleRtmpClient(); 92 virtual ~SrsSimpleRtmpClient();
79 public: 93 public:
80 - virtual int connect(std::string url, int64_t connect_timeout, int64_t stream_timeout); 94 + // Connect, handshake and connect app to RTMP server.
  95 + // @remark We always close the transport.
  96 + virtual int connect();
  97 + virtual void close();
81 private: 98 private:
82 virtual int connect_app(); 99 virtual int connect_app();
83 public: 100 public:
84 - virtual bool connected();  
85 - virtual void close();  
86 -public:  
87 virtual int publish(); 101 virtual int publish();
88 virtual int play(); 102 virtual int play();
89 virtual void kbps_sample(const char* label, int64_t age); 103 virtual void kbps_sample(const char* label, int64_t age);
@@ -175,8 +189,8 @@ private: @@ -175,8 +189,8 @@ private:
175 virtual void set_sock_options(); 189 virtual void set_sock_options();
176 private: 190 private:
177 virtual int check_edge_token_traverse_auth(); 191 virtual int check_edge_token_traverse_auth();
178 - virtual int connect_server(std::string hostport, SrsTcpClient* transport);  
179 virtual int do_token_traverse_auth(SrsRtmpClient* client); 192 virtual int do_token_traverse_auth(SrsRtmpClient* client);
  193 +private:
180 /** 194 /**
181 * when the connection disconnect, call this method. 195 * when the connection disconnect, call this method.
182 * e.g. log msg of connection and report to other system. 196 * e.g. log msg of connection and report to other system.
@@ -197,7 +197,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) @@ -197,7 +197,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
197 trd = new SrsOneCycleThread("rtsp", this); 197 trd = new SrsOneCycleThread("rtsp", this);
198 198
199 req = NULL; 199 req = NULL;
200 - sdk = new SrsSimpleRtmpClient(); 200 + sdk = NULL;
201 vjitter = new SrsRtspJitter(); 201 vjitter = new SrsRtspJitter();
202 ajitter = new SrsRtspJitter(); 202 ajitter = new SrsRtspJitter();
203 203
@@ -209,6 +209,8 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) @@ -209,6 +209,8 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
209 209
210 SrsRtspConn::~SrsRtspConn() 210 SrsRtspConn::~SrsRtspConn()
211 { 211 {
  212 + close();
  213 +
212 srs_close_stfd(stfd); 214 srs_close_stfd(stfd);
213 215
214 srs_freep(video_rtp); 216 srs_freep(video_rtp);
@@ -623,6 +625,10 @@ int SrsRtspConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, i @@ -623,6 +625,10 @@ int SrsRtspConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, i
623 { 625 {
624 int ret = ERROR_SUCCESS; 626 int ret = ERROR_SUCCESS;
625 627
  628 + if ((ret = connect()) != ERROR_SUCCESS) {
  629 + return ret;
  630 + }
  631 +
626 SrsSharedPtrMessage* msg = NULL; 632 SrsSharedPtrMessage* msg = NULL;
627 633
628 if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { 634 if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) {
@@ -633,20 +639,19 @@ int SrsRtspConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, i @@ -633,20 +639,19 @@ int SrsRtspConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, i
633 639
634 // send out encoded msg. 640 // send out encoded msg.
635 if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { 641 if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) {
  642 + close();
636 return ret; 643 return ret;
637 } 644 }
638 645
639 return ret; 646 return ret;
640 } 647 }
641 648
642 -// TODO: FIXME: merge all client code.  
643 int SrsRtspConn::connect() 649 int SrsRtspConn::connect()
644 { 650 {
645 int ret = ERROR_SUCCESS; 651 int ret = ERROR_SUCCESS;
646 652
647 - // when ok, ignore.  
648 - // TODO: FIXME: support reconnect.  
649 - if (sdk->connected()) { 653 + // Ignore when connected.
  654 + if (sdk) {
650 return ret; 655 return ret;
651 } 656 }
652 657
@@ -666,13 +671,17 @@ int SrsRtspConn::connect() @@ -666,13 +671,17 @@ int SrsRtspConn::connect()
666 // connect host. 671 // connect host.
667 int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; 672 int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
668 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; 673 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
669 - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { 674 + sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
  675 +
  676 + if ((ret = sdk->connect()) != ERROR_SUCCESS) {
  677 + close();
670 srs_error("rtsp: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); 678 srs_error("rtsp: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
671 return ret; 679 return ret;
672 } 680 }
673 681
674 // publish. 682 // publish.
675 if ((ret = sdk->publish()) != ERROR_SUCCESS) { 683 if ((ret = sdk->publish()) != ERROR_SUCCESS) {
  684 + close();
676 srs_error("rtsp: publish %s failed. ret=%d", url.c_str(), ret); 685 srs_error("rtsp: publish %s failed. ret=%d", url.c_str(), ret);
677 return ret; 686 return ret;
678 } 687 }
@@ -680,6 +689,11 @@ int SrsRtspConn::connect() @@ -680,6 +689,11 @@ int SrsRtspConn::connect()
680 return write_sequence_header(); 689 return write_sequence_header();
681 } 690 }
682 691
  692 +void SrsRtspConn::close()
  693 +{
  694 + srs_freep(sdk);
  695 +}
  696 +
683 SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c) 697 SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c)
684 { 698 {
685 // TODO: FIXME: support reload. 699 // TODO: FIXME: support reload.
@@ -177,9 +177,10 @@ private: @@ -177,9 +177,10 @@ private:
177 virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts); 177 virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts);
178 virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); 178 virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
179 private: 179 private:
180 - // connect to rtmp output url.  
181 - // @remark ignore when not connected, reconnect when disconnected. 180 + // Connect to RTMP server.
182 virtual int connect(); 181 virtual int connect();
  182 + // Close the connection to RTMP server.
  183 + virtual void close();
183 }; 184 };
184 185
185 /** 186 /**
@@ -421,10 +421,14 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) @@ -421,10 +421,14 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
421 return ret; 421 return ret;
422 } 422 }
423 423
424 -SrsTcpClient::SrsTcpClient() 424 +SrsTcpClient::SrsTcpClient(string h, int p, int64_t tm)
425 { 425 {
426 io = NULL; 426 io = NULL;
427 stfd = NULL; 427 stfd = NULL;
  428 +
  429 + host = h;
  430 + port = p;
  431 + timeout = tm;
428 } 432 }
429 433
430 SrsTcpClient::~SrsTcpClient() 434 SrsTcpClient::~SrsTcpClient()
@@ -432,26 +436,19 @@ SrsTcpClient::~SrsTcpClient() @@ -432,26 +436,19 @@ SrsTcpClient::~SrsTcpClient()
432 close(); 436 close();
433 } 437 }
434 438
435 -bool SrsTcpClient::connected()  
436 -{  
437 - return io;  
438 -}  
439 -  
440 -int SrsTcpClient::connect(string host, int port, int64_t timeout) 439 +int SrsTcpClient::connect()
441 { 440 {
442 int ret = ERROR_SUCCESS; 441 int ret = ERROR_SUCCESS;
443 442
444 - // when connected, ignore.  
445 - if (io) {  
446 - return ret;  
447 - } 443 + close();
448 444
449 - // connect host.  
450 - if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) {  
451 - srs_error("connect server %s:%d failed. ret=%d", host.c_str(), port, ret); 445 + srs_assert(stfd == NULL);
  446 + if ((ret = srs_socket_connect(host, port, timeout * 1000, &stfd)) != ERROR_SUCCESS) {
  447 + srs_error("connect tcp://%s:%d failed, to=%"PRId64"ms. ret=%d", host.c_str(), port, timeout, ret);
452 return ret; 448 return ret;
453 } 449 }
454 450
  451 + srs_assert(io == NULL);
455 io = new SrsStSocket(stfd); 452 io = new SrsStSocket(stfd);
456 453
457 return ret; 454 return ret;
@@ -459,7 +456,7 @@ int SrsTcpClient::connect(string host, int port, int64_t timeout) @@ -459,7 +456,7 @@ int SrsTcpClient::connect(string host, int port, int64_t timeout)
459 456
460 void SrsTcpClient::close() 457 void SrsTcpClient::close()
461 { 458 {
462 - // when closed, ignore. 459 + // Ignore when already closed.
463 if (!io) { 460 if (!io) {
464 return; 461 return;
465 } 462 }
@@ -205,34 +205,41 @@ public: @@ -205,34 +205,41 @@ public:
205 }; 205 };
206 206
207 /** 207 /**
208 - * the common tcp client, to connect to specified TCP server,  
209 - * reconnect and close the connection. 208 + * The client to connect to server over TCP.
  209 + * User must never reuse the client when close it.
  210 + * Usage:
  211 + * SrsTcpClient client("127.0.0.1", 1935,9000);
  212 + * client.connect();
  213 + * client.write("Hello world!", 12, NULL);
  214 + * client.read(buf, 4096, NULL);
210 */ 215 */
211 class SrsTcpClient : public ISrsProtocolReaderWriter 216 class SrsTcpClient : public ISrsProtocolReaderWriter
212 { 217 {
213 private: 218 private:
214 st_netfd_t stfd; 219 st_netfd_t stfd;
215 SrsStSocket* io; 220 SrsStSocket* io;
216 -public:  
217 - SrsTcpClient();  
218 - virtual ~SrsTcpClient(); 221 +private:
  222 + std::string host;
  223 + int port;
  224 + int64_t timeout;
219 public: 225 public:
220 /** 226 /**
221 - * whether connected to server. 227 + * Constructor.
  228 + * @param h the ip or hostname of server.
  229 + * @param p the port to connect to.
  230 + * @param tm the timeout in ms.
222 */ 231 */
223 - virtual bool connected(); 232 + SrsTcpClient(std::string h, int p, int64_t tm);
  233 + virtual ~SrsTcpClient();
224 public: 234 public:
225 /** 235 /**
226 - * connect to server over TCP.  
227 - * @param host the ip or hostname of server.  
228 - * @param port the port to connect to.  
229 - * @param timeout the timeout in us.  
230 - * @remark ignore when connected. 236 + * Connect to server over TCP.
  237 + * @remark We will close the exists connection before do connect.
231 */ 238 */
232 - virtual int connect(std::string host, int port, int64_t timeout); 239 + virtual int connect();
233 /** 240 /**
234 - * close the connection.  
235 - * @remark ignore when closed. 241 + * Close the connection to server.
  242 + * @remark User should never use the client when close it.
236 */ 243 */
237 virtual void close(); 244 virtual void close();
238 // interface ISrsProtocolReaderWriter 245 // interface ISrsProtocolReaderWriter
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 3 32 #define VERSION_MAJOR 3
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 14 34 +#define VERSION_REVISION 15
35 35
36 // generated by configure, only macros. 36 // generated by configure, only macros.
37 #include <srs_auto_headers.hpp> 37 #include <srs_auto_headers.hpp>
@@ -69,6 +69,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -69,6 +69,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
69 // to avoid death connection. 69 // to avoid death connection.
70 70
71 // the common io timeout, for both recv and send. 71 // the common io timeout, for both recv and send.
  72 +// TODO: FIXME: use ms for timeout.
72 #define SRS_CONSTS_RTMP_TIMEOUT_US (int64_t)(30*1000*1000LL) 73 #define SRS_CONSTS_RTMP_TIMEOUT_US (int64_t)(30*1000*1000LL)
73 74
74 // the timeout to wait for client control message, 75 // the timeout to wait for client control message,
@@ -409,6 +410,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -409,6 +410,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
409 #define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092 410 #define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092
410 411
411 // the common io timeout, for both recv and send. 412 // the common io timeout, for both recv and send.
  413 +// TODO: FIXME: use ms for timeout.
412 #define SRS_CONSTS_KAFKA_TIMEOUT_US (int64_t)(30*1000*1000LL) 414 #define SRS_CONSTS_KAFKA_TIMEOUT_US (int64_t)(30*1000*1000LL)
413 415
414 #endif 416 #endif
@@ -647,34 +647,8 @@ private: @@ -647,34 +647,8 @@ private:
647 SrsRawAacStream* aac; 647 SrsRawAacStream* aac;
648 std::string aac_specific_config; 648 std::string aac_specific_config;
649 public: 649 public:
650 - SrsIngestSrsOutput(SrsHttpUri* rtmp) {  
651 - out_rtmp = rtmp;  
652 - disconnected = false;  
653 - raw_aac_dts = srs_update_system_time_ms();  
654 -  
655 - req = NULL;  
656 - sdk = new SrsSimpleRtmpClient();  
657 -  
658 - avc = new SrsRawH264Stream();  
659 - aac = new SrsRawAacStream();  
660 - h264_sps_changed = false;  
661 - h264_pps_changed = false;  
662 - h264_sps_pps_sent = false;  
663 - }  
664 - virtual ~SrsIngestSrsOutput() {  
665 - close();  
666 -  
667 - srs_freep(sdk);  
668 - srs_freep(avc);  
669 - srs_freep(aac);  
670 -  
671 - std::multimap<int64_t, SrsTsMessage*>::iterator it;  
672 - for (it = queue.begin(); it != queue.end(); ++it) {  
673 - SrsTsMessage* msg = it->second;  
674 - srs_freep(msg);  
675 - }  
676 - queue.clear();  
677 - } 650 + SrsIngestSrsOutput(SrsHttpUri* rtmp);
  651 + virtual ~SrsIngestSrsOutput();
678 // interface ISrsTsHandler 652 // interface ISrsTsHandler
679 public: 653 public:
680 virtual int on_ts_message(SrsTsMessage* msg); 654 virtual int on_ts_message(SrsTsMessage* msg);
@@ -705,6 +679,37 @@ private: @@ -705,6 +679,37 @@ private:
705 virtual void close(); 679 virtual void close();
706 }; 680 };
707 681
  682 +SrsIngestSrsOutput::SrsIngestSrsOutput(SrsHttpUri* rtmp)
  683 +{
  684 + out_rtmp = rtmp;
  685 + disconnected = false;
  686 + raw_aac_dts = srs_update_system_time_ms();
  687 +
  688 + req = NULL;
  689 + sdk = NULL;
  690 +
  691 + avc = new SrsRawH264Stream();
  692 + aac = new SrsRawAacStream();
  693 + h264_sps_changed = false;
  694 + h264_pps_changed = false;
  695 + h264_sps_pps_sent = false;
  696 +}
  697 +
  698 +SrsIngestSrsOutput::~SrsIngestSrsOutput()
  699 +{
  700 + close();
  701 +
  702 + srs_freep(avc);
  703 + srs_freep(aac);
  704 +
  705 + std::multimap<int64_t, SrsTsMessage*>::iterator it;
  706 + for (it = queue.begin(); it != queue.end(); ++it) {
  707 + SrsTsMessage* msg = it->second;
  708 + srs_freep(msg);
  709 + }
  710 + queue.clear();
  711 +}
  712 +
708 int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg) 713 int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
709 { 714 {
710 int ret = ERROR_SUCCESS; 715 int ret = ERROR_SUCCESS;
@@ -1184,6 +1189,10 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* @@ -1184,6 +1189,10 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char*
1184 { 1189 {
1185 int ret = ERROR_SUCCESS; 1190 int ret = ERROR_SUCCESS;
1186 1191
  1192 + if ((ret = connect()) != ERROR_SUCCESS) {
  1193 + return ret;
  1194 + }
  1195 +
1187 SrsSharedPtrMessage* msg = NULL; 1196 SrsSharedPtrMessage* msg = NULL;
1188 1197
1189 if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { 1198 if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) {
@@ -1196,6 +1205,7 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* @@ -1196,6 +1205,7 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char*
1196 1205
1197 // send out encoded msg. 1206 // send out encoded msg.
1198 if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { 1207 if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) {
  1208 + close();
1199 srs_error("send RTMP type=%d, dts=%d, size=%d failed. ret=%d", type, timestamp, size, ret); 1209 srs_error("send RTMP type=%d, dts=%d, size=%d failed. ret=%d", type, timestamp, size, ret);
1200 return ret; 1210 return ret;
1201 } 1211 }
@@ -1207,9 +1217,8 @@ int SrsIngestSrsOutput::connect() @@ -1207,9 +1217,8 @@ int SrsIngestSrsOutput::connect()
1207 { 1217 {
1208 int ret = ERROR_SUCCESS; 1218 int ret = ERROR_SUCCESS;
1209 1219
1210 - // when ok, ignore.  
1211 - // TODO: FIXME: should reconnect when disconnected.  
1212 - if (sdk->connected()) { 1220 + // Ignore when connected.
  1221 + if (sdk) {
1213 return ret; 1222 return ret;
1214 } 1223 }
1215 1224
@@ -1219,13 +1228,17 @@ int SrsIngestSrsOutput::connect() @@ -1219,13 +1228,17 @@ int SrsIngestSrsOutput::connect()
1219 // connect host. 1228 // connect host.
1220 int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; 1229 int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
1221 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; 1230 int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
1222 - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { 1231 + sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
  1232 +
  1233 + if ((ret = sdk->connect()) != ERROR_SUCCESS) {
  1234 + close();
1223 srs_error("mpegts: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); 1235 srs_error("mpegts: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
1224 return ret; 1236 return ret;
1225 } 1237 }
1226 1238
1227 // publish. 1239 // publish.
1228 if ((ret = sdk->publish()) != ERROR_SUCCESS) { 1240 if ((ret = sdk->publish()) != ERROR_SUCCESS) {
  1241 + close();
1229 srs_error("mpegts: publish %s failed. ret=%d", url.c_str(), ret); 1242 srs_error("mpegts: publish %s failed. ret=%d", url.c_str(), ret);
1230 return ret; 1243 return ret;
1231 } 1244 }
@@ -1235,11 +1248,9 @@ int SrsIngestSrsOutput::connect() @@ -1235,11 +1248,9 @@ int SrsIngestSrsOutput::connect()
1235 1248
1236 void SrsIngestSrsOutput::close() 1249 void SrsIngestSrsOutput::close()
1237 { 1250 {
1238 - srs_trace("close output=%s", out_rtmp->get_url().c_str());  
1239 h264_sps_pps_sent = false; 1251 h264_sps_pps_sent = false;
1240 -  
1241 srs_freep(req); 1252 srs_freep(req);
1242 - sdk->close(); 1253 + srs_freep(sdk);
1243 } 1254 }
1244 1255
1245 // the context for ingest hls stream. 1256 // the context for ingest hls stream.