winlin

refine forwarder code, use simple rtmp client.

@@ -45,51 +45,47 @@ using namespace std; @@ -45,51 +45,47 @@ using namespace std;
45 #include <srs_kernel_codec.hpp> 45 #include <srs_kernel_codec.hpp>
46 #include <srs_core_autofree.hpp> 46 #include <srs_core_autofree.hpp>
47 #include <srs_kernel_utility.hpp> 47 #include <srs_kernel_utility.hpp>
  48 +#include <srs_app_rtmp_conn.hpp>
48 49
49 // when error, forwarder sleep for a while and retry. 50 // when error, forwarder sleep for a while and retry.
50 #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) 51 #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
51 52
52 -SrsForwarder::SrsForwarder(SrsSource* _source) 53 +SrsForwarder::SrsForwarder(SrsSource* s)
53 { 54 {
54 - source = _source; 55 + source = s;
55 56
56 - _req = NULL;  
57 - client = NULL;  
58 - transport = new SrsTcpClient();  
59 - kbps = new SrsKbps();  
60 - stream_id = 0; 57 + req = NULL;
  58 + sh_video = sh_audio = NULL;
61 59
  60 + sdk = new SrsSimpleRtmpClient();
62 pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US); 61 pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US);
63 queue = new SrsMessageQueue(); 62 queue = new SrsMessageQueue();
64 jitter = new SrsRtmpJitter(); 63 jitter = new SrsRtmpJitter();
65 -  
66 - sh_video = sh_audio = NULL;  
67 } 64 }
68 65
69 SrsForwarder::~SrsForwarder() 66 SrsForwarder::~SrsForwarder()
70 { 67 {
71 on_unpublish(); 68 on_unpublish();
72 69
73 - srs_freep(transport); 70 + srs_freep(sdk);
74 srs_freep(pthread); 71 srs_freep(pthread);
75 srs_freep(queue); 72 srs_freep(queue);
76 srs_freep(jitter); 73 srs_freep(jitter);
77 - srs_freep(kbps);  
78 74
79 srs_freep(sh_video); 75 srs_freep(sh_video);
80 srs_freep(sh_audio); 76 srs_freep(sh_audio);
81 } 77 }
82 78
83 -int SrsForwarder::initialize(SrsRequest* req, string ep_forward) 79 +int SrsForwarder::initialize(SrsRequest* r, string ep)
84 { 80 {
85 int ret = ERROR_SUCCESS; 81 int ret = ERROR_SUCCESS;
86 82
87 // it's ok to use the request object, 83 // it's ok to use the request object,
88 // SrsSource already copy it and never delete it. 84 // SrsSource already copy it and never delete it.
89 - _req = req; 85 + req = r;
90 86
91 // the ep(endpoint) to forward to 87 // the ep(endpoint) to forward to
92 - _ep_forward = ep_forward; 88 + ep_forward = ep;
93 89
94 return ret; 90 return ret;
95 } 91 }
@@ -103,12 +99,17 @@ int SrsForwarder::on_publish() @@ -103,12 +99,17 @@ int SrsForwarder::on_publish()
103 { 99 {
104 int ret = ERROR_SUCCESS; 100 int ret = ERROR_SUCCESS;
105 101
106 - SrsRequest* req = _req;  
107 -  
108 // discovery the server port and tcUrl from req and ep_forward. 102 // discovery the server port and tcUrl from req and ep_forward.
109 - int port;  
110 - std::string server, tc_url;  
111 - discovery_ep(server, port, tc_url); 103 + std::string server;
  104 + std::string tcUrl;
  105 + int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  106 + if (true) {
  107 + // parse host:port from hostport.
  108 + srs_parse_hostport(ep_forward, server, port);
  109 +
  110 + // generate tcUrl
  111 + tcUrl = srs_generate_tc_url(server, req->vhost, req->app, port, req->param);
  112 + }
112 113
113 // dead loop check 114 // dead loop check
114 std::string source_ep = "rtmp://"; 115 std::string source_ep = "rtmp://";
@@ -119,7 +120,7 @@ int SrsForwarder::on_publish() @@ -119,7 +120,7 @@ int SrsForwarder::on_publish()
119 source_ep += req->vhost; 120 source_ep += req->vhost;
120 121
121 std::string dest_ep = "rtmp://"; 122 std::string dest_ep = "rtmp://";
122 - if (_ep_forward == SRS_CONSTS_LOCALHOST) { 123 + if (ep_forward == SRS_CONSTS_LOCALHOST) {
123 dest_ep += req->host; 124 dest_ep += req->host;
124 } else { 125 } else {
125 dest_ep += server; 126 dest_ep += server;
@@ -136,7 +137,7 @@ int SrsForwarder::on_publish() @@ -136,7 +137,7 @@ int SrsForwarder::on_publish()
136 return ret; 137 return ret;
137 } 138 }
138 srs_trace("start forward %s to %s, tcUrl=%s, stream=%s", 139 srs_trace("start forward %s to %s, tcUrl=%s, stream=%s",
139 - source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), 140 + source_ep.c_str(), dest_ep.c_str(), tcUrl.c_str(),
140 req->stream.c_str()); 141 req->stream.c_str());
141 142
142 if ((ret = pthread->start()) != ERROR_SUCCESS) { 143 if ((ret = pthread->start()) != ERROR_SUCCESS) {
@@ -151,10 +152,7 @@ int SrsForwarder::on_publish() @@ -151,10 +152,7 @@ int SrsForwarder::on_publish()
151 void SrsForwarder::on_unpublish() 152 void SrsForwarder::on_unpublish()
152 { 153 {
153 pthread->stop(); 154 pthread->stop();
154 - transport->close();  
155 -  
156 - srs_freep(client);  
157 - kbps->set_io(NULL, NULL); 155 + sdk->close();
158 } 156 }
159 157
160 int SrsForwarder::on_meta_data(SrsSharedPtrMessage* shared_metadata) 158 int SrsForwarder::on_meta_data(SrsSharedPtrMessage* shared_metadata)
@@ -228,32 +226,26 @@ int SrsForwarder::cycle() @@ -228,32 +226,26 @@ int SrsForwarder::cycle()
228 { 226 {
229 int ret = ERROR_SUCCESS; 227 int ret = ERROR_SUCCESS;
230 228
231 - std::string ep_server;  
232 - int ep_port;  
233 - if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {  
234 - return ret; 229 + std::string url;
  230 + if (true) {
  231 + std::string server;
  232 + int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  233 +
  234 + // parse host:port from hostport.
  235 + srs_parse_hostport(ep_forward, server, port);
  236 +
  237 + // generate url
  238 + url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream);
235 } 239 }
236 - srs_assert(client);  
237 -  
238 - client->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);  
239 - client->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);  
240 240
241 - if ((ret = client->handshake()) != ERROR_SUCCESS) {  
242 - srs_error("handshake with server failed. ret=%d", ret);  
243 - return ret;  
244 - }  
245 - if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {  
246 - srs_error("connect with server failed. ret=%d", ret);  
247 - return ret;  
248 - }  
249 - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {  
250 - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); 241 + int64_t cto = SRS_FORWARDER_SLEEP_US;
  242 + int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
  243 + if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) {
  244 + srs_warn("forward failed, url=%s, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
251 return ret; 245 return ret;
252 } 246 }
253 247
254 - if ((ret = client->publish(_req->stream, stream_id)) != ERROR_SUCCESS) {  
255 - srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",  
256 - _req->stream.c_str(), stream_id, ret); 248 + if ((ret = sdk->publish()) != ERROR_SUCCESS) {
257 return ret; 249 return ret;
258 } 250 }
259 251
@@ -269,106 +261,12 @@ int SrsForwarder::cycle() @@ -269,106 +261,12 @@ int SrsForwarder::cycle()
269 return ret; 261 return ret;
270 } 262 }
271 263
272 -void SrsForwarder::discovery_ep(string& server, int& port, string& tc_url)  
273 -{  
274 - SrsRequest* req = _req;  
275 -  
276 - port = SRS_CONSTS_RTMP_DEFAULT_PORT;  
277 - srs_parse_hostport(_ep_forward, server, port);  
278 -  
279 - // generate tcUrl  
280 - tc_url = srs_generate_tc_url(server, req->vhost, req->app, port, req->param);  
281 -}  
282 -  
283 -int SrsForwarder::connect_server(string& ep_server, int& ep_port)  
284 -{  
285 - int ret = ERROR_SUCCESS;  
286 -  
287 - // reopen  
288 - transport->close();  
289 -  
290 - // discovery the server port and tcUrl from req and ep_forward.  
291 - string tc_url;  
292 - discovery_ep(ep_server, ep_port, tc_url);  
293 -  
294 - // open socket.  
295 - int64_t timeout = SRS_FORWARDER_SLEEP_US;  
296 - if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) {  
297 - srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",  
298 - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret);  
299 - return ret;  
300 - }  
301 -  
302 - srs_freep(client);  
303 - client = new SrsRtmpClient(transport);  
304 -  
305 - kbps->set_io(transport, transport);  
306 -  
307 - srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d",  
308 - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port);  
309 -  
310 - return ret;  
311 -}  
312 -  
313 -// TODO: FIXME: refine the connect_app.  
314 -int SrsForwarder::connect_app(string ep_server, int ep_port)  
315 -{  
316 - int ret = ERROR_SUCCESS;  
317 -  
318 - SrsRequest* req = _req;  
319 -  
320 - // args of request takes the srs info.  
321 - if (req->args == NULL) {  
322 - req->args = SrsAmf0Any::object();  
323 - }  
324 -  
325 - // notify server the edge identity,  
326 - // @see https://github.com/simple-rtmp-server/srs/issues/147  
327 - SrsAmf0Object* data = req->args;  
328 - data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));  
329 - data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));  
330 - data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));  
331 - data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));  
332 - data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));  
333 - data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));  
334 - data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));  
335 - data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));  
336 - data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));  
337 - data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));  
338 - data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));  
339 - // for edge to directly get the id of client.  
340 - data->set("srs_pid", SrsAmf0Any::number(getpid()));  
341 - data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));  
342 -  
343 - // local ip of edge  
344 - std::vector<std::string> ips = srs_get_local_ipv4_ips();  
345 - assert(_srs_config->get_stats_network() < (int)ips.size());  
346 - std::string local_ip = ips[_srs_config->get_stats_network()];  
347 - data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));  
348 -  
349 - // generate the tcUrl  
350 - std::string param = "";  
351 - std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);  
352 -  
353 - // upnode server identity will show in the connect_app of client.  
354 - // @see https://github.com/simple-rtmp-server/srs/issues/160  
355 - // the debug_srs_upnode is config in vhost and default to true.  
356 - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);  
357 - if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {  
358 - srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d",  
359 - tc_url.c_str(), debug_srs_upnode, ret);  
360 - return ret;  
361 - }  
362 -  
363 - return ret;  
364 -}  
365 -  
366 #define SYS_MAX_FORWARD_SEND_MSGS 128 264 #define SYS_MAX_FORWARD_SEND_MSGS 128
367 int SrsForwarder::forward() 265 int SrsForwarder::forward()
368 { 266 {
369 int ret = ERROR_SUCCESS; 267 int ret = ERROR_SUCCESS;
370 268
371 - client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 269 + sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
372 270
373 SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder(); 271 SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder();
374 SrsAutoFree(SrsPithyPrint, pprint); 272 SrsAutoFree(SrsPithyPrint, pprint);
@@ -378,13 +276,13 @@ int SrsForwarder::forward() @@ -378,13 +276,13 @@ int SrsForwarder::forward()
378 // update sequence header 276 // update sequence header
379 // TODO: FIXME: maybe need to zero the sequence header timestamp. 277 // TODO: FIXME: maybe need to zero the sequence header timestamp.
380 if (sh_video) { 278 if (sh_video) {
381 - if ((ret = client->send_and_free_message(sh_video->copy(), stream_id)) != ERROR_SUCCESS) { 279 + if ((ret = sdk->send_and_free_message(sh_video->copy())) != ERROR_SUCCESS) {
382 srs_error("forwarder send sh_video to server failed. ret=%d", ret); 280 srs_error("forwarder send sh_video to server failed. ret=%d", ret);
383 return ret; 281 return ret;
384 } 282 }
385 } 283 }
386 if (sh_audio) { 284 if (sh_audio) {
387 - if ((ret = client->send_and_free_message(sh_audio->copy(), stream_id)) != ERROR_SUCCESS) { 285 + if ((ret = sdk->send_and_free_message(sh_audio->copy())) != ERROR_SUCCESS) {
388 srs_error("forwarder send sh_audio to server failed. ret=%d", ret); 286 srs_error("forwarder send sh_audio to server failed. ret=%d", ret);
389 return ret; 287 return ret;
390 } 288 }
@@ -396,7 +294,7 @@ int SrsForwarder::forward() @@ -396,7 +294,7 @@ int SrsForwarder::forward()
396 // read from client. 294 // read from client.
397 if (true) { 295 if (true) {
398 SrsCommonMessage* msg = NULL; 296 SrsCommonMessage* msg = NULL;
399 - ret = client->recv_message(&msg); 297 + ret = sdk->recv_message(&msg);
400 298
401 srs_verbose("play loop recv message. ret=%d", ret); 299 srs_verbose("play loop recv message. ret=%d", ret);
402 if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { 300 if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
@@ -417,12 +315,7 @@ int SrsForwarder::forward() @@ -417,12 +315,7 @@ int SrsForwarder::forward()
417 315
418 // pithy print 316 // pithy print
419 if (pprint->can_print()) { 317 if (pprint->can_print()) {
420 - kbps->sample();  
421 - srs_trace("-> "SRS_CONSTS_LOG_FOWARDER  
422 - " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",  
423 - pprint->age(), count,  
424 - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),  
425 - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); 318 + sdk->kbps_sample(SRS_CONSTS_LOG_FOWARDER, pprint->age(), count);
426 } 319 }
427 320
428 // ignore when no messages. 321 // ignore when no messages.
@@ -432,7 +325,7 @@ int SrsForwarder::forward() @@ -432,7 +325,7 @@ int SrsForwarder::forward()
432 } 325 }
433 326
434 // sendout messages, all messages are freed by send_and_free_messages(). 327 // sendout messages, all messages are freed by send_and_free_messages().
435 - if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) { 328 + if ((ret = sdk->send_and_free_messages(msgs.msgs, count)) != ERROR_SUCCESS) {
436 srs_error("forwarder messages to server failed. ret=%d", ret); 329 srs_error("forwarder messages to server failed. ret=%d", ret);
437 return ret; 330 return ret;
438 } 331 }
@@ -43,6 +43,7 @@ class SrsRtmpClient; @@ -43,6 +43,7 @@ class SrsRtmpClient;
43 class SrsRequest; 43 class SrsRequest;
44 class SrsSource; 44 class SrsSource;
45 class SrsKbps; 45 class SrsKbps;
  46 +class SrsSimpleRtmpClient;
46 47
47 /** 48 /**
48 * forward the stream to other servers. 49 * forward the stream to other servers.
@@ -52,16 +53,13 @@ class SrsForwarder : public ISrsReusableThread2Handler @@ -52,16 +53,13 @@ class SrsForwarder : public ISrsReusableThread2Handler
52 { 53 {
53 private: 54 private:
54 // the ep to forward, server[:port]. 55 // the ep to forward, server[:port].
55 - std::string _ep_forward;  
56 - SrsRequest* _req;  
57 - int stream_id; 56 + std::string ep_forward;
  57 + SrsRequest* req;
58 private: 58 private:
59 SrsReusableThread2* pthread; 59 SrsReusableThread2* pthread;
60 private: 60 private:
61 SrsSource* source; 61 SrsSource* source;
62 - SrsTcpClient* transport;  
63 - SrsKbps* kbps;  
64 - SrsRtmpClient* client; 62 + SrsSimpleRtmpClient* sdk;
65 SrsRtmpJitter* jitter; 63 SrsRtmpJitter* jitter;
66 SrsMessageQueue* queue; 64 SrsMessageQueue* queue;
67 /** 65 /**
@@ -74,7 +72,7 @@ public: @@ -74,7 +72,7 @@ public:
74 SrsForwarder(SrsSource* _source); 72 SrsForwarder(SrsSource* _source);
75 virtual ~SrsForwarder(); 73 virtual ~SrsForwarder();
76 public: 74 public:
77 - virtual int initialize(SrsRequest* req, std::string ep_forward); 75 + virtual int initialize(SrsRequest* r, std::string ep);
78 virtual void set_queue_size(double queue_size); 76 virtual void set_queue_size(double queue_size);
79 public: 77 public:
80 virtual int on_publish(); 78 virtual int on_publish();
@@ -98,9 +96,6 @@ public: @@ -98,9 +96,6 @@ public:
98 public: 96 public:
99 virtual int cycle(); 97 virtual int cycle();
100 private: 98 private:
101 - virtual void discovery_ep(std::string& server, int& port, std::string& tc_url);  
102 - virtual int connect_server(std::string& ep_server, int& ep_port);  
103 - virtual int connect_app(std::string ep_server, int ep_port);  
104 virtual int forward(); 99 virtual int forward();
105 }; 100 };
106 101
@@ -301,6 +301,11 @@ int SrsSimpleRtmpClient::send_and_free_messages(SrsSharedPtrMessage** msgs, int @@ -301,6 +301,11 @@ int SrsSimpleRtmpClient::send_and_free_messages(SrsSharedPtrMessage** msgs, int
301 return client->send_and_free_messages(msgs, nb_msgs, stream_id); 301 return client->send_and_free_messages(msgs, nb_msgs, stream_id);
302 } 302 }
303 303
  304 +int SrsSimpleRtmpClient::send_and_free_message(SrsSharedPtrMessage* msg)
  305 +{
  306 + return client->send_and_free_message(msg, stream_id);
  307 +}
  308 +
304 void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) 309 void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout)
305 { 310 {
306 transport->set_recv_timeout(timeout); 311 transport->set_recv_timeout(timeout);
@@ -90,6 +90,7 @@ public: @@ -90,6 +90,7 @@ public:
90 virtual int recv_message(SrsCommonMessage** pmsg); 90 virtual int recv_message(SrsCommonMessage** pmsg);
91 virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); 91 virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
92 virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs); 92 virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
  93 + virtual int send_and_free_message(SrsSharedPtrMessage* msg);
93 public: 94 public:
94 virtual void set_recv_timeout(int64_t timeout); 95 virtual void set_recv_timeout(int64_t timeout);
95 }; 96 };