Blame view

trunk/src/app/srs_app_forward.cpp 13.7 KB
1 2 3
/*
The MIT License (MIT)
4
Copyright (c) 2013-2015 SRS(simple-rtmp-server)
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
24
#include <srs_app_forward.hpp>
25 26 27 28 29 30

#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
31 32
using namespace std;
33
#include <srs_app_source.hpp>
34
#include <srs_app_st_socket.hpp>
35
#include <srs_kernel_error.hpp>
36
#include <srs_kernel_log.hpp>
37 38
#include <srs_app_config.hpp>
#include <srs_app_pithy_print.hpp>
39 40
#include <srs_rtmp_sdk.hpp>
#include <srs_rtmp_utility.hpp>
41
#include <srs_app_kbps.hpp>
42
#include <srs_rtmp_msg_array.hpp>
43
#include <srs_app_utility.hpp>
44
#include <srs_rtmp_amf0.hpp>
45
#include <srs_kernel_codec.hpp>
46
#include <srs_core_autofree.hpp>
47
winlin authored
48 49 50
// when an error occurs, the forwarder sleeps for a while and then retries.
// the value is in microseconds (3 seconds); it is passed to the forward
// thread in the constructor and is also used as the connect timeout
// in connect_server().
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
51
// create a forwarder for the specified source.
// the connection objects (io, client, stfd) and the cached sequence headers
// start as NULL; the connection objects are created by connect_server().
SrsForwarder::SrsForwarder(SrsSource* _source)
{
    // nothing is connected or cached yet.
    _req = NULL;
    stfd = NULL;
    io = NULL;
    client = NULL;
    sh_audio = NULL;
    sh_video = NULL;
    stream_id = 0;
    
    // the source to forward, see cycle() for the on_forwarder_start callback.
    source = _source;
    
    // the helper objects, all of them are freed in the destructor.
    kbps = new SrsKbps();
    pthread = new SrsThread("forward", this, SRS_FORWARDER_SLEEP_US, true);
    queue = new SrsMessageQueue();
    jitter = new SrsRtmpJitter();
}

// destroy the forwarder.
// on_unpublish() runs first, it stops the thread, closes the socket and
// frees the client and io; then the objects created by the constructor
// and the cached sequence headers are freed.
SrsForwarder::~SrsForwarder()
{
    on_unpublish();
    
    // objects allocated by the constructor.
    srs_freep(pthread);
    srs_freep(queue);
    srs_freep(jitter);
    srs_freep(kbps);
    
    // sequence headers cached by on_video() and on_audio().
    srs_freep(sh_video);
    srs_freep(sh_audio);
}
82 83 84 85 86 87 88 89 90 91 92 93 94 95
// remember the request and the endpoint to forward to.
// @param req the request of the source; the pointer is stored, not copied.
// @param ep_forward the ep(endpoint) to forward to, "server" or "server:port",
//      see discovery_ep().
// @return always ERROR_SUCCESS.
int SrsForwarder::initialize(SrsRequest* req, string ep_forward)
{
    // the ep(endpoint) to forward to
    _ep_forward = ep_forward;
    
    // it's ok to keep the request pointer,
    // SrsSource already copy it and never delete it.
    _req = req;
    
    return ERROR_SUCCESS;
}
winlin authored
96 97
void SrsForwarder::set_queue_size(double queue_size)
{
98
    queue->set_queue_size(queue_size);
99 100
}
101
int SrsForwarder::on_publish()
102
{
103 104
    int ret = ERROR_SUCCESS;
    
105
    SrsRequest* req = _req;
106
    
107 108 109
    // discovery the server port and tcUrl from req and ep_forward.
    std::string server, port, tc_url;
    discovery_ep(server, port, tc_url);
110 111
    
    // dead loop check
112 113
    std::string source_ep = "rtmp://";
    source_ep += req->host;
114 115
    source_ep += ":";
    source_ep += req->port;
116 117
    source_ep += "?vhost=";
    source_ep += req->vhost;
118
    
119
    std::string dest_ep = "rtmp://";
120
    if (_ep_forward == SRS_CONSTS_LOCALHOST) {
121 122
        dest_ep += req->host;
    } else {
123
        dest_ep += server;
124
    }
125
    dest_ep += ":";
126
    dest_ep += port;
127
    dest_ep += "?vhost=";
128
    dest_ep += req->vhost;
129 130 131
    
    if (source_ep == dest_ep) {
        ret = ERROR_SYSTEM_FORWARD_LOOP;
132
        srs_warn("forward loop detected. src=%s, dest=%s, ret=%d", 
133 134 135
            source_ep.c_str(), dest_ep.c_str(), ret);
        return ret;
    }
136
    srs_trace("start forward %s to %s, tcUrl=%s, stream=%s", 
137
        source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), 
138
        req->stream.c_str());
139 140
    
    if ((ret = pthread->start()) != ERROR_SUCCESS) {
141 142
        srs_error("start srs thread failed. ret=%d", ret);
        return ret;
143
    }
winlin authored
144
    srs_trace("forward thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());
145 146
    
    return ret;
147 148 149 150
}

void SrsForwarder::on_unpublish()
{
151 152 153 154 155 156
    pthread->stop();
    
    close_underlayer_socket();
    
    srs_freep(client);
    srs_freep(io);
157
    kbps->set_io(NULL, NULL);
158 159
}
160
// enqueue a copy of the metadata to forward.
// the copy is passed through the jitter correction first; when the
// correction fails the copy is freed and the error is returned.
// @param shared_metadata the message of the source, it is copied here.
// @return ERROR_SUCCESS, or the error of the jitter or the queue.
int SrsForwarder::on_meta_data(SrsSharedPtrMessage* shared_metadata)
{
    int ret = ERROR_SUCCESS;
    
    SrsSharedPtrMessage* msg = shared_metadata->copy();
    
    ret = jitter->correct(msg, 0, 0, SrsRtmpJitterAlgorithmFULL);
    if (ret != ERROR_SUCCESS) {
        srs_freep(msg);
        return ret;
    }
    
    // the message is handed to the queue, success or not.
    ret = queue->enqueue(msg);
    return ret;
}
178
// enqueue a copy of the audio message to forward.
// the copy is passed through the jitter correction first; when it is an
// audio sequence header, another copy is cached in sh_audio, which
// forward() sends before the queued messages.
// @param shared_audio the message of the source, it is copied here.
// @return ERROR_SUCCESS, or the error of the jitter or the queue.
int SrsForwarder::on_audio(SrsSharedPtrMessage* shared_audio)
{
    int ret = ERROR_SUCCESS;
    
    SrsSharedPtrMessage* audio = shared_audio->copy();
    
    ret = jitter->correct(audio, 0, 0, SrsRtmpJitterAlgorithmFULL);
    if (ret != ERROR_SUCCESS) {
        srs_freep(audio);
        return ret;
    }
    
    // replace the cached sequence header by the newest one.
    if (SrsFlvCodec::audio_is_sequence_header(audio->payload, audio->size)) {
        srs_freep(sh_audio);
        sh_audio = audio->copy();
    }
    
    ret = queue->enqueue(audio);
    return ret;
}
201
// enqueue a copy of the video message to forward.
// the copy is passed through the jitter correction first; when it is a
// video sequence header, another copy is cached in sh_video, which
// forward() sends before the queued messages.
// @param shared_video the message of the source, it is copied here.
// @return ERROR_SUCCESS, or the error of the jitter or the queue.
int SrsForwarder::on_video(SrsSharedPtrMessage* shared_video)
{
    int ret = ERROR_SUCCESS;
    
    SrsSharedPtrMessage* video = shared_video->copy();
    
    ret = jitter->correct(video, 0, 0, SrsRtmpJitterAlgorithmFULL);
    if (ret != ERROR_SUCCESS) {
        srs_freep(video);
        return ret;
    }
    
    // replace the cached sequence header by the newest one.
    if (SrsFlvCodec::video_is_sequence_header(video->payload, video->size)) {
        srs_freep(sh_video);
        sh_video = video->copy();
    }
    
    ret = queue->enqueue(video);
    return ret;
}

int SrsForwarder::cycle()
{
226 227
    int ret = ERROR_SUCCESS;
    
228 229
    std::string ep_server, ep_port;
    if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
230 231 232
        return ret;
    }
    srs_assert(client);
233
winlin authored
234 235
    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
236 237 238 239 240
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
241 242
    if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
        srs_error("connect with server failed. ret=%d", ret);
243 244 245 246 247 248 249
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }
    
250
    if ((ret = client->publish(_req->stream, stream_id)) != ERROR_SUCCESS) {
251
        srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", 
252
            _req->stream.c_str(), stream_id, ret);
253 254 255 256 257 258 259 260 261 262 263 264 265
        return ret;
    }
    
    if ((ret = source->on_forwarder_start(this)) != ERROR_SUCCESS) {
        srs_error("callback the source to feed the sequence header failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = forward()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
266 267 268 269
}

void SrsForwarder::close_underlayer_socket()
{
270
    srs_close_stfd(stfd);
271 272
}
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
// discovery the server, port and tcUrl from the request and ep_forward.
// @param server output, the part of ep_forward before the first ":",
//      or the whole ep_forward when there is no ":".
// @param port output, the part of ep_forward after the first ":",
//      or SRS_CONSTS_RTMP_DEFAULT_PORT when there is no ":".
// @param tc_url output, generated from server, port and the vhost, app
//      and param of the request.
void SrsForwarder::discovery_ep(string& server, string& port, string& tc_url)
{
    // TODO: FIXME: parse complex params
    size_t colon = _ep_forward.find(":");
    if (colon == std::string::npos) {
        // no port specified, use the default.
        server = _ep_forward;
        port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    } else {
        port = _ep_forward.substr(colon + 1);
        server = _ep_forward.substr(0, colon);
    }
    
    // generate tcUrl
    tc_url = srs_generate_tc_url(server, _req->vhost, _req->app, port, _req->param);
}

// (re)connect the socket to the server to forward to.
// the previous socket is closed first; when connected, the io and client
// are recreated on the new socket and the kbps is bound to the new io.
// @param ep_server output, the server discovered from ep_forward.
// @param ep_port output, the port(as string) discovered from ep_forward.
// @return ERROR_SUCCESS, or the error of srs_socket_connect(); on error the
//      previous io and client objects are not touched here.
int SrsForwarder::connect_server(string& ep_server, string& ep_port)
{
    int ret = ERROR_SUCCESS;
    
    // reopen
    close_underlayer_socket();
    
    // discovery the server port and tcUrl from req and ep_forward.
    // the tc_url is not used here, it is only an output of discovery_ep().
    std::string server, s_port, tc_url;
    discovery_ep(server, s_port, tc_url);
    int port = ::atoi(s_port.c_str());
    
    // output the connected server and port.
    ep_server = server;
    ep_port = s_port;
    
    // open socket.
    // a space is required between a string literal and the PRId64 macro,
    // otherwise c++11 and later parse the macro name as a literal suffix.
    int64_t timeout = SRS_FORWARDER_SLEEP_US;
    if ((ret = srs_socket_connect(ep_server, port, timeout, &stfd)) != ERROR_SUCCESS) {
        srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%" PRId64 ", ret=%d",
            _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
        return ret;
    }
    
    // drop the objects of the previous connection.
    srs_freep(client);
    srs_freep(io);
    
    // create the io and client on the new socket.
    srs_assert(stfd);
    io = new SrsStSocket(stfd);
    client = new SrsRtmpClient(io);
    
    kbps->set_io(io, io);
    
    srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d",
        _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);
    
    return ret;
}

// TODO: FIXME: refine the connect_app.
// connect to the app of the server to forward to.
// the args of the request are filled with the srs info (signature, version,
// pid, id and the local ip), then the client connects the app with the tcUrl
// generated from the endpoint.
// @param ep_server the server to forward to, see connect_server().
// @param ep_port the port(as string) to forward to, see connect_server().
// @return ERROR_SUCCESS, or the error of client->connect_app().
int SrsForwarder::connect_app(string ep_server, string ep_port)
{
    int ret = ERROR_SUCCESS;
    
    // args of request takes the srs info, create the object when absent.
    if (_req->args == NULL) {
        _req->args = SrsAmf0Any::object();
    }
    SrsAmf0Object* args = _req->args;
    
    // notify server the edge identity,
    // @see https://github.com/simple-rtmp-server/srs/issues/147
    args->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
    args->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
    args->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
    args->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
    args->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
    args->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
    args->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
    args->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
    args->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
    args->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
    args->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
    
    // for edge to directly get the id of client.
    args->set("srs_pid", SrsAmf0Any::number(getpid()));
    args->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
    
    // local ip of edge, selected by the stats network index of config.
    // NOTE(review): the index is only guarded by a plain assert, which is
    // compiled out with NDEBUG and does not reject a negative index - confirm
    // whether srs_assert (used elsewhere in this file) is wanted here.
    std::vector<std::string> ips = srs_get_local_ipv4_ips();
    assert(_srs_config->get_stats_network() < (int)ips.size());
    std::string local_ip = ips[_srs_config->get_stats_network()];
    args->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
    
    // generate the tcUrl, without any param.
    std::string no_param;
    std::string tc_url = srs_generate_tc_url(ep_server, _req->vhost, _req->app, ep_port, no_param);
    
    // upnode server identity will show in the connect_app of client.
    // @see https://github.com/simple-rtmp-server/srs/issues/160
    // the debug_srs_upnode is config in vhost and default to true.
    bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(_req->vhost);
    ret = client->connect_app(_req->app, tc_url, _req, debug_srs_upnode);
    if (ret != ERROR_SUCCESS) {
        srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", 
            tc_url.c_str(), debug_srs_upnode, ret);
        return ret;
    }
    
    return ret;
}
383
// the size of the message array used by forward(), which is passed as the
// max count of messages to dump from the queue in one loop.
#define SYS_MAX_FORWARD_SEND_MSGS 128
384 385
int SrsForwarder::forward()
{
386 387
    int ret = ERROR_SUCCESS;
    
winlin authored
388
    client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
389
    
390 391
    SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder();
    SrsAutoFree(SrsPithyPrint, pprint);
392
393
    SrsMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);
394
    
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
    // update sequence header
    // TODO: FIXME: maybe need to zero the sequence header timestamp.
    if (sh_video) {
        if ((ret = client->send_and_free_message(sh_video->copy(), stream_id)) != ERROR_SUCCESS) {
            srs_error("forwarder send sh_video to server failed. ret=%d", ret);
            return ret;
        }
    }
    if (sh_audio) {
        if ((ret = client->send_and_free_message(sh_audio->copy(), stream_id)) != ERROR_SUCCESS) {
            srs_error("forwarder send sh_audio to server failed. ret=%d", ret);
            return ret;
        }
    }
    
410
    while (pthread->can_loop()) {
411
        pprint->elapse();
412
413 414
        // read from client.
        if (true) {
415
            SrsCommonMessage* msg = NULL;
416
            ret = client->recv_message(&msg);
417 418 419 420 421 422
            
            srs_verbose("play loop recv message. ret=%d", ret);
            if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
                srs_error("recv server control message failed. ret=%d", ret);
                return ret;
            }
423 424
            
            srs_freep(msg);
425 426 427
        }
        
        // forward all messages.
428
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
429
        int count = 0;
430
        if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
431 432 433 434
            srs_error("get message to forward failed. ret=%d", ret);
            return ret;
        }
        
435
        // pithy print
436
        if (pprint->can_print()) {
437
            kbps->sample();
438
            srs_trace("-> "SRS_CONSTS_LOG_FOWARDER
439
                " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", 
440
                pprint->age(), count,
441 442
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
443 444
        }
        
445 446 447 448 449 450
        // ignore when no messages.
        if (count <= 0) {
            srs_verbose("no packets to forward.");
            continue;
        }
    
451
        // sendout messages, all messages are freed by send_and_free_messages().
452 453 454
        if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) {
            srs_error("forwarder messages to server failed. ret=%d", ret);
            return ret;
455 456 457 458
        }
    }
    
    return ret;
459 460
}
461