winlin

add todo for memory increase when forwarder server failed.

1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_forward.hpp>  
25 -  
26 -#include <stdlib.h>  
27 -#include <sys/socket.h>  
28 -#include <netinet/in.h>  
29 -#include <arpa/inet.h>  
30 -  
31 -#include <srs_core_error.hpp>  
32 -#include <srs_core_rtmp.hpp>  
33 -#include <srs_core_log.hpp>  
34 -#include <srs_core_protocol.hpp>  
35 -#include <srs_core_pithy_print.hpp>  
36 -#include <srs_core_rtmp.hpp>  
37 -#include <srs_core_config.hpp>  
38 -  
39 -#define SRS_PULSE_TIMEOUT_MS 100  
40 -#define SRS_FORWARDER_SLEEP_MS 2000  
41 -#define SRS_SEND_TIMEOUT_US 3000000L  
42 -#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US  
43 -  
44 -SrsForwarder::SrsForwarder()  
45 -{  
46 - client = NULL;  
47 - stfd = NULL;  
48 - stream_id = 0;  
49 -  
50 - tid = NULL;  
51 - loop = false;  
52 -}  
53 -  
54 -SrsForwarder::~SrsForwarder()  
55 -{  
56 - on_unpublish();  
57 -  
58 - std::vector<SrsSharedPtrMessage*>::iterator it;  
59 - for (it = msgs.begin(); it != msgs.end(); ++it) {  
60 - SrsSharedPtrMessage* msg = *it;  
61 - srs_freep(msg);  
62 - }  
63 - msgs.clear();  
64 -}  
65 -  
66 -int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)  
67 -{  
68 - int ret = ERROR_SUCCESS;  
69 -  
70 - // forward app  
71 - app = req->app;  
72 -  
73 - stream_name = req->stream;  
74 - server = forward_server;  
75 - std::string s_port = RTMP_DEFAULT_PORTS;  
76 - port = RTMP_DEFAULT_PORT;  
77 -  
78 - size_t pos = forward_server.find(":");  
79 - if (pos != std::string::npos) {  
80 - s_port = forward_server.substr(pos + 1);  
81 - server = forward_server.substr(0, pos);  
82 - }  
83 - // discovery vhost  
84 - std::string vhost = req->vhost;  
85 - srs_vhost_resolve(vhost, s_port);  
86 - port = ::atoi(s_port.c_str());  
87 -  
88 - // generate tcUrl  
89 - tc_url = "rtmp://";  
90 - tc_url += vhost;  
91 - tc_url += "/";  
92 - tc_url += req->app;  
93 -  
94 - // dead loop check  
95 - std::string source_ep = req->vhost;  
96 - source_ep += ":";  
97 - source_ep += req->port;  
98 -  
99 - std::string dest_ep = vhost;  
100 - dest_ep += ":";  
101 - dest_ep += s_port;  
102 -  
103 - if (source_ep == dest_ep) {  
104 - ret = ERROR_SYSTEM_FORWARD_LOOP;  
105 - srs_warn("farder loop detected. src=%s, dest=%s, ret=%d",  
106 - source_ep.c_str(), dest_ep.c_str(), ret);  
107 - return ret;  
108 - }  
109 - srs_trace("start forward %s to %s, stream: %s/%s",  
110 - source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(),  
111 - stream_name.c_str());  
112 -  
113 - // TODO: seems bug when republish and reforward.  
114 -  
115 - // start forward  
116 - if ((ret = open_socket()) != ERROR_SUCCESS) {  
117 - return ret;  
118 - }  
119 -  
120 - srs_assert(!tid);  
121 - if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){  
122 - ret = ERROR_ST_CREATE_FORWARD_THREAD;  
123 - srs_error("st_thread_create failed. ret=%d", ret);  
124 - return ret;  
125 - }  
126 -  
127 - return ret;  
128 -}  
129 -  
130 -void SrsForwarder::on_unpublish()  
131 -{  
132 - if (tid) {  
133 - loop = false;  
134 - st_thread_interrupt(tid);  
135 - st_thread_join(tid, NULL);  
136 - tid = NULL;  
137 - }  
138 -  
139 - if (stfd) {  
140 - int fd = st_netfd_fileno(stfd);  
141 - st_netfd_close(stfd);  
142 - stfd = NULL;  
143 -  
144 - // st does not close it sometimes,  
145 - // close it manually.  
146 - close(fd);  
147 - }  
148 -  
149 - srs_freep(client);  
150 -}  
151 -  
152 -int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)  
153 -{  
154 - int ret = ERROR_SUCCESS;  
155 -  
156 - msgs.push_back(metadata);  
157 -  
158 - return ret;  
159 -}  
160 -  
161 -int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)  
162 -{  
163 - int ret = ERROR_SUCCESS;  
164 -  
165 - msgs.push_back(msg);  
166 -  
167 - return ret;  
168 -}  
169 -  
170 -int SrsForwarder::on_video(SrsSharedPtrMessage* msg)  
171 -{  
172 - int ret = ERROR_SUCCESS;  
173 -  
174 - msgs.push_back(msg);  
175 -  
176 - return ret;  
177 -}  
178 -  
179 -int SrsForwarder::open_socket()  
180 -{  
181 - int ret = ERROR_SUCCESS;  
182 -  
183 - srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",  
184 - stream_name.c_str(), tc_url.c_str(), server.c_str(), port);  
185 -  
186 - int sock = socket(AF_INET, SOCK_STREAM, 0);  
187 - if(sock == -1){  
188 - ret = ERROR_SOCKET_CREATE;  
189 - srs_error("create socket error. ret=%d", ret);  
190 - return ret;  
191 - }  
192 -  
193 - stfd = st_netfd_open_socket(sock);  
194 - if(stfd == NULL){  
195 - ret = ERROR_ST_OPEN_SOCKET;  
196 - srs_error("st_netfd_open_socket failed. ret=%d", ret);  
197 - return ret;  
198 - }  
199 -  
200 - srs_freep(client);  
201 - client = new SrsRtmpClient(stfd);  
202 -  
203 - return ret;  
204 -}  
205 -  
206 -int SrsForwarder::connect_server()  
207 -{  
208 - int ret = ERROR_SUCCESS;  
209 -  
210 - std::string ip = srs_dns_resolve(server);  
211 - if (ip.empty()) {  
212 - ret = ERROR_SYSTEM_IP_INVALID;  
213 - srs_error("dns resolve server error, ip empty. ret=%d", ret);  
214 - return ret;  
215 - }  
216 -  
217 - sockaddr_in addr;  
218 - addr.sin_family = AF_INET;  
219 - addr.sin_port = htons(port);  
220 - addr.sin_addr.s_addr = inet_addr(ip.c_str());  
221 -  
222 - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){  
223 - ret = ERROR_ST_CONNECT;  
224 - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);  
225 - return ret;  
226 - }  
227 - srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);  
228 -  
229 - return ret;  
230 -}  
231 -  
232 -int SrsForwarder::cycle()  
233 -{  
234 - int ret = ERROR_SUCCESS;  
235 -  
236 - client->set_recv_timeout(SRS_RECV_TIMEOUT_US);  
237 - client->set_send_timeout(SRS_SEND_TIMEOUT_US);  
238 -  
239 - if ((ret = connect_server()) != ERROR_SUCCESS) {  
240 - return ret;  
241 - }  
242 - srs_assert(client);  
243 -  
244 - if ((ret = client->handshake()) != ERROR_SUCCESS) {  
245 - srs_error("handshake with server failed. ret=%d", ret);  
246 - return ret;  
247 - }  
248 - if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {  
249 - srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);  
250 - return ret;  
251 - }  
252 - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {  
253 - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);  
254 - return ret;  
255 - }  
256 - if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {  
257 - srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",  
258 - stream_name.c_str(), stream_id, ret);  
259 - return ret;  
260 - }  
261 -  
262 - if ((ret = forward()) != ERROR_SUCCESS) {  
263 - return ret;  
264 - }  
265 -  
266 - return ret;  
267 -}  
268 -  
269 -int SrsForwarder::forward()  
270 -{  
271 - int ret = ERROR_SUCCESS;  
272 -  
273 - client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);  
274 -  
275 - SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);  
276 -  
277 - while (loop) {  
278 - pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);  
279 -  
280 - // switch to other st-threads.  
281 - st_usleep(0);  
282 -  
283 - // read from client.  
284 - if (true) {  
285 - SrsCommonMessage* msg = NULL;  
286 - ret = client->recv_message(&msg);  
287 -  
288 - srs_verbose("play loop recv message. ret=%d", ret);  
289 - if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {  
290 - srs_error("recv server control message failed. ret=%d", ret);  
291 - return ret;  
292 - }  
293 - }  
294 -  
295 - // ignore when no messages.  
296 - int count = (int)msgs.size();  
297 - if (msgs.empty()) {  
298 - continue;  
299 - }  
300 -  
301 - // reportable  
302 - if (pithy_print.can_print()) {  
303 - srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
304 - pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());  
305 - }  
306 -  
307 - // all msgs to forward.  
308 - int i = 0;  
309 - for (i = 0; i < count; i++) {  
310 - SrsSharedPtrMessage* msg = msgs[i];  
311 - msgs[i] = NULL;  
312 -  
313 - // we erased the sendout messages, the msg must not be NULL.  
314 - srs_assert(msg);  
315 -  
316 - ret = client->send_message(msg);  
317 - if (ret != ERROR_SUCCESS) {  
318 - srs_error("forwarder send message to server failed. ret=%d", ret);  
319 -  
320 - // convert the index to count when error.  
321 - i++;  
322 -  
323 - break;  
324 - }  
325 - }  
326 -  
327 - // clear sendout mesages.  
328 - if (i < count) {  
329 - srs_warn("clear forwarded msg, total=%d, forwarded=%d, ret=%d", count, i, ret);  
330 - } else {  
331 - srs_info("clear forwarded msg, total=%d, forwarded=%d, ret=%d", count, i, ret);  
332 - }  
333 - msgs.erase(msgs.begin(), msgs.begin() + i);  
334 -  
335 - if (ret != ERROR_SUCCESS) {  
336 - break;  
337 - }  
338 - }  
339 -  
340 - return ret;  
341 -}  
342 -  
343 -void SrsForwarder::forward_cycle()  
344 -{  
345 - int ret = ERROR_SUCCESS;  
346 -  
347 - log_context->generate_id();  
348 - srs_trace("forward cycle start");  
349 -  
350 - while (loop) {  
351 - if ((ret = cycle()) != ERROR_SUCCESS) {  
352 - srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);  
353 - } else {  
354 - srs_info("forward cycle success, retry");  
355 - }  
356 -  
357 - if (!loop) {  
358 - break;  
359 - }  
360 -  
361 - st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);  
362 -  
363 - if ((ret = open_socket()) != ERROR_SUCCESS) {  
364 - srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);  
365 - } else {  
366 - srs_info("forward cycle reopen success");  
367 - }  
368 - }  
369 - srs_trace("forward cycle finished");  
370 -}  
371 -  
372 -void* SrsForwarder::forward_thread(void* arg)  
373 -{  
374 - SrsForwarder* obj = (SrsForwarder*)arg;  
375 - srs_assert(obj != NULL);  
376 -  
377 - obj->loop = true;  
378 - obj->forward_cycle();  
379 -  
380 - return NULL;  
381 -}  
382 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_forward.hpp>
  25 +
  26 +#include <stdlib.h>
  27 +#include <sys/socket.h>
  28 +#include <netinet/in.h>
  29 +#include <arpa/inet.h>
  30 +
  31 +#include <srs_core_error.hpp>
  32 +#include <srs_core_rtmp.hpp>
  33 +#include <srs_core_log.hpp>
  34 +#include <srs_core_protocol.hpp>
  35 +#include <srs_core_pithy_print.hpp>
  36 +#include <srs_core_rtmp.hpp>
  37 +#include <srs_core_config.hpp>
  38 +
  39 +#define SRS_PULSE_TIMEOUT_MS 100
  40 +#define SRS_FORWARDER_SLEEP_MS 2000
  41 +#define SRS_SEND_TIMEOUT_US 3000000L
  42 +#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
  43 +
  44 +SrsForwarder::SrsForwarder()
  45 +{
  46 + client = NULL;
  47 + stfd = NULL;
  48 + stream_id = 0;
  49 +
  50 + tid = NULL;
  51 + loop = false;
  52 +}
  53 +
  54 +SrsForwarder::~SrsForwarder()
  55 +{
  56 + on_unpublish();
  57 +
  58 + std::vector<SrsSharedPtrMessage*>::iterator it;
  59 + for (it = msgs.begin(); it != msgs.end(); ++it) {
  60 + SrsSharedPtrMessage* msg = *it;
  61 + srs_freep(msg);
  62 + }
  63 + msgs.clear();
  64 +}
  65 +
  66 +int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
  67 +{
  68 + int ret = ERROR_SUCCESS;
  69 +
  70 + // forward app
  71 + app = req->app;
  72 +
  73 + stream_name = req->stream;
  74 + server = forward_server;
  75 + std::string s_port = RTMP_DEFAULT_PORTS;
  76 + port = RTMP_DEFAULT_PORT;
  77 +
  78 + size_t pos = forward_server.find(":");
  79 + if (pos != std::string::npos) {
  80 + s_port = forward_server.substr(pos + 1);
  81 + server = forward_server.substr(0, pos);
  82 + }
  83 + // discovery vhost
  84 + std::string vhost = req->vhost;
  85 + srs_vhost_resolve(vhost, s_port);
  86 + port = ::atoi(s_port.c_str());
  87 +
  88 + // generate tcUrl
  89 + tc_url = "rtmp://";
  90 + tc_url += vhost;
  91 + tc_url += "/";
  92 + tc_url += req->app;
  93 +
  94 + // dead loop check
  95 + std::string source_ep = req->vhost;
  96 + source_ep += ":";
  97 + source_ep += req->port;
  98 +
  99 + std::string dest_ep = vhost;
  100 + dest_ep += ":";
  101 + dest_ep += s_port;
  102 +
  103 + if (source_ep == dest_ep) {
  104 + ret = ERROR_SYSTEM_FORWARD_LOOP;
  105 + srs_warn("farder loop detected. src=%s, dest=%s, ret=%d",
  106 + source_ep.c_str(), dest_ep.c_str(), ret);
  107 + return ret;
  108 + }
  109 + srs_trace("start forward %s to %s, stream: %s/%s",
  110 + source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(),
  111 + stream_name.c_str());
  112 +
  113 + // TODO: seems bug when republish and reforward.
  114 +
  115 + // start forward
  116 + if ((ret = open_socket()) != ERROR_SUCCESS) {
  117 + return ret;
  118 + }
  119 +
  120 + srs_assert(!tid);
  121 + if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){
  122 + ret = ERROR_ST_CREATE_FORWARD_THREAD;
  123 + srs_error("st_thread_create failed. ret=%d", ret);
  124 + return ret;
  125 + }
  126 +
  127 + return ret;
  128 +}
  129 +
  130 +void SrsForwarder::on_unpublish()
  131 +{
  132 + if (tid) {
  133 + loop = false;
  134 + st_thread_interrupt(tid);
  135 + st_thread_join(tid, NULL);
  136 + tid = NULL;
  137 + }
  138 +
  139 + if (stfd) {
  140 + int fd = st_netfd_fileno(stfd);
  141 + st_netfd_close(stfd);
  142 + stfd = NULL;
  143 +
  144 + // st does not close it sometimes,
  145 + // close it manually.
  146 + close(fd);
  147 + }
  148 +
  149 + srs_freep(client);
  150 +}
  151 +
  152 +int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
  153 +{
  154 + int ret = ERROR_SUCCESS;
  155 +
  156 + msgs.push_back(metadata);
  157 +
  158 + return ret;
  159 +}
  160 +
  161 +int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
  162 +{
  163 + int ret = ERROR_SUCCESS;
  164 +
  165 + msgs.push_back(ms
  166 + // TODO: FIXME: must drop the msgs when server failed.g);
  167 +
  168 + return ret;
  169 +}
  170 +
  171 +int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
  172 +{
  173 + int ret = ERROR_SUCCESS;
  174 +
  175 + // TODO: FIXME: must drop the msgs when server failed.
  176 + msgs.push_back(msg);
  177 +
  178 + return ret;
  179 +}
  180 +
  181 +int SrsForwarder::open_socket()
  182 +{
  183 + int ret = ERROR_SUCCESS;
  184 +
  185 + srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
  186 + stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
  187 +
  188 + int sock = socket(AF_INET, SOCK_STREAM, 0);
  189 + if(sock == -1){
  190 + ret = ERROR_SOCKET_CREATE;
  191 + srs_error("create socket error. ret=%d", ret);
  192 + return ret;
  193 + }
  194 +
  195 + stfd = st_netfd_open_socket(sock);
  196 + if(stfd == NULL){
  197 + ret = ERROR_ST_OPEN_SOCKET;
  198 + srs_error("st_netfd_open_socket failed. ret=%d", ret);
  199 + return ret;
  200 + }
  201 +
  202 + srs_freep(client);
  203 + client = new SrsRtmpClient(stfd);
  204 +
  205 + return ret;
  206 +}
  207 +
  208 +int SrsForwarder::connect_server()
  209 +{
  210 + int ret = ERROR_SUCCESS;
  211 +
  212 + std::string ip = srs_dns_resolve(server);
  213 + if (ip.empty()) {
  214 + ret = ERROR_SYSTEM_IP_INVALID;
  215 + srs_error("dns resolve server error, ip empty. ret=%d", ret);
  216 + return ret;
  217 + }
  218 +
  219 + sockaddr_in addr;
  220 + addr.sin_family = AF_INET;
  221 + addr.sin_port = htons(port);
  222 + addr.sin_addr.s_addr = inet_addr(ip.c_str());
  223 +
  224 + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
  225 + ret = ERROR_ST_CONNECT;
  226 + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
  227 + return ret;
  228 + }
  229 + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
  230 +
  231 + return ret;
  232 +}
  233 +
  234 +int SrsForwarder::cycle()
  235 +{
  236 + int ret = ERROR_SUCCESS;
  237 +
  238 + client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
  239 + client->set_send_timeout(SRS_SEND_TIMEOUT_US);
  240 +
  241 + if ((ret = connect_server()) != ERROR_SUCCESS) {
  242 + return ret;
  243 + }
  244 + srs_assert(client);
  245 +
  246 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  247 + srs_error("handshake with server failed. ret=%d", ret);
  248 + return ret;
  249 + }
  250 + if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {
  251 + srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
  252 + return ret;
  253 + }
  254 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  255 + srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  256 + return ret;
  257 + }
  258 + if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
  259 + srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
  260 + stream_name.c_str(), stream_id, ret);
  261 + return ret;
  262 + }
  263 +
  264 + if ((ret = forward()) != ERROR_SUCCESS) {
  265 + return ret;
  266 + }
  267 +
  268 + return ret;
  269 +}
  270 +
  271 +int SrsForwarder::forward()
  272 +{
  273 + int ret = ERROR_SUCCESS;
  274 +
  275 + client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);
  276 +
  277 + SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
  278 +
  279 + while (loop) {
  280 + pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
  281 +
  282 + // switch to other st-threads.
  283 + st_usleep(0);
  284 +
  285 + // read from client.
  286 + if (true) {
  287 + SrsCommonMessage* msg = NULL;
  288 + ret = client->recv_message(&msg);
  289 +
  290 + srs_verbose("play loop recv message. ret=%d", ret);
  291 + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
  292 + srs_error("recv server control message failed. ret=%d", ret);
  293 + return ret;
  294 + }
  295 + }
  296 +
  297 + // ignore when no messages.
  298 + int count = (int)msgs.size();
  299 + if (msgs.empty()) {
  300 + continue;
  301 + }
  302 +
  303 + // reportable
  304 + if (pithy_print.can_print()) {
  305 + srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  306 + pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
  307 + }
  308 +
  309 + // all msgs to forward.
  310 + int i = 0;
  311 + for (i = 0; i < count; i++) {
  312 + SrsSharedPtrMessage* msg = msgs[i];
  313 + msgs[i] = NULL;
  314 +
  315 + // we erased the sendout messages, the msg must not be NULL.
  316 + srs_assert(msg);
  317 +
  318 + ret = client->send_message(msg);
  319 + if (ret != ERROR_SUCCESS) {
  320 + srs_error("forwarder send message to server failed. ret=%d", ret);
  321 +
  322 + // convert the index to count when error.
  323 + i++;
  324 +
  325 + break;
  326 + }
  327 + }
  328 +
  329 + // clear sendout mesages.
  330 + if (i < count) {
  331 + srs_warn("clear forwarded msg, total=%d, forwarded=%d, ret=%d", count, i, ret);
  332 + } else {
  333 + srs_info("clear forwarded msg, total=%d, forwarded=%d, ret=%d", count, i, ret);
  334 + }
  335 + msgs.erase(msgs.begin(), msgs.begin() + i);
  336 +
  337 + if (ret != ERROR_SUCCESS) {
  338 + break;
  339 + }
  340 + }
  341 +
  342 + return ret;
  343 +}
  344 +
  345 +void SrsForwarder::forward_cycle()
  346 +{
  347 + int ret = ERROR_SUCCESS;
  348 +
  349 + log_context->generate_id();
  350 + srs_trace("forward cycle start");
  351 +
  352 + while (loop) {
  353 + if ((ret = cycle()) != ERROR_SUCCESS) {
  354 + srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);
  355 + } else {
  356 + srs_info("forward cycle success, retry");
  357 + }
  358 +
  359 + if (!loop) {
  360 + break;
  361 + }
  362 +
  363 + st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);
  364 +
  365 + if ((ret = open_socket()) != ERROR_SUCCESS) {
  366 + srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);
  367 + } else {
  368 + srs_info("forward cycle reopen success");
  369 + }
  370 + }
  371 + srs_trace("forward cycle finished");
  372 +}
  373 +
  374 +void* SrsForwarder::forward_thread(void* arg)
  375 +{
  376 + SrsForwarder* obj = (SrsForwarder*)arg;
  377 + srs_assert(obj != NULL);
  378 +
  379 + obj->loop = true;
  380 + obj->forward_cycle();
  381 +
  382 + return NULL;
  383 +}
  384 +