winlin

use simple rtmp client for raw connect app

@@ -42,6 +42,7 @@ using namespace std; @@ -42,6 +42,7 @@ using namespace std;
42 #include <srs_app_utility.hpp> 42 #include <srs_app_utility.hpp>
43 #include <srs_protocol_amf0.hpp> 43 #include <srs_protocol_amf0.hpp>
44 #include <srs_kernel_utility.hpp> 44 #include <srs_kernel_utility.hpp>
  45 +#include <srs_app_rtmp_conn.hpp>
45 46
46 #define SRS_HTTP_FLV_STREAM_BUFFER 4096 47 #define SRS_HTTP_FLV_STREAM_BUFFER 4096
47 48
@@ -117,20 +118,13 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -117,20 +118,13 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
117 SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) 118 SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
118 : SrsHttpConn(cm, fd, m) 119 : SrsHttpConn(cm, fd, m)
119 { 120 {
120 -  
121 - req = NULL;  
122 - transport = new SrsTcpClient();  
123 - client = NULL;  
124 - stream_id = 0;  
125 - 121 + sdk = new SrsSimpleRtmpClient();
126 pprint = SrsPithyPrint::create_caster(); 122 pprint = SrsPithyPrint::create_caster();
127 } 123 }
128 124
129 SrsDynamicHttpConn::~SrsDynamicHttpConn() 125 SrsDynamicHttpConn::~SrsDynamicHttpConn()
130 { 126 {
131 - close();  
132 -  
133 - srs_freep(transport); 127 + srs_freep(sdk);
134 srs_freep(pprint); 128 srs_freep(pprint);
135 } 129 }
136 130
@@ -176,7 +170,7 @@ int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std @@ -176,7 +170,7 @@ int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std
176 } 170 }
177 171
178 ret = do_proxy(rr, &dec); 172 ret = do_proxy(rr, &dec);
179 - close(); 173 + sdk->close();
180 174
181 return ret; 175 return ret;
182 } 176 }
@@ -189,7 +183,7 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) @@ -189,7 +183,7 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
189 while (!rr->eof()) { 183 while (!rr->eof()) {
190 pprint->elapse(); 184 pprint->elapse();
191 185
192 - if ((ret = connect()) != ERROR_SUCCESS) { 186 + if ((ret = sdk->connect(output)) != ERROR_SUCCESS) {
193 return ret; 187 return ret;
194 } 188 }
195 189
@@ -212,13 +206,17 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) @@ -212,13 +206,17 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
212 return ret; 206 return ret;
213 } 207 }
214 208
215 - if ((ret = rtmp_write_packet(type, time, data, size)) != ERROR_SUCCESS) { 209 + if ((ret = sdk->rtmp_write_packet(type, time, data, size)) != ERROR_SUCCESS) {
216 if (!srs_is_client_gracefully_close(ret)) { 210 if (!srs_is_client_gracefully_close(ret)) {
217 srs_error("flv: proxy rtmp packet failed. ret=%d", ret); 211 srs_error("flv: proxy rtmp packet failed. ret=%d", ret);
218 } 212 }
219 return ret; 213 return ret;
220 } 214 }
221 215
  216 + if (pprint->can_print()) {
  217 + srs_trace("flv: send msg %d age=%d, dts=%d, size=%d", type, pprint->age(), time, size);
  218 + }
  219 +
222 if ((ret = dec->read_previous_tag_size(pps)) != ERROR_SUCCESS) { 220 if ((ret = dec->read_previous_tag_size(pps)) != ERROR_SUCCESS) {
223 if (!srs_is_client_gracefully_close(ret)) { 221 if (!srs_is_client_gracefully_close(ret)) {
224 srs_error("flv: proxy tag header pps failed. ret=%d", ret); 222 srs_error("flv: proxy tag header pps failed. ret=%d", ret);
@@ -230,142 +228,6 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) @@ -230,142 +228,6 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
230 return ret; 228 return ret;
231 } 229 }
232 230
233 -int SrsDynamicHttpConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size)  
234 -{  
235 - int ret = ERROR_SUCCESS;  
236 -  
237 - SrsSharedPtrMessage* msg = NULL;  
238 -  
239 - if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) {  
240 - srs_error("flv: create shared ptr msg failed. ret=%d", ret);  
241 - return ret;  
242 - }  
243 - srs_assert(msg);  
244 -  
245 - if (pprint->can_print()) {  
246 - srs_trace("flv: send msg %s age=%d, dts=%"PRId64", size=%d",  
247 - msg->is_audio()? "A":msg->is_video()? "V":"N", pprint->age(), msg->timestamp, msg->size);  
248 - }  
249 -  
250 - // send out encoded msg.  
251 - if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {  
252 - return ret;  
253 - }  
254 -  
255 - return ret;  
256 -}  
257 -  
258 -int SrsDynamicHttpConn::connect()  
259 -{  
260 - int ret = ERROR_SUCCESS;  
261 -  
262 - // when ok, ignore.  
263 - // TODO: FIXME: should reconnect when disconnected.  
264 - if (transport->connected()) {  
265 - return ret;  
266 - }  
267 -  
268 - // parse uri  
269 - if (!req) {  
270 - req = new SrsRequest();  
271 - srs_parse_rtmp_url(output, req->tcUrl, req->stream);  
272 - srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param);  
273 - }  
274 -  
275 - // connect host.  
276 - if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) {  
277 - return ret;  
278 - }  
279 -  
280 - srs_freep(client);  
281 - client = new SrsRtmpClient(transport);  
282 -  
283 - client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);  
284 - client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);  
285 -  
286 - // connect to vhost/app  
287 - if ((ret = client->handshake()) != ERROR_SUCCESS) {  
288 - srs_error("mpegts: handshake with server failed. ret=%d", ret);  
289 - return ret;  
290 - }  
291 - if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) {  
292 - srs_error("mpegts: connect with server failed. ret=%d", ret);  
293 - return ret;  
294 - }  
295 - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {  
296 - srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);  
297 - return ret;  
298 - }  
299 -  
300 - // publish.  
301 - if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {  
302 - srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d",  
303 - req->stream.c_str(), stream_id, ret);  
304 - return ret;  
305 - }  
306 -  
307 - return ret;  
308 -}  
309 -  
310 -// TODO: FIXME: refine the connect_app.  
311 -int SrsDynamicHttpConn::connect_app(string ep_server, int ep_port)  
312 -{  
313 - int ret = ERROR_SUCCESS;  
314 -  
315 - // args of request takes the srs info.  
316 - if (req->args == NULL) {  
317 - req->args = SrsAmf0Any::object();  
318 - }  
319 -  
320 - // notify server the edge identity,  
321 - // @see https://github.com/simple-rtmp-server/srs/issues/147  
322 - SrsAmf0Object* data = req->args;  
323 - data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));  
324 - data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));  
325 - data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));  
326 - data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));  
327 - data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));  
328 - data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));  
329 - data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));  
330 - data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));  
331 - data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));  
332 - data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));  
333 - data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));  
334 - // for edge to directly get the id of client.  
335 - data->set("srs_pid", SrsAmf0Any::number(getpid()));  
336 - data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));  
337 -  
338 - // local ip of edge  
339 - std::vector<std::string> ips = srs_get_local_ipv4_ips();  
340 - assert(_srs_config->get_stats_network() < (int)ips.size());  
341 - std::string local_ip = ips[_srs_config->get_stats_network()];  
342 - data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));  
343 -  
344 - // generate the tcUrl  
345 - std::string param = "";  
346 - std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);  
347 -  
348 - // upnode server identity will show in the connect_app of client.  
349 - // @see https://github.com/simple-rtmp-server/srs/issues/160  
350 - // the debug_srs_upnode is config in vhost and default to true.  
351 - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);  
352 - if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {  
353 - srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",  
354 - tc_url.c_str(), debug_srs_upnode, ret);  
355 - return ret;  
356 - }  
357 -  
358 - return ret;  
359 -}  
360 -  
361 -void SrsDynamicHttpConn::close()  
362 -{  
363 - transport->close();  
364 -  
365 - srs_freep(client);  
366 - srs_freep(req);  
367 -}  
368 -  
369 SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) 231 SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h)
370 { 232 {
371 http = h; 233 http = h;
@@ -44,6 +44,7 @@ class SrsPithyPrint; @@ -44,6 +44,7 @@ class SrsPithyPrint;
44 class ISrsHttpResponseReader; 44 class ISrsHttpResponseReader;
45 class SrsFlvDecoder; 45 class SrsFlvDecoder;
46 class SrsTcpClient; 46 class SrsTcpClient;
  47 +class SrsSimpleRtmpClient;
47 48
48 #include <srs_app_st.hpp> 49 #include <srs_app_st.hpp>
49 #include <srs_app_listener.hpp> 50 #include <srs_app_listener.hpp>
@@ -85,11 +86,7 @@ class SrsDynamicHttpConn : public SrsHttpConn @@ -85,11 +86,7 @@ class SrsDynamicHttpConn : public SrsHttpConn
85 private: 86 private:
86 std::string output; 87 std::string output;
87 SrsPithyPrint* pprint; 88 SrsPithyPrint* pprint;
88 -private:  
89 - SrsRequest* req;  
90 - SrsTcpClient* transport;  
91 - SrsRtmpClient* client;  
92 - int stream_id; 89 + SrsSimpleRtmpClient* sdk;
93 public: 90 public:
94 SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); 91 SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m);
95 virtual ~SrsDynamicHttpConn(); 92 virtual ~SrsDynamicHttpConn();
@@ -99,14 +96,6 @@ public: @@ -99,14 +96,6 @@ public:
99 virtual int proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o); 96 virtual int proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o);
100 private: 97 private:
101 virtual int do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec); 98 virtual int do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec);
102 - virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);  
103 -private:  
104 - // connect to rtmp output url.  
105 - // @remark ignore when not connected, reconnect when disconnected.  
106 - virtual int connect();  
107 - virtual int connect_app(std::string ep_server, int ep_port);  
108 - // close the connected io and rtmp to ready to be re-connect.  
109 - virtual void close();  
110 }; 99 };
111 100
112 /** 101 /**
@@ -76,6 +76,152 @@ using namespace std; @@ -76,6 +76,152 @@ using namespace std;
76 // when edge timeout, retry next. 76 // when edge timeout, retry next.
77 #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL) 77 #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL)
78 78
  79 +SrsSimpleRtmpClient::SrsSimpleRtmpClient()
  80 +{
  81 + req = NULL;
  82 + client = NULL;
  83 +
  84 + transport = new SrsTcpClient();
  85 + stream_id = 0;
  86 +}
  87 +
  88 +SrsSimpleRtmpClient::~SrsSimpleRtmpClient()
  89 +{
  90 + close();
  91 +
  92 + srs_freep(transport);
  93 +}
  94 +
  95 +int SrsSimpleRtmpClient::connect(string url)
  96 +{
  97 + int ret = ERROR_SUCCESS;
  98 +
  99 + // when ok, ignore.
  100 + // TODO: FIXME: should reconnect when disconnected.
  101 + if (transport->connected()) {
  102 + return ret;
  103 + }
  104 +
  105 + // parse uri
  106 + if (!req) {
  107 + req = new SrsRequest();
  108 + srs_parse_rtmp_url(url, req->tcUrl, req->stream);
  109 + srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param);
  110 + }
  111 +
  112 + // connect host.
  113 + if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) {
  114 + return ret;
  115 + }
  116 +
  117 + srs_freep(client);
  118 + client = new SrsRtmpClient(transport);
  119 +
  120 + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
  121 + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
  122 +
  123 + // connect to vhost/app
  124 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  125 + srs_error("mpegts: handshake with server failed. ret=%d", ret);
  126 + return ret;
  127 + }
  128 + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) {
  129 + srs_error("mpegts: connect with server failed. ret=%d", ret);
  130 + return ret;
  131 + }
  132 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  133 + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  134 + return ret;
  135 + }
  136 +
  137 + // publish.
  138 + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
  139 + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d",
  140 + req->stream.c_str(), stream_id, ret);
  141 + return ret;
  142 + }
  143 +
  144 + return ret;
  145 +}
  146 +
  147 +void SrsSimpleRtmpClient::close()
  148 +{
  149 + transport->close();
  150 +
  151 + srs_freep(client);
  152 + srs_freep(req);
  153 +}
  154 +
  155 +int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size)
  156 +{
  157 + int ret = ERROR_SUCCESS;
  158 +
  159 + SrsSharedPtrMessage* msg = NULL;
  160 +
  161 + if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) {
  162 + srs_error("flv: create shared ptr msg failed. ret=%d", ret);
  163 + return ret;
  164 + }
  165 + srs_assert(msg);
  166 +
  167 + // send out encoded msg.
  168 + if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
  169 + return ret;
  170 + }
  171 +
  172 + return ret;
  173 +}
  174 +
  175 +int SrsSimpleRtmpClient::connect_app(string ep_server, int ep_port)
  176 +{
  177 + int ret = ERROR_SUCCESS;
  178 +
  179 + // args of request takes the srs info.
  180 + if (req->args == NULL) {
  181 + req->args = SrsAmf0Any::object();
  182 + }
  183 +
  184 + // notify server the edge identity,
  185 + // @see https://github.com/simple-rtmp-server/srs/issues/147
  186 + SrsAmf0Object* data = req->args;
  187 + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
  188 + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
  189 + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
  190 + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
  191 + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
  192 + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
  193 + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
  194 + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
  195 + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
  196 + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
  197 + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
  198 + // for edge to directly get the id of client.
  199 + data->set("srs_pid", SrsAmf0Any::number(getpid()));
  200 + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
  201 +
  202 + // local ip of edge
  203 + std::vector<std::string> ips = srs_get_local_ipv4_ips();
  204 + assert(_srs_config->get_stats_network() < (int)ips.size());
  205 + std::string local_ip = ips[_srs_config->get_stats_network()];
  206 + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
  207 +
  208 + // generate the tcUrl
  209 + std::string param = "";
  210 + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
  211 +
  212 + // upnode server identity will show in the connect_app of client.
  213 + // @see https://github.com/simple-rtmp-server/srs/issues/160
  214 + // the debug_srs_upnode is config in vhost and default to true.
  215 + bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
  216 + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
  217 + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
  218 + tc_url.c_str(), debug_srs_upnode, ret);
  219 + return ret;
  220 + }
  221 +
  222 + return ret;
  223 +}
  224 +
79 SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) 225 SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
80 : SrsConnection(svr, c) 226 : SrsConnection(svr, c)
81 { 227 {
@@ -58,8 +58,30 @@ class SrsSecurity; @@ -58,8 +58,30 @@ class SrsSecurity;
58 class ISrsWakable; 58 class ISrsWakable;
59 59
60 /** 60 /**
61 -* the client provides the main logic control for RTMP clients.  
62 -*/ 61 + * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs.
  62 + */
  63 +class SrsSimpleRtmpClient
  64 +{
  65 +private:
  66 + SrsRequest* req;
  67 + SrsTcpClient* transport;
  68 + SrsRtmpClient* client;
  69 + int stream_id;
  70 +public:
  71 + SrsSimpleRtmpClient();
  72 + virtual ~SrsSimpleRtmpClient();
  73 +public:
  74 + virtual int connect(std::string url);
  75 + virtual void close();
  76 +public:
  77 + virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
  78 +private:
  79 + virtual int connect_app(std::string ep_server, int ep_port);
  80 +};
  81 +
  82 +/**
  83 + * the client provides the main logic control for RTMP clients.
  84 + */
63 class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandler 85 class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandler
64 { 86 {
65 // for the thread to directly access any field of connection. 87 // for the thread to directly access any field of connection.