winlin

refine code, use simple rtmp client.

@@ -179,15 +179,20 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) @@ -179,15 +179,20 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
179 { 179 {
180 int ret = ERROR_SUCCESS; 180 int ret = ERROR_SUCCESS;
181 181
  182 + if ((ret = sdk->connect(output, SRS_CONSTS_RTMP_RECV_TIMEOUT_US)) != ERROR_SUCCESS) {
  183 + srs_error("flv: connect %s failed. ret=%d", output.c_str(), ret);
  184 + return ret;
  185 + }
  186 +
  187 + if ((ret = sdk->publish()) != ERROR_SUCCESS) {
  188 + srs_error("flv: publish failed. ret=%d", ret);
  189 + return ret;
  190 + }
  191 +
182 char pps[4]; 192 char pps[4];
183 while (!rr->eof()) { 193 while (!rr->eof()) {
184 pprint->elapse(); 194 pprint->elapse();
185 195
186 - if ((ret = sdk->connect(output, SRS_CONSTS_RTMP_RECV_TIMEOUT_US)) != ERROR_SUCCESS) {  
187 - srs_error("flv: connect %s failed. ret=%d", output.c_str(), ret);  
188 - return ret;  
189 - }  
190 -  
191 char type; 196 char type;
192 int32_t size; 197 int32_t size;
193 u_int32_t time; 198 u_int32_t time;
@@ -45,6 +45,7 @@ using namespace std; @@ -45,6 +45,7 @@ using namespace std;
45 #include <srs_protocol_amf0.hpp> 45 #include <srs_protocol_amf0.hpp>
46 #include <srs_kernel_utility.hpp> 46 #include <srs_kernel_utility.hpp>
47 #include <srs_kernel_balance.hpp> 47 #include <srs_kernel_balance.hpp>
  48 +#include <srs_app_rtmp_conn.hpp>
48 49
49 // when error, edge ingester sleep for a while and retry. 50 // when error, edge ingester sleep for a while and retry.
50 #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) 51 #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
@@ -63,12 +64,11 @@ using namespace std; @@ -63,12 +64,11 @@ using namespace std;
63 64
64 SrsEdgeIngester::SrsEdgeIngester() 65 SrsEdgeIngester::SrsEdgeIngester()
65 { 66 {
66 - transport = new SrsTcpClient();  
67 - kbps = new SrsKbps();  
68 - client = NULL;  
69 - _edge = NULL;  
70 - _req = NULL;  
71 - stream_id = 0; 67 + source = NULL;
  68 + edge = NULL;
  69 + req = NULL;
  70 +
  71 + sdk = new SrsSimpleRtmpClient();
72 lb = new SrsLbRoundRobin(); 72 lb = new SrsLbRoundRobin();
73 pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); 73 pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
74 } 74 }
@@ -77,19 +77,18 @@ SrsEdgeIngester::~SrsEdgeIngester() @@ -77,19 +77,18 @@ SrsEdgeIngester::~SrsEdgeIngester()
77 { 77 {
78 stop(); 78 stop();
79 79
80 - srs_freep(transport); 80 + srs_freep(sdk);
81 srs_freep(lb); 81 srs_freep(lb);
82 srs_freep(pthread); 82 srs_freep(pthread);
83 - srs_freep(kbps);  
84 } 83 }
85 84
86 -int SrsEdgeIngester::initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req) 85 +int SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r)
87 { 86 {
88 int ret = ERROR_SUCCESS; 87 int ret = ERROR_SUCCESS;
89 88
90 - _source = source;  
91 - _edge = edge;  
92 - _req = req; 89 + source = s;
  90 + edge = e;
  91 + req = r;
93 92
94 return ret; 93 return ret;
95 } 94 }
@@ -98,7 +97,7 @@ int SrsEdgeIngester::start() @@ -98,7 +97,7 @@ int SrsEdgeIngester::start()
98 { 97 {
99 int ret = ERROR_SUCCESS; 98 int ret = ERROR_SUCCESS;
100 99
101 - if ((ret = _source->on_publish()) != ERROR_SUCCESS) { 100 + if ((ret = source->on_publish()) != ERROR_SUCCESS) {
102 srs_error("edge pull stream then publish to edge failed. ret=%d", ret); 101 srs_error("edge pull stream then publish to edge failed. ret=%d", ret);
103 return ret; 102 return ret;
104 } 103 }
@@ -109,13 +108,10 @@ int SrsEdgeIngester::start() @@ -109,13 +108,10 @@ int SrsEdgeIngester::start()
109 void SrsEdgeIngester::stop() 108 void SrsEdgeIngester::stop()
110 { 109 {
111 pthread->stop(); 110 pthread->stop();
112 - transport->close();  
113 -  
114 - srs_freep(client);  
115 - kbps->set_io(NULL, NULL); 111 + sdk->close();
116 112
117 // notice to unpublish. 113 // notice to unpublish.
118 - _source->on_unpublish(); 114 + source->on_unpublish();
119 } 115 }
120 116
121 string SrsEdgeIngester::get_curr_origin() 117 string SrsEdgeIngester::get_curr_origin()
@@ -127,39 +123,47 @@ int SrsEdgeIngester::cycle() @@ -127,39 +123,47 @@ int SrsEdgeIngester::cycle()
127 { 123 {
128 int ret = ERROR_SUCCESS; 124 int ret = ERROR_SUCCESS;
129 125
130 - _source->on_source_id_changed(_srs_context->get_id()); 126 + source->on_source_id_changed(_srs_context->get_id());
131 127
132 - std::string ep_server;  
133 - int ep_port;  
134 - if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {  
135 - return ret; 128 + std::string url, vhost;
  129 + if (true) {
  130 + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
  131 +
  132 + // @see https://github.com/simple-rtmp-server/srs/issues/79
  133 + // when origin is error, for instance, server is shutdown,
  134 + // then user remove the vhost then reload, the conf is empty.
  135 + if (!conf) {
  136 + ret = ERROR_EDGE_VHOST_REMOVED;
  137 + srs_warn("vhost %s removed. ret=%d", req->vhost.c_str(), ret);
  138 + return ret;
  139 + }
  140 +
  141 + // select the origin.
  142 + if (true) {
  143 + std::string server = lb->select(conf->args);
  144 + int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  145 + srs_parse_hostport(server, server, port);
  146 +
  147 + url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream);
  148 + }
  149 +
  150 + // support vhost tranform for edge,
  151 + // @see https://github.com/simple-rtmp-server/srs/issues/372
  152 + vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
  153 + vhost = srs_string_replace(vhost, "[vhost]", req->vhost);
136 } 154 }
137 - srs_assert(client);  
138 -  
139 - client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);  
140 - client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);  
141 -  
142 - SrsRequest* req = _req;  
143 155
144 - if ((ret = client->handshake()) != ERROR_SUCCESS) {  
145 - srs_error("handshake with server failed. ret=%d", ret);  
146 - return ret;  
147 - }  
148 - if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {  
149 - return ret;  
150 - }  
151 - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {  
152 - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); 156 + if ((ret = sdk->connect(url, vhost, SRS_CONSTS_RTMP_TIMEOUT_US)) != ERROR_SUCCESS) {
  157 + srs_error("edge pull %s failed. ret=%d", url.c_str(), ret);
153 return ret; 158 return ret;
154 } 159 }
155 160
156 - if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {  
157 - srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d",  
158 - req->stream.c_str(), stream_id, ret); 161 + if ((ret = sdk->play()) != ERROR_SUCCESS) {
  162 + srs_error("edge pull %s stream failed. ret=%d", url.c_str(), ret);
159 return ret; 163 return ret;
160 } 164 }
161 165
162 - if ((ret = _edge->on_ingest_play()) != ERROR_SUCCESS) { 166 + if ((ret = edge->on_ingest_play()) != ERROR_SUCCESS) {
163 return ret; 167 return ret;
164 } 168 }
165 169
@@ -176,27 +180,22 @@ int SrsEdgeIngester::ingest() @@ -176,27 +180,22 @@ int SrsEdgeIngester::ingest()
176 { 180 {
177 int ret = ERROR_SUCCESS; 181 int ret = ERROR_SUCCESS;
178 182
179 - client->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US); 183 + sdk->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US);
180 184
181 SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); 185 SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
182 SrsAutoFree(SrsPithyPrint, pprint); 186 SrsAutoFree(SrsPithyPrint, pprint);
183 - 187 +
184 while (!pthread->interrupted()) { 188 while (!pthread->interrupted()) {
185 pprint->elapse(); 189 pprint->elapse();
186 190
187 // pithy print 191 // pithy print
188 if (pprint->can_print()) { 192 if (pprint->can_print()) {
189 - kbps->sample();  
190 - srs_trace("<- "SRS_CONSTS_LOG_EDGE_PLAY  
191 - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d",  
192 - pprint->age(),  
193 - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),  
194 - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); 193 + sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
195 } 194 }
196 - 195 +
197 // read from client. 196 // read from client.
198 SrsCommonMessage* msg = NULL; 197 SrsCommonMessage* msg = NULL;
199 - if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { 198 + if ((ret = sdk->recv_message(&msg)) != ERROR_SUCCESS) {
200 if (!srs_is_client_gracefully_close(ret)) { 199 if (!srs_is_client_gracefully_close(ret)) {
201 srs_error("pull origin server message failed. ret=%d", ret); 200 srs_error("pull origin server message failed. ret=%d", ret);
202 } 201 }
@@ -211,68 +210,6 @@ int SrsEdgeIngester::ingest() @@ -211,68 +210,6 @@ int SrsEdgeIngester::ingest()
211 return ret; 210 return ret;
212 } 211 }
213 } 212 }
214 -  
215 - return ret;  
216 -}  
217 -  
218 -// TODO: FIXME: refine the connect_app.  
219 -int SrsEdgeIngester::connect_app(string ep_server, int ep_port)  
220 -{  
221 - int ret = ERROR_SUCCESS;  
222 -  
223 - SrsRequest* req = _req;  
224 -  
225 - // args of request takes the srs info.  
226 - if (req->args == NULL) {  
227 - req->args = SrsAmf0Any::object();  
228 - }  
229 -  
230 - // notify server the edge identity,  
231 - // @see https://github.com/simple-rtmp-server/srs/issues/147  
232 - SrsAmf0Object* data = req->args;  
233 - data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));  
234 - data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));  
235 - data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));  
236 - data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));  
237 - data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));  
238 - data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));  
239 - data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));  
240 - data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));  
241 - data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));  
242 - data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));  
243 - data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));  
244 - // for edge to directly get the id of client.  
245 - data->set("srs_pid", SrsAmf0Any::number(getpid()));  
246 - data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));  
247 -  
248 - // local ip of edge  
249 - std::vector<std::string> ips = srs_get_local_ipv4_ips();  
250 - assert(_srs_config->get_stats_network() < (int)ips.size());  
251 - std::string local_ip = ips[_srs_config->get_stats_network()];  
252 - data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));  
253 -  
254 - // support vhost tranform for edge,  
255 - // @see https://github.com/simple-rtmp-server/srs/issues/372  
256 - std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);  
257 - vhost = srs_string_replace(vhost, "[vhost]", req->vhost);  
258 - // generate the tcUrl  
259 - std::string param = "";  
260 - std::string tc_url = srs_generate_tc_url(ep_server, vhost, req->app, ep_port, param);  
261 - srs_trace("edge ingest from %s:%d at %s", ep_server.c_str(), ep_port, tc_url.c_str());  
262 -  
263 - // replace the tcUrl in request,  
264 - // which will replace the tc_url in client.connect_app().  
265 - req->tcUrl = tc_url;  
266 -  
267 - // upnode server identity will show in the connect_app of client.  
268 - // @see https://github.com/simple-rtmp-server/srs/issues/160  
269 - // the debug_srs_upnode is config in vhost and default to true.  
270 - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);  
271 - if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {  
272 - srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d",  
273 - tc_url.c_str(), debug_srs_upnode, ret);  
274 - return ret;  
275 - }  
276 213
277 return ret; 214 return ret;
278 } 215 }
@@ -280,8 +217,6 @@ int SrsEdgeIngester::connect_app(string ep_server, int ep_port) @@ -280,8 +217,6 @@ int SrsEdgeIngester::connect_app(string ep_server, int ep_port)
280 int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) 217 int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
281 { 218 {
282 int ret = ERROR_SUCCESS; 219 int ret = ERROR_SUCCESS;
283 -  
284 - SrsSource* source = _source;  
285 220
286 // process audio packet 221 // process audio packet
287 if (msg->header.is_audio()) { 222 if (msg->header.is_audio()) {
@@ -311,7 +246,7 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) @@ -311,7 +246,7 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
311 // process onMetaData 246 // process onMetaData
312 if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { 247 if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
313 SrsPacket* pkt = NULL; 248 SrsPacket* pkt = NULL;
314 - if ((ret = client->decode_message(msg, &pkt)) != ERROR_SUCCESS) { 249 + if ((ret = sdk->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
315 srs_error("decode onMetaData message failed. ret=%d", ret); 250 srs_error("decode onMetaData message failed. ret=%d", ret);
316 return ret; 251 return ret;
317 } 252 }
@@ -334,50 +269,6 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) @@ -334,50 +269,6 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
334 return ret; 269 return ret;
335 } 270 }
336 271
337 -int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port)  
338 -{  
339 - int ret = ERROR_SUCCESS;  
340 -  
341 - // reopen  
342 - transport->close();  
343 -  
344 - SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);  
345 -  
346 - // @see https://github.com/simple-rtmp-server/srs/issues/79  
347 - // when origin is error, for instance, server is shutdown,  
348 - // then user remove the vhost then reload, the conf is empty.  
349 - if (!conf) {  
350 - ret = ERROR_EDGE_VHOST_REMOVED;  
351 - srs_warn("vhost %s removed. ret=%d", _req->vhost.c_str(), ret);  
352 - return ret;  
353 - }  
354 -  
355 - // select the origin.  
356 - if (true) {  
357 - std::string server = lb->select(conf->args);  
358 - ep_port = SRS_CONSTS_RTMP_DEFAULT_PORT;  
359 - srs_parse_hostport(server, ep_server, ep_port);  
360 - }  
361 -  
362 - // open socket.  
363 - int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US;  
364 - if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) {  
365 - srs_warn("edge pull failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",  
366 - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret);  
367 - return ret;  
368 - }  
369 -  
370 - srs_freep(client);  
371 - client = new SrsRtmpClient(transport);  
372 -  
373 - kbps->set_io(transport, transport);  
374 -  
375 - srs_trace("edge pull connected, url=%s/%s, server=%s:%d",  
376 - _req->tcUrl.c_str(), _req->stream.c_str(), ep_server.c_str(), ep_port);  
377 -  
378 - return ret;  
379 -}  
380 -  
381 SrsEdgeForwarder::SrsEdgeForwarder() 272 SrsEdgeForwarder::SrsEdgeForwarder()
382 { 273 {
383 transport = new SrsTcpClient(); 274 transport = new SrsTcpClient();
@@ -48,6 +48,7 @@ class ISrsProtocolReaderWriter; @@ -48,6 +48,7 @@ class ISrsProtocolReaderWriter;
48 class SrsKbps; 48 class SrsKbps;
49 class SrsLbRoundRobin; 49 class SrsLbRoundRobin;
50 class SrsTcpClient; 50 class SrsTcpClient;
  51 +class SrsSimpleRtmpClient;
51 52
52 /** 53 /**
53 * the state of edge, auto machine 54 * the state of edge, auto machine
@@ -80,21 +81,17 @@ enum SrsEdgeUserState @@ -80,21 +81,17 @@ enum SrsEdgeUserState
80 class SrsEdgeIngester : public ISrsReusableThread2Handler 81 class SrsEdgeIngester : public ISrsReusableThread2Handler
81 { 82 {
82 private: 83 private:
83 - int stream_id;  
84 -private:  
85 - SrsSource* _source;  
86 - SrsPlayEdge* _edge;  
87 - SrsRequest* _req; 84 + SrsSource* source;
  85 + SrsPlayEdge* edge;
  86 + SrsRequest* req;
88 SrsReusableThread2* pthread; 87 SrsReusableThread2* pthread;
89 - SrsTcpClient* transport;  
90 - SrsKbps* kbps;  
91 - SrsRtmpClient* client; 88 + SrsSimpleRtmpClient* sdk;
92 SrsLbRoundRobin* lb; 89 SrsLbRoundRobin* lb;
93 public: 90 public:
94 SrsEdgeIngester(); 91 SrsEdgeIngester();
95 virtual ~SrsEdgeIngester(); 92 virtual ~SrsEdgeIngester();
96 public: 93 public:
97 - virtual int initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req); 94 + virtual int initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r);
98 virtual int start(); 95 virtual int start();
99 virtual void stop(); 96 virtual void stop();
100 virtual std::string get_curr_origin(); 97 virtual std::string get_curr_origin();
@@ -103,8 +100,6 @@ public: @@ -103,8 +100,6 @@ public:
103 virtual int cycle(); 100 virtual int cycle();
104 private: 101 private:
105 virtual int ingest(); 102 virtual int ingest();
106 - virtual int connect_server(std::string& ep_server, int& ep_port);  
107 - virtual int connect_app(std::string ep_server, int ep_port);  
108 virtual int process_publish_message(SrsCommonMessage* msg); 103 virtual int process_publish_message(SrsCommonMessage* msg);
109 }; 104 };
110 105
@@ -80,6 +80,7 @@ SrsSimpleRtmpClient::SrsSimpleRtmpClient() @@ -80,6 +80,7 @@ SrsSimpleRtmpClient::SrsSimpleRtmpClient()
80 { 80 {
81 req = NULL; 81 req = NULL;
82 client = NULL; 82 client = NULL;
  83 + kbps = new SrsKbps();
83 84
84 transport = new SrsTcpClient(); 85 transport = new SrsTcpClient();
85 stream_id = 0; 86 stream_id = 0;
@@ -89,7 +90,11 @@ SrsSimpleRtmpClient::~SrsSimpleRtmpClient() @@ -89,7 +90,11 @@ SrsSimpleRtmpClient::~SrsSimpleRtmpClient()
89 { 90 {
90 close(); 91 close();
91 92
  93 + srs_freep(kbps);
92 srs_freep(transport); 94 srs_freep(transport);
  95 +
  96 + srs_freep(client);
  97 + kbps->set_io(NULL, NULL);
93 } 98 }
94 99
95 int SrsSimpleRtmpClient::connect(string url, int64_t timeout) 100 int SrsSimpleRtmpClient::connect(string url, int64_t timeout)
@@ -122,6 +127,8 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) @@ -122,6 +127,8 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout)
122 srs_freep(client); 127 srs_freep(client);
123 client = new SrsRtmpClient(transport); 128 client = new SrsRtmpClient(transport);
124 129
  130 + kbps->set_io(transport, transport);
  131 +
125 client->set_recv_timeout(timeout); 132 client->set_recv_timeout(timeout);
126 client->set_send_timeout(timeout); 133 client->set_send_timeout(timeout);
127 134
@@ -139,13 +146,6 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) @@ -139,13 +146,6 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout)
139 return ret; 146 return ret;
140 } 147 }
141 148
142 - // publish.  
143 - if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {  
144 - srs_error("sdk: publish failed, stream=%s, stream_id=%d. ret=%d",  
145 - req->stream.c_str(), stream_id, ret);  
146 - return ret;  
147 - }  
148 -  
149 return ret; 149 return ret;
150 } 150 }
151 151
@@ -215,6 +215,47 @@ void SrsSimpleRtmpClient::close() @@ -215,6 +215,47 @@ void SrsSimpleRtmpClient::close()
215 srs_freep(req); 215 srs_freep(req);
216 } 216 }
217 217
  218 +int SrsSimpleRtmpClient::publish()
  219 +{
  220 + int ret = ERROR_SUCCESS;
  221 +
  222 + // publish.
  223 + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
  224 + srs_error("sdk: publish failed, stream=%s, stream_id=%d. ret=%d",
  225 + req->stream.c_str(), stream_id, ret);
  226 + return ret;
  227 + }
  228 +
  229 + return ret;
  230 +}
  231 +
  232 +int SrsSimpleRtmpClient::play()
  233 +{
  234 + int ret = ERROR_SUCCESS;
  235 +
  236 + if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
  237 + srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d",
  238 + req->stream.c_str(), stream_id, ret);
  239 + return ret;
  240 + }
  241 +
  242 + return ret;
  243 +}
  244 +
  245 +void SrsSimpleRtmpClient::kbps_sample(const char* label, int64_t age)
  246 +{
  247 + kbps->sample();
  248 +
  249 + int sr = kbps->get_send_kbps();
  250 + int sr30s = kbps->get_send_kbps_30s();
  251 + int sr5m = kbps->get_send_kbps_5m();
  252 + int rr = kbps->get_recv_kbps();
  253 + int rr30s = kbps->get_recv_kbps_30s();
  254 + int rr5m = kbps->get_recv_kbps_5m();
  255 +
  256 + srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", age, sr, sr30s, sr5m, rr, rr30s, rr5m);
  257 +}
  258 +
218 int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) 259 int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size)
219 { 260 {
220 int ret = ERROR_SUCCESS; 261 int ret = ERROR_SUCCESS;
@@ -235,6 +276,21 @@ int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* @@ -235,6 +276,21 @@ int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char*
235 return ret; 276 return ret;
236 } 277 }
237 278
  279 +int SrsSimpleRtmpClient::recv_message(SrsCommonMessage** pmsg)
  280 +{
  281 + return client->recv_message(pmsg);
  282 +}
  283 +
  284 +int SrsSimpleRtmpClient::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
  285 +{
  286 + return client->decode_message(msg, ppacket);
  287 +}
  288 +
  289 +void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout)
  290 +{
  291 + transport->set_recv_timeout(timeout);
  292 +}
  293 +
238 SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) 294 SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
239 : SrsConnection(svr, c) 295 : SrsConnection(svr, c)
240 { 296 {
@@ -56,6 +56,8 @@ class SrsQueueRecvThread; @@ -56,6 +56,8 @@ class SrsQueueRecvThread;
56 class SrsPublishRecvThread; 56 class SrsPublishRecvThread;
57 class SrsSecurity; 57 class SrsSecurity;
58 class ISrsWakable; 58 class ISrsWakable;
  59 +class SrsCommonMessage;
  60 +class SrsPacket;
59 61
60 /** 62 /**
61 * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs. 63 * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs.
@@ -66,6 +68,7 @@ private: @@ -66,6 +68,7 @@ private:
66 SrsRequest* req; 68 SrsRequest* req;
67 SrsTcpClient* transport; 69 SrsTcpClient* transport;
68 SrsRtmpClient* client; 70 SrsRtmpClient* client;
  71 + SrsKbps* kbps;
69 int stream_id; 72 int stream_id;
70 public: 73 public:
71 SrsSimpleRtmpClient(); 74 SrsSimpleRtmpClient();
@@ -78,7 +81,15 @@ private: @@ -78,7 +81,15 @@ private:
78 public: 81 public:
79 virtual void close(); 82 virtual void close();
80 public: 83 public:
  84 + virtual int publish();
  85 + virtual int play();
  86 + virtual void kbps_sample(const char* label, int64_t age);
  87 +public:
81 virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); 88 virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
  89 + virtual int recv_message(SrsCommonMessage** pmsg);
  90 + virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
  91 +public:
  92 + virtual void set_recv_timeout(int64_t timeout);
82 }; 93 };
83 94
84 /** 95 /**
@@ -68,6 +68,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -68,6 +68,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
68 // the following is the timeout for rtmp protocol, 68 // the following is the timeout for rtmp protocol,
69 // to avoid death connection. 69 // to avoid death connection.
70 70
  71 +// the common io timeout, for both recv and send.
  72 +#define SRS_CONSTS_RTMP_TIMEOUT_US (int64_t)(30*1000*1000LL)
  73 +
71 // the timeout to send data to client, 74 // the timeout to send data to client,
72 // if timeout, close the connection. 75 // if timeout, close the connection.
73 #define SRS_CONSTS_RTMP_SEND_TIMEOUT_US (int64_t)(30*1000*1000LL) 76 #define SRS_CONSTS_RTMP_SEND_TIMEOUT_US (int64_t)(30*1000*1000LL)