Blame view

trunk/src/app/srs_app_edge.cpp 25.9 KB
winlin authored
1 2 3
/*
The MIT License (MIT)
4
Copyright (c) 2013-2015 SRS(simple-rtmp-server)
winlin authored
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#include <srs_app_edge.hpp>
26 27 28 29 30
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
31 32
using namespace std;
33
#include <srs_kernel_error.hpp>
34 35
#include <srs_rtmp_sdk.hpp>
#include <srs_rtmp_io.hpp>
36
#include <srs_app_config.hpp>
37
#include <srs_rtmp_utility.hpp>
38
#include <srs_app_st_socket.hpp>
39 40 41
#include <srs_app_source.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_core_autofree.hpp>
42
#include <srs_app_kbps.hpp>
43
#include <srs_rtmp_msg_array.hpp>
44
#include <srs_app_utility.hpp>
45
#include <srs_rtmp_amf0.hpp>
46
#include <srs_kernel_utility.hpp>
winlin authored
47 48

// when error, edge ingester sleep for a while and retry.
49 50 51
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)

// when edge timeout, retry next.
52
#define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(3*1000*1000LL)
winlin authored
53
54 55 56 57 58 59
// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_FORWARDER_SLEEP_US (int64_t)(1*1000*1000LL)

// when edge timeout, retry next.
#define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(3*1000*1000LL)
60 61 62
// when edge error, wait for quit
#define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL)
winlin authored
63 64
SrsEdgeIngester::SrsEdgeIngester()
{
65
    io = NULL;
66
    kbps = new SrsKbps();
67
    client = NULL;
winlin authored
68 69
    _edge = NULL;
    _req = NULL;
70 71 72
    origin_index = 0;
    stream_id = 0;
    stfd = NULL;
73
    pthread = new SrsThread("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US, true);
winlin authored
74 75 76 77
}

SrsEdgeIngester::~SrsEdgeIngester()
{
78 79 80
    stop();
    
    srs_freep(pthread);
81
    srs_freep(kbps);
winlin authored
82 83
}
winlin authored
84
int SrsEdgeIngester::initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req)
winlin authored
85 86 87
{
    int ret = ERROR_SUCCESS;
    
88
    _source = source;
winlin authored
89 90 91 92 93 94 95 96
    _edge = edge;
    _req = req;
    
    return ret;
}

int SrsEdgeIngester::start()
{
97 98 99 100 101 102 103 104 105 106 107
    return pthread->start();
}

void SrsEdgeIngester::stop()
{
    pthread->stop();
    
    close_underlayer_socket();
    
    srs_freep(client);
    srs_freep(io);
108
    kbps->set_io(NULL, NULL);
109 110 111
    
    // notice to unpublish.
    _source->on_unpublish();
112 113 114 115
}

int SrsEdgeIngester::cycle()
{
winlin authored
116
    int ret = ERROR_SUCCESS;
117
    
118 119
    std::string ep_server, ep_port;
    if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
120 121 122 123
        return ret;
    }
    srs_assert(client);
winlin authored
124 125
    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
126 127 128 129 130 131 132

    SrsRequest* req = _req;
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
133
    if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
134 135 136 137 138 139 140 141 142 143 144 145 146 147
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }
    
    if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d", 
            req->stream.c_str(), stream_id, ret);
        return ret;
    }
    
    if ((ret = _source->on_publish()) != ERROR_SUCCESS) {
winlin authored
148
        srs_error("edge pull stream then publish to edge failed. ret=%d", ret);
149 150 151 152 153 154 155
        return ret;
    }
    
    if ((ret = _edge->on_ingest_play()) != ERROR_SUCCESS) {
        return ret;
    }
    
winlin authored
156 157 158 159
    ret = ingest();
    if (srs_is_client_gracefully_close(ret)) {
        srs_warn("origin disconnected, retry. ret=%d", ret);
        ret = ERROR_SUCCESS;
160 161
    }
    
winlin authored
162 163 164
    return ret;
}
165
int SrsEdgeIngester::ingest()
winlin authored
166 167
{
    int ret = ERROR_SUCCESS;
168
    
169
    client->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US);
170
    
171 172
    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);
173 174

    while (pthread->can_loop()) {
175
        pprint->elapse();
176 177
        
        // pithy print
178
        if (pprint->can_print()) {
179
            kbps->sample();
180
            srs_trace("<- "SRS_CONSTS_LOG_EDGE_PLAY
181
                " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", 
182
                pprint->age(),
183 184
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
185 186 187
        }

        // read from client.
188
        SrsCommonMessage* msg = NULL;
189
        if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
winlin authored
190
            if (!srs_is_client_gracefully_close(ret)) {
winlin authored
191
                srs_error("pull origin server message failed. ret=%d", ret);
winlin authored
192
            }
193 194 195 196 197
            return ret;
        }
        srs_verbose("edge loop recv message. ret=%d", ret);
        
        srs_assert(msg);
198
        SrsAutoFree(SrsCommonMessage, msg);
199 200 201 202 203 204 205 206 207
        
        if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) {
            return ret;
        }
    }
    
    return ret;
}
208
// TODO: FIXME: refine the connect_app.
209
int SrsEdgeIngester::connect_app(string ep_server, string ep_port)
210 211 212 213 214 215 216 217 218 219 220
{
    int ret = ERROR_SUCCESS;
    
    SrsRequest* req = _req;
    
    // args of request takes the srs info.
    if (req->args == NULL) {
        req->args = SrsAmf0Any::object();
    }
    
    // notify server the edge identity,
221
    // @see https://github.com/simple-rtmp-server/srs/issues/147
222 223
    SrsAmf0Object* data = req->args;
    data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
224
    data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
225 226 227 228 229 230 231
    data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
    data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
    data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
    data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
    data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
    data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
    data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
232 233
    data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
    data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
234 235 236 237 238 239 240 241 242 243
    // for edge to directly get the id of client.
    data->set("srs_pid", SrsAmf0Any::number(getpid()));
    data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
    
    // local ip of edge
    std::vector<std::string> ips = srs_get_local_ipv4_ips();
    assert(_srs_config->get_stats_network() < (int)ips.size());
    std::string local_ip = ips[_srs_config->get_stats_network()];
    data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
    
244
    // support vhost tranform for edge,
245
    // @see https://github.com/simple-rtmp-server/srs/issues/372
246 247
    std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
    vhost = srs_string_replace(vhost, "[vhost]", req->vhost);
248 249
    // generate the tcUrl
    std::string param = "";
250 251 252 253 254 255
    std::string tc_url = srs_generate_tc_url(ep_server, vhost, req->app, ep_port, param);
    srs_trace("edge ingest from %s:%s at %s", ep_server.c_str(), ep_port.c_str(), tc_url.c_str());
    
    // replace the tcUrl in request,
    // which will replace the tc_url in client.connect_app().
    req->tcUrl = tc_url;
256
    
257
    // upnode server identity will show in the connect_app of client.
258
    // @see https://github.com/simple-rtmp-server/srs/issues/160
259 260 261
    // the debug_srs_upnode is config in vhost and default to true.
    bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
    if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
262 263
        srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", 
            tc_url.c_str(), debug_srs_upnode, ret);
264 265 266 267 268 269
        return ret;
    }
    
    return ret;
}
270
int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
{
    int ret = ERROR_SUCCESS;
    
    SrsSource* source = _source;
        
    // process audio packet
    if (msg->header.is_audio()) {
        if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
            return ret;
        }
    }
    
    // process video packet
    if (msg->header.is_video()) {
        if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
            return ret;
        }
    }
291 292 293 294 295 296 297 298 299
    
    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }
300 301 302

    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
303
        SrsPacket* pkt = NULL;
304
        if ((ret = client->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
305 306 307
            srs_error("decode onMetaData message failed. ret=%d", ret);
            return ret;
        }
308
        SrsAutoFree(SrsPacket, pkt);
309 310 311 312 313 314 315
    
        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
                srs_error("source process onMetaData message failed. ret=%d", ret);
                return ret;
            }
316
            srs_info("process onMetaData message success.");
317 318 319
            return ret;
        }
        
320
        srs_info("ignore AMF0/AMF3 data message.");
321 322 323 324 325 326 327 328 329 330 331
        return ret;
    }
    
    return ret;
}

void SrsEdgeIngester::close_underlayer_socket()
{
    srs_close_stfd(stfd);
}
332
int SrsEdgeIngester::connect_server(string& ep_server, string& ep_port)
333 334 335 336 337 338 339
{
    int ret = ERROR_SUCCESS;
    
    // reopen
    close_underlayer_socket();
    
    SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);
340
    
341
    // @see https://github.com/simple-rtmp-server/srs/issues/79
342 343 344 345 346 347 348
    // when origin is error, for instance, server is shutdown,
    // then user remove the vhost then reload, the conf is empty.
    if (!conf) {
        ret = ERROR_EDGE_VHOST_REMOVED;
        srs_warn("vhost %s removed. ret=%d", _req->vhost.c_str(), ret);
        return ret;
    }
349 350 351 352 353
    
    // select the origin.
    std::string server = conf->args.at(origin_index % conf->args.size());
    origin_index = (origin_index + 1) % conf->args.size();
    
354 355
    std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);
356 357 358 359 360 361 362
    size_t pos = server.find(":");
    if (pos != std::string::npos) {
        s_port = server.substr(pos + 1);
        server = server.substr(0, pos);
        port = ::atoi(s_port.c_str());
    }
    
363 364 365 366
    // output the connected server and port.
    ep_server = server;
    ep_port = s_port;
    
367 368 369
    // open socket.
    int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US;
    if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) {
winlin authored
370
        srs_warn("edge pull failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
371
            _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
372 373 374 375 376 377
        return ret;
    }
    
    srs_freep(client);
    srs_freep(io);
    
378
    srs_assert(stfd);
379
    io = new SrsStSocket(stfd);
380 381
    client = new SrsRtmpClient(io);
    
382
    kbps->set_io(io, io);
winlin authored
383
    
384
    srs_trace("edge pull connected, can_publish=%d, url=%s/%s, server=%s:%d",
winlin authored
385
        _source->can_publish(), _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port);
386
    
winlin authored
387 388
    return ret;
}
389
winlin authored
390 391 392
SrsEdgeForwarder::SrsEdgeForwarder()
{
    io = NULL;
393
    kbps = new SrsKbps();
winlin authored
394 395 396 397 398 399
    client = NULL;
    _edge = NULL;
    _req = NULL;
    origin_index = 0;
    stream_id = 0;
    stfd = NULL;
400
    pthread = new SrsThread("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US, true);
401 402
    queue = new SrsMessageQueue();
    send_error_code = ERROR_SUCCESS;
winlin authored
403 404 405 406 407
}

SrsEdgeForwarder::~SrsEdgeForwarder()
{
    stop();
408 409 410
    
    srs_freep(pthread);
    srs_freep(queue);
411
    srs_freep(kbps);
winlin authored
412 413
}
414 415 416 417 418
void SrsEdgeForwarder::set_queue_size(double queue_size)
{
    return queue->set_queue_size(queue_size);
}
winlin authored
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req)
{
    int ret = ERROR_SUCCESS;
    
    _source = source;
    _edge = edge;
    _req = req;
    
    return ret;
}

int SrsEdgeForwarder::start()
{
    int ret = ERROR_SUCCESS;
    
434 435
    send_error_code = ERROR_SUCCESS;
    
436 437
    std::string ep_server, ep_port;
    if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
winlin authored
438 439 440 441
        return ret;
    }
    srs_assert(client);
winlin authored
442 443
    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
winlin authored
444 445 446 447 448 449 450

    SrsRequest* req = _req;
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
451 452
    if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
        srs_error("connect with server failed. ret=%d", ret);
winlin authored
453 454 455 456 457 458 459
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }
    
460
    if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
461
        srs_error("publish failed, stream=%s, stream_id=%d. ret=%d", 
winlin authored
462 463 464 465
            req->stream.c_str(), stream_id, ret);
        return ret;
    }
    
466
    return pthread->start();
winlin authored
467 468
}
469 470
void SrsEdgeForwarder::stop()
{
471 472
    pthread->stop();
    
473 474 475 476
    close_underlayer_socket();
    
    srs_freep(client);
    srs_freep(io);
477
    kbps->set_io(NULL, NULL);
478 479
}
480
#define SYS_MAX_EDGE_SEND_MSGS 128
481 482 483 484
int SrsEdgeForwarder::cycle()
{
    int ret = ERROR_SUCCESS;
    
winlin authored
485
    client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
486
    
487 488
    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);
489
    
490
    SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
491 492

    while (pthread->can_loop()) {
493 494 495 496 497 498 499
        if (send_error_code != ERROR_SUCCESS) {
            st_usleep(SRS_EDGE_FORWARDER_ERROR_US);
            continue;
        }

        // read from client.
        if (true) {
500
            SrsCommonMessage* msg = NULL;
501
            ret = client->recv_message(&msg);
502 503 504
            
            srs_verbose("edge loop recv message. ret=%d", ret);
            if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
winlin authored
505
                srs_error("edge push get server control message failed. ret=%d", ret);
506 507 508 509 510 511 512 513
                send_error_code = ret;
                continue;
            }
            
            srs_freep(msg);
        }
        
        // forward all messages.
514
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
515
        int count = 0;
516
        if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
winlin authored
517
            srs_error("get message to push to origin failed. ret=%d", ret);
518 519 520
            return ret;
        }
        
521
        pprint->elapse();
522 523
        
        // pithy print
524
        if (pprint->can_print()) {
525
            kbps->sample();
526
            srs_trace("-> "SRS_CONSTS_LOG_EDGE_PUBLISH
527
                " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", 
528
                pprint->age(), count,
529 530
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
531
        }
532 533 534
        
        // ignore when no messages.
        if (count <= 0) {
winlin authored
535
            srs_verbose("no packets to push.");
536 537
            continue;
        }
538
    
539
        // sendout messages, all messages are freed by send_and_free_messages().
540 541 542
        if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) {
            srs_error("edge publish push message to server failed. ret=%d", ret);
            return ret;
543
        }
544 545 546 547 548
    }
    
    return ret;
}
549
int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
winlin authored
550 551 552
{
    int ret = ERROR_SUCCESS;
    
553 554 555 556 557
    if ((ret = send_error_code) != ERROR_SUCCESS) {
        srs_error("publish edge proxy thread send error, ret=%d", ret);
        return ret;
    }
    
558 559 560
    // the msg is auto free by source,
    // so we just ignore, or copy then send it.
    if (msg->size <= 0
561 562 563 564
        || msg->header.is_set_chunk_size()
        || msg->header.is_window_ackledgement_size()
        || msg->header.is_ackledgement()
    ) {
565 566 567
        return ret;
    }
    
568 569
    SrsSharedPtrMessage copy;
    if ((ret = copy.create(msg)) != ERROR_SUCCESS) {
570
        srs_error("initialize the msg failed. ret=%d", ret);
571 572
        return ret;
    }
573
    srs_verbose("initialize shared ptr msg success.");
574
    
575
    copy.stream_id = stream_id;
576
    if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) {
577
        srs_error("enqueue edge publish msg failed. ret=%d", ret);
winlin authored
578 579 580 581 582 583 584 585 586 587
    }
    
    return ret;
}

void SrsEdgeForwarder::close_underlayer_socket()
{
    srs_close_stfd(stfd);
}
588
int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port)
winlin authored
589 590 591 592 593 594 595 596 597 598 599 600 601
{
    int ret = ERROR_SUCCESS;
    
    // reopen
    close_underlayer_socket();
    
    SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);
    srs_assert(conf);
    
    // select the origin.
    std::string server = conf->args.at(origin_index % conf->args.size());
    origin_index = (origin_index + 1) % conf->args.size();
    
602 603
    std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);
winlin authored
604 605 606 607 608 609 610
    size_t pos = server.find(":");
    if (pos != std::string::npos) {
        s_port = server.substr(pos + 1);
        server = server.substr(0, pos);
        port = ::atoi(s_port.c_str());
    }
    
611 612 613 614
    // output the connected server and port.
    ep_server = server;
    ep_port = s_port;
    
winlin authored
615
    // open socket.
616 617
    int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US;
    if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) {
winlin authored
618
        srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
619
            _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
winlin authored
620 621 622
        return ret;
    }
    
623 624 625 626
    srs_freep(client);
    srs_freep(io);
    
    srs_assert(stfd);
627
    io = new SrsStSocket(stfd);
628 629 630 631
    client = new SrsRtmpClient(io);
    
    kbps->set_io(io, io);
    
632
    // open socket.
633
    srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d",
634
        _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);
winlin authored
635 636 637 638
    
    return ret;
}
639
// TODO: FIXME: refine the connect_app.
640 641 642 643 644 645 646 647 648 649 650 651
int SrsEdgeForwarder::connect_app(string ep_server, string ep_port)
{
    int ret = ERROR_SUCCESS;
    
    SrsRequest* req = _req;
    
    // args of request takes the srs info.
    if (req->args == NULL) {
        req->args = SrsAmf0Any::object();
    }
    
    // notify server the edge identity,
652
    // @see https://github.com/simple-rtmp-server/srs/issues/147
653 654
    SrsAmf0Object* data = req->args;
    data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
655
    data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
656 657 658 659 660 661 662
    data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
    data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
    data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
    data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
    data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
    data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
    data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
663 664
    data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
    data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
665 666 667 668 669 670 671 672 673 674
    // for edge to directly get the id of client.
    data->set("srs_pid", SrsAmf0Any::number(getpid()));
    data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
    
    // local ip of edge
    std::vector<std::string> ips = srs_get_local_ipv4_ips();
    assert(_srs_config->get_stats_network() < (int)ips.size());
    std::string local_ip = ips[_srs_config->get_stats_network()];
    data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
    
675
    // support vhost tranform for edge,
676
    // @see https://github.com/simple-rtmp-server/srs/issues/372
677 678
    std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
    vhost = srs_string_replace(vhost, "[vhost]", req->vhost);
679 680
    // generate the tcUrl
    std::string param = "";
681 682 683 684 685 686
    std::string tc_url = srs_generate_tc_url(ep_server, vhost, req->app, ep_port, param);
    srs_trace("edge forward to %s:%s at %s", ep_server.c_str(), ep_port.c_str(), tc_url.c_str());
    
    // replace the tcUrl in request,
    // which will replace the tc_url in client.connect_app().
    req->tcUrl = tc_url;
687 688
    
    // upnode server identity will show in the connect_app of client.
689
    // @see https://github.com/simple-rtmp-server/srs/issues/160
690 691 692
    // the debug_srs_upnode is config in vhost and default to true.
    bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
    if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
693 694
        srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", 
            tc_url.c_str(), debug_srs_upnode, ret);
695 696 697 698 699 700
        return ret;
    }
    
    return ret;
}
winlin authored
701
SrsPlayEdge::SrsPlayEdge()
702 703
{
    state = SrsEdgeStateInit;
704
    user_state = SrsEdgeUserStateInit;
winlin authored
705
    ingester = new SrsEdgeIngester();
706 707
}
winlin authored
708
SrsPlayEdge::~SrsPlayEdge()
709
{
winlin authored
710
    srs_freep(ingester);
711 712
}
winlin authored
713
int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req)
714 715 716
{
    int ret = ERROR_SUCCESS;
    
717
    if ((ret = ingester->initialize(source, this, req)) != ERROR_SUCCESS) {
winlin authored
718 719
        return ret;
    }
720 721 722 723
    
    return ret;
}
winlin authored
724
int SrsPlayEdge::on_client_play()
725 726
{
    int ret = ERROR_SUCCESS;
winlin authored
727 728
    
    // error state.
729
    if (user_state != SrsEdgeUserStateInit) {
winlin authored
730
        ret = ERROR_RTMP_EDGE_PLAY_STATE;
winlin authored
731
        srs_error("invalid state for client to pull stream on edge. "
732
            "state=%d, user_state=%d, ret=%d", state, user_state, ret);
winlin authored
733 734 735 736 737 738 739 740 741
        return ret;
    }
    
    // start ingest when init state.
    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        return ingester->start();
    }
742 743 744
    return ret;
}
winlin authored
745
void SrsPlayEdge::on_all_client_stop()
746
{
747 748 749
    // when all client disconnected,
    // and edge is ingesting origin stream, abort it.
    if (state == SrsEdgeStatePlay || state == SrsEdgeStateIngestConnected) {
750 751
        ingester->stop();
    
752 753 754 755 756 757
        SrsEdgeState pstate = state;
        state = SrsEdgeStateInit;
        srs_trace("edge change from %d to state %d (init).", pstate, state);
        
        return;
    }
758 759
}
winlin authored
760
int SrsPlayEdge::on_ingest_play()
761 762 763
{
    int ret = ERROR_SUCCESS;
    
764 765 766 767 768 769 770
    // when already connected(for instance, reconnect for error), ignore.
    if (state == SrsEdgeStateIngestConnected) {
        return ret;
    }
    
    srs_assert(state == SrsEdgeStatePlay);
    
771 772
    SrsEdgeState pstate = state;
    state = SrsEdgeStateIngestConnected;
winlin authored
773
    srs_trace("edge change from %d to state %d (pull).", pstate, state);
774 775 776
    
    return ret;
}
winlin authored
777 778 779 780 781 782 783 784 785 786 787 788 789

SrsPublishEdge::SrsPublishEdge()
{
    state = SrsEdgeStateInit;
    user_state = SrsEdgeUserStateInit;
    forwarder = new SrsEdgeForwarder();
}

SrsPublishEdge::~SrsPublishEdge()
{
    srs_freep(forwarder);
}
790 791 792 793 794
void SrsPublishEdge::set_queue_size(double queue_size)
{
    return forwarder->set_queue_size(queue_size);
}
winlin authored
795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812
int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
{
    int ret = ERROR_SUCCESS;

    if ((ret = forwarder->initialize(source, this, req)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}

int SrsPublishEdge::on_client_publish()
{
    int ret = ERROR_SUCCESS;
    
    // error state.
    if (user_state != SrsEdgeUserStateInit) {
        ret = ERROR_RTMP_EDGE_PUBLISH_STATE;
813
        srs_error("invalid state for client to publish stream on edge. "
winlin authored
814 815 816 817
            "state=%d, user_state=%d, ret=%d", state, user_state, ret);
        return ret;
    }
    
818 819 820 821 822 823
    // error when not init state.
    if (state != SrsEdgeStateInit) {
        ret = ERROR_RTMP_EDGE_PUBLISH_STATE;
        srs_error("invalid state for client to publish stream on edge. "
            "state=%d, user_state=%d, ret=%d", state, user_state, ret);
        return ret;
winlin authored
824
    }
825
    
826
    // @see https://github.com/simple-rtmp-server/srs/issues/180
827 828 829 830 831 832
    // to avoid multiple publish the same stream on the same edge,
    // directly enter the publish stage.
    if (true) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStatePublish;
        srs_trace("edge change from %d to state %d (push).", pstate, state);
833 834
    }
    
835 836 837
    // start to forward stream to origin.
    ret = forwarder->start();
    
838
    // @see https://github.com/simple-rtmp-server/srs/issues/180
839 840 841 842 843 844
    // when failed, revert to init
    if (ret != ERROR_SUCCESS) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStateInit;
        srs_trace("edge revert from %d to state %d (push). ret=%d", pstate, state, ret);
    }
845
    
846
    return ret;
winlin authored
847 848
}
849
int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg)
winlin authored
850
{
851 852 853 854 855 856 857 858
    return forwarder->proxy(msg);
}

void SrsPublishEdge::on_proxy_unpublish()
{
    if (state == SrsEdgeStatePublish) {
        forwarder->stop();
    }
859 860 861 862
    
    SrsEdgeState pstate = state;
    state = SrsEdgeStateInit;
    srs_trace("edge change from %d to state %d (init).", pstate, state);
winlin authored
863
}
864