Blame view

trunk/src/app/srs_app_forward.cpp 13.5 KB
1 2 3
/*
The MIT License (MIT)
4
Copyright (c) 2013-2014 winlin
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
24
#include <srs_app_forward.hpp>
25 26 27 28 29 30

#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
31 32
using namespace std;
33
#include <srs_app_source.hpp>
34
#include <srs_app_st_socket.hpp>
35
#include <srs_kernel_error.hpp>
36
#include <srs_kernel_log.hpp>
37 38
#include <srs_app_config.hpp>
#include <srs_app_pithy_print.hpp>
39
#include <srs_protocol_rtmp.hpp>
40
#include <srs_protocol_utility.hpp>
41
#include <srs_app_kbps.hpp>
42
#include <srs_protocol_msg_array.hpp>
43
#include <srs_app_utility.hpp>
44
#include <srs_protocol_amf0.hpp>
45
#include <srs_kernel_codec.hpp>
46
winlin authored
47 48 49
// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
50
SrsForwarder::SrsForwarder(SrsSource* _source)
51
{
52 53
    source = _source;
    
54
    _req = NULL;
55 56 57
    io = NULL;
    client = NULL;
    stfd = NULL;
58
    kbps = new SrsKbps();
59
    stream_id = 0;
60
61
    pthread = new SrsThread("forward", this, SRS_FORWARDER_SLEEP_US, true);
62 63
    queue = new SrsMessageQueue();
    jitter = new SrsRtmpJitter();
64 65
    
    sh_video = sh_audio = NULL;
66 67 68 69
}

SrsForwarder::~SrsForwarder()
{
70 71 72 73 74
    on_unpublish();
    
    srs_freep(pthread);
    srs_freep(queue);
    srs_freep(jitter);
75
    srs_freep(kbps);
76 77 78
    
    srs_freep(sh_video);
    srs_freep(sh_audio);
winlin authored
79 80
}
81 82 83 84 85 86 87 88 89 90 91 92 93 94
int SrsForwarder::initialize(SrsRequest* req, string ep_forward)
{
    int ret = ERROR_SUCCESS;
    
    // it's ok to use the request object,
    // SrsSource already copy it and never delete it.
    _req = req;
    
    // the ep(endpoint) to forward to
    _ep_forward = ep_forward;
    
    return ret;
}
winlin authored
95 96
void SrsForwarder::set_queue_size(double queue_size)
{
97
    queue->set_queue_size(queue_size);
98 99
}
100
int SrsForwarder::on_publish()
101
{
102 103
    int ret = ERROR_SUCCESS;
    
104
    SrsRequest* req = _req;
105
    
106 107 108
    // discovery the server port and tcUrl from req and ep_forward.
    std::string server, port, tc_url;
    discovery_ep(server, port, tc_url);
109 110
    
    // dead loop check
111 112
    std::string source_ep = "rtmp://";
    source_ep += req->host;
113 114
    source_ep += ":";
    source_ep += req->port;
115 116
    source_ep += "?vhost=";
    source_ep += req->vhost;
117
    
118
    std::string dest_ep = "rtmp://";
119
    if (_ep_forward == SRS_CONSTS_LOCALHOST) {
120 121
        dest_ep += req->host;
    } else {
122
        dest_ep += _ep_forward;
123
    }
124
    dest_ep += ":";
125
    dest_ep += port;
126
    dest_ep += "?vhost=";
127
    dest_ep += req->vhost;
128 129 130
    
    if (source_ep == dest_ep) {
        ret = ERROR_SYSTEM_FORWARD_LOOP;
131
        srs_warn("forward loop detected. src=%s, dest=%s, ret=%d", 
132 133 134
            source_ep.c_str(), dest_ep.c_str(), ret);
        return ret;
    }
135
    srs_trace("start forward %s to %s, tcUrl=%s, stream=%s", 
136
        source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), 
137
        req->stream.c_str());
138 139
    
    if ((ret = pthread->start()) != ERROR_SUCCESS) {
140 141
        srs_error("start srs thread failed. ret=%d", ret);
        return ret;
142
    }
winlin authored
143
    srs_trace("forward thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());
144 145
    
    return ret;
146 147 148 149
}

void SrsForwarder::on_unpublish()
{
150 151 152 153 154 155
    pthread->stop();
    
    close_underlayer_socket();
    
    srs_freep(client);
    srs_freep(io);
156
    kbps->set_io(NULL, NULL);
157 158
}
159
int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
160
{
161 162
    int ret = ERROR_SUCCESS;
    
163
    if ((ret = jitter->correct(metadata, 0, 0, SrsRtmpJitterAlgorithmFULL)) != ERROR_SUCCESS) {
164 165 166 167 168 169 170 171 172
        srs_freep(metadata);
        return ret;
    }
    
    if ((ret = queue->enqueue(metadata)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
173 174
}
175
int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
176
{
177 178
    int ret = ERROR_SUCCESS;
    
179
    if ((ret = jitter->correct(msg, 0, 0, SrsRtmpJitterAlgorithmFULL)) != ERROR_SUCCESS) {
180 181 182 183
        srs_freep(msg);
        return ret;
    }
    
184 185 186 187 188
    if (SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size)) {
        srs_freep(sh_audio);
        sh_audio = msg->copy();
    }
    
189 190 191 192 193
    if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
194 195
}
196
int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
197
{
198 199
    int ret = ERROR_SUCCESS;
    
200
    if ((ret = jitter->correct(msg, 0, 0, SrsRtmpJitterAlgorithmFULL)) != ERROR_SUCCESS) {
201 202 203 204
        srs_freep(msg);
        return ret;
    }
    
205 206 207 208 209
    if (SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size)) {
        srs_freep(sh_video);
        sh_video = msg->copy();
    }
    
210 211 212 213 214
    if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
215 216 217 218
}

int SrsForwarder::cycle()
{
219 220
    int ret = ERROR_SUCCESS;
    
221 222
    std::string ep_server, ep_port;
    if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
223 224 225
        return ret;
    }
    srs_assert(client);
226
winlin authored
227 228
    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
229 230 231 232 233
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
234 235
    if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
        srs_error("connect with server failed. ret=%d", ret);
236 237 238 239 240 241 242
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }
    
243
    if ((ret = client->publish(_req->stream, stream_id)) != ERROR_SUCCESS) {
244
        srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", 
245
            _req->stream.c_str(), stream_id, ret);
246 247 248 249 250 251 252 253 254 255 256 257 258
        return ret;
    }
    
    if ((ret = source->on_forwarder_start(this)) != ERROR_SUCCESS) {
        srs_error("callback the source to feed the sequence header failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = forward()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
259 260 261 262
}

void SrsForwarder::close_underlayer_socket()
{
263
    srs_close_stfd(stfd);
264 265
}
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
void SrsForwarder::discovery_ep(string& server, string& port, string& tc_url)
{
    SrsRequest* req = _req;
    
    server = _ep_forward;
    port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    
    // TODO: FIXME: parse complex params
    size_t pos = _ep_forward.find(":");
    if (pos != std::string::npos) {
        port = _ep_forward.substr(pos + 1);
        server = _ep_forward.substr(0, pos);
    }
    
    // generate tcUrl
    tc_url = srs_generate_tc_url(server, req->vhost, req->app, port, req->param);
}

int SrsForwarder::connect_server(string& ep_server, string& ep_port)
285
{
286 287 288 289 290
    int ret = ERROR_SUCCESS;
    
    // reopen
    close_underlayer_socket();
    
291 292 293 294 295 296 297 298 299
    // discovery the server port and tcUrl from req and ep_forward.
    std::string server, s_port, tc_url;
    discovery_ep(server, s_port, tc_url);
    int port = ::atoi(s_port.c_str());
    
    // output the connected server and port.
    ep_server = server;
    ep_port = s_port;
    
300
    // open socket.
301
    int64_t timeout = SRS_FORWARDER_SLEEP_US;
302
    if ((ret = srs_socket_connect(ep_server, port, timeout, &stfd)) != ERROR_SUCCESS) {
303
        srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
304
            _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
305 306
        return ret;
    }
307
    
308 309 310
    srs_freep(client);
    srs_freep(io);
    
311
    srs_assert(stfd);
312
    io = new SrsStSocket(stfd);
313 314
    client = new SrsRtmpClient(io);
    
315
    kbps->set_io(io, io);
316
    
317
    srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d",
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
        _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);
    
    return ret;
}

// TODO: FIXME: refine the connect_app.
int SrsForwarder::connect_app(string ep_server, string ep_port)
{
    int ret = ERROR_SUCCESS;
    
    SrsRequest* req = _req;
    
    // args of request takes the srs info.
    if (req->args == NULL) {
        req->args = SrsAmf0Any::object();
    }
    
    // notify server the edge identity,
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/147
    SrsAmf0Object* data = req->args;
    data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
    data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
    data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
    data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
    data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
    data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
    data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
    data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
    data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
347 348
    data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
    data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
    // for edge to directly get the id of client.
    data->set("srs_pid", SrsAmf0Any::number(getpid()));
    data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
    
    // local ip of edge
    std::vector<std::string> ips = srs_get_local_ipv4_ips();
    assert(_srs_config->get_stats_network() < (int)ips.size());
    std::string local_ip = ips[_srs_config->get_stats_network()];
    data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
    
    // generate the tcUrl
    std::string param = "";
    std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
    
    // upnode server identity will show in the connect_app of client.
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/160
    // the debug_srs_upnode is config in vhost and default to true.
    bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
    if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
368 369
        srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", 
            tc_url.c_str(), debug_srs_upnode, ret);
370 371
        return ret;
    }
372
    
373
    return ret;
374 375
}
376
#define SYS_MAX_FORWARD_SEND_MSGS 128
377 378
int SrsForwarder::forward()
{
379 380
    int ret = ERROR_SUCCESS;
    
winlin authored
381
    client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
382
    
winlin authored
383
    SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_FORWARDER);
384
385
    SrsMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);
386
    
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
    // update sequence header
    // TODO: FIXME: maybe need to zero the sequence header timestamp.
    if (sh_video) {
        if ((ret = client->send_and_free_message(sh_video->copy(), stream_id)) != ERROR_SUCCESS) {
            srs_error("forwarder send sh_video to server failed. ret=%d", ret);
            return ret;
        }
    }
    if (sh_audio) {
        if ((ret = client->send_and_free_message(sh_audio->copy(), stream_id)) != ERROR_SUCCESS) {
            srs_error("forwarder send sh_audio to server failed. ret=%d", ret);
            return ret;
        }
    }
    
402
    while (pthread->can_loop()) {
403 404
        pithy_print.elapse();
405 406
        // read from client.
        if (true) {
407 408
            SrsMessage* msg = NULL;
            ret = client->recv_message(&msg);
409 410 411 412 413 414
            
            srs_verbose("play loop recv message. ret=%d", ret);
            if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
                srs_error("recv server control message failed. ret=%d", ret);
                return ret;
            }
415 416
            
            srs_freep(msg);
417 418 419
        }
        
        // forward all messages.
420
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
421
        int count = 0;
422
        if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
423 424 425 426
            srs_error("get message to forward failed. ret=%d", ret);
            return ret;
        }
        
427 428
        // pithy print
        if (pithy_print.can_print()) {
429
            kbps->sample();
430
            srs_trace("-> "SRS_CONSTS_LOG_FOWARDER
431 432
                " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", 
                pithy_print.age(), count,
433 434
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
435 436
        }
        
437 438 439 440 441 442
        // ignore when no messages.
        if (count <= 0) {
            srs_verbose("no packets to forward.");
            continue;
        }
    
443
        // sendout messages, all messages are freed by send_and_free_messages().
444 445 446
        if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) {
            srs_error("forwarder messages to server failed. ret=%d", ret);
            return ret;
447 448 449 450
        }
    }
    
    return ret;
451 452
}
453