Blame view

trunk/src/app/srs_app_edge.cpp 25.0 KB
winlin authored
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
/*
The MIT License (MIT)

Copyright (c) 2013-2014 winlin

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#include <srs_app_edge.hpp>
26 27 28 29 30
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
31 32
using namespace std;
33
#include <srs_kernel_error.hpp>
34
#include <srs_protocol_rtmp.hpp>
35 36 37
#include <srs_protocol_io.hpp>
#include <srs_app_config.hpp>
#include <srs_protocol_utility.hpp>
38
#include <srs_app_st_socket.hpp>
39 40 41
#include <srs_app_source.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_core_autofree.hpp>
42
#include <srs_app_kbps.hpp>
43
#include <srs_protocol_msg_array.hpp>
44
#include <srs_app_utility.hpp>
45
#include <srs_protocol_amf0.hpp>
winlin authored
46 47

// when error, the edge ingester sleeps for a while then retries.
// the value is in microseconds, and is fully parenthesized so the macro
// is safe to use in any expression context.
#define SRS_EDGE_INGESTER_SLEEP_US ((int64_t)(1*1000*1000LL))

// the timeout in microseconds of the edge ingester,
// used to connect to the origin and to receive message from it.
#define SRS_EDGE_INGESTER_TIMEOUT_US ((int64_t)(3*1000*1000LL))

// when error, the edge forwarder sleeps for a while then retries.
#define SRS_EDGE_FORWARDER_SLEEP_US ((int64_t)(1*1000*1000LL))

// the timeout in microseconds of the edge forwarder,
// used to connect to the origin.
#define SRS_EDGE_FORWARDER_TIMEOUT_US ((int64_t)(3*1000*1000LL))

// when the edge forwarder got a send error, the interval in microseconds
// to sleep in each loop while waiting for quit.
#define SRS_EDGE_FORWARDER_ERROR_US ((int64_t)(50*1000LL))
winlin authored
62 63
// create the edge ingester without any connection to origin.
// the kbps sampler and the ingest thread are allocated here,
// while the thread is only started by start().
SrsEdgeIngester::SrsEdgeIngester()
{
    io = NULL;
    kbps = new SrsKbps();
    client = NULL;

    // set by initialize(). all of them are reset here, including _source,
    // so that no member holds an indeterminate pointer before initialize().
    _source = NULL;
    _edge = NULL;
    _req = NULL;

    origin_index = 0;
    stream_id = 0;
    stfd = NULL;

    pthread = new SrsThread("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US, true);
}

// stop the ingester, then free the objects allocated by the constructor.
SrsEdgeIngester::~SrsEdgeIngester()
{
    // stop() stops the thread and releases the connection objects.
    stop();

    // the thread and the kbps sampler are owned by this object.
    srs_freep(pthread);
    srs_freep(kbps);
}
winlin authored
83
// remember the source, the play edge and the request used by the ingester.
// the pointers are stored as-is, nothing is copied.
// @return always ERROR_SUCCESS.
int SrsEdgeIngester::initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req)
{
    _req = req;
    _edge = edge;
    _source = source;

    return ERROR_SUCCESS;
}

int SrsEdgeIngester::start()
{
96 97 98 99 100 101 102 103 104 105 106
    return pthread->start();
}

void SrsEdgeIngester::stop()
{
    pthread->stop();
    
    close_underlayer_socket();
    
    srs_freep(client);
    srs_freep(io);
107
    kbps->set_io(NULL, NULL);
108 109 110
    
    // notice to unpublish.
    _source->on_unpublish();
111 112 113 114
}

int SrsEdgeIngester::cycle()
{
winlin authored
115
    int ret = ERROR_SUCCESS;
116
    
117 118
    std::string ep_server, ep_port;
    if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
119 120 121 122
        return ret;
    }
    srs_assert(client);
winlin authored
123 124
    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
125 126 127 128 129 130 131

    SrsRequest* req = _req;
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
132
    if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
133 134 135 136 137 138 139 140 141 142 143 144 145 146
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }
    
    if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d", 
            req->stream.c_str(), stream_id, ret);
        return ret;
    }
    
    if ((ret = _source->on_publish()) != ERROR_SUCCESS) {
winlin authored
147
        srs_error("edge pull stream then publish to edge failed. ret=%d", ret);
148 149 150 151 152 153 154
        return ret;
    }
    
    if ((ret = _edge->on_ingest_play()) != ERROR_SUCCESS) {
        return ret;
    }
    
winlin authored
155 156 157 158
    ret = ingest();
    if (srs_is_client_gracefully_close(ret)) {
        srs_warn("origin disconnected, retry. ret=%d", ret);
        ret = ERROR_SUCCESS;
159 160
    }
    
winlin authored
161 162 163
    return ret;
}
164
int SrsEdgeIngester::ingest()
winlin authored
165 166
{
    int ret = ERROR_SUCCESS;
167
    
168
    client->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US);
169
    
winlin authored
170
    SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_EDGE);
171 172 173 174 175 176

    while (pthread->can_loop()) {
        pithy_print.elapse();
        
        // pithy print
        if (pithy_print.can_print()) {
177
            kbps->sample();
178
            srs_trace("<- "SRS_CONSTS_LOG_EDGE_PLAY
179 180
                " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", 
                pithy_print.age(),
181 182
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
183 184 185
        }

        // read from client.
186 187
        SrsMessage* msg = NULL;
        if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
winlin authored
188
            if (!srs_is_client_gracefully_close(ret)) {
winlin authored
189
                srs_error("pull origin server message failed. ret=%d", ret);
winlin authored
190
            }
191 192 193 194 195
            return ret;
        }
        srs_verbose("edge loop recv message. ret=%d", ret);
        
        srs_assert(msg);
196
        SrsAutoFree(SrsMessage, msg);
197 198 199 200 201 202 203 204 205
        
        if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) {
            return ret;
        }
    }
    
    return ret;
}
206
// TODO: FIXME: refine the connect_app.
207
int SrsEdgeIngester::connect_app(string ep_server, string ep_port)
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
{
    int ret = ERROR_SUCCESS;
    
    SrsRequest* req = _req;
    
    // args of request takes the srs info.
    if (req->args == NULL) {
        req->args = SrsAmf0Any::object();
    }
    
    // notify server the edge identity,
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/147
    SrsAmf0Object* data = req->args;
    data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
    data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
    data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
    data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
    data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
    data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
    data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
    data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
    data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
230 231
    data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
    data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
232 233 234 235 236 237 238 239 240 241
    // for edge to directly get the id of client.
    data->set("srs_pid", SrsAmf0Any::number(getpid()));
    data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
    
    // local ip of edge
    std::vector<std::string> ips = srs_get_local_ipv4_ips();
    assert(_srs_config->get_stats_network() < (int)ips.size());
    std::string local_ip = ips[_srs_config->get_stats_network()];
    data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
    
242 243 244 245
    // generate the tcUrl
    std::string param = "";
    std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
    
246
    // upnode server identity will show in the connect_app of client.
247 248 249 250
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/160
    // the debug_srs_upnode is config in vhost and default to true.
    bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
    if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
251 252
        srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", 
            tc_url.c_str(), debug_srs_upnode, ret);
253 254 255 256 257 258
        return ret;
    }
    
    return ret;
}
259
// dispatch a message pulled from origin to the source of edge.
// audio, video, aggregate and onMetaData messages are delivered to the
// source; other amf0/amf3 data and any other message are ignored.
// @param msg the message to process, not freed here.
// @return ERROR_SUCCESS or the error of the source/decoder.
int SrsEdgeIngester::process_publish_message(SrsMessage* msg)
{
    int ret = ERROR_SUCCESS;

    // process audio packet
    if (msg->header.is_audio()) {
        ret = _source->on_audio(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
            return ret;
        }
    }

    // process video packet
    if (msg->header.is_video()) {
        ret = _source->on_video(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
            return ret;
        }
    }

    // process aggregate packet
    if (msg->header.is_aggregate()) {
        ret = _source->on_aggregate(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
        }
        return ret;
    }

    // only the amf0/amf3 data may carry the onMetaData.
    bool is_data = msg->header.is_amf0_data() || msg->header.is_amf3_data();
    if (!is_data) {
        return ret;
    }

    // process onMetaData
    SrsPacket* pkt = NULL;
    ret = client->decode_message(msg, &pkt);
    if (ret != ERROR_SUCCESS) {
        srs_error("decode onMetaData message failed. ret=%d", ret);
        return ret;
    }
    // the decoded packet is freed when leaving this function.
    SrsAutoFree(SrsPacket, pkt);

    SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
    if (!metadata) {
        srs_info("ignore AMF0/AMF3 data message.");
        return ret;
    }

    ret = _source->on_meta_data(msg, metadata);
    if (ret != ERROR_SUCCESS) {
        srs_error("source process onMetaData message failed. ret=%d", ret);
        return ret;
    }

    srs_info("process onMetaData message success.");
    return ret;
}

// close the socket to origin, by srs_close_stfd on the stfd.
void SrsEdgeIngester::close_underlayer_socket()
{
    srs_close_stfd(stfd);
}
321
// connect to the next origin server of the vhost, the origins are selected
// in turn by origin_index, then create the io and rtmp client over the socket.
// @param ep_server output the host of the selected origin.
// @param ep_port output the port (string) of the selected origin.
// @return ERROR_SUCCESS when connected; ERROR_EDGE_VHOST_REMOVED when the
//      vhost has no origin to connect to; otherwise the error of connect.
// @remark the ep_server and ep_port are not modified when no origin selected.
int SrsEdgeIngester::connect_server(string& ep_server, string& ep_port)
{
    int ret = ERROR_SUCCESS;

    // reopen
    close_underlayer_socket();

    SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);

    // @see https://github.com/winlinvip/simple-rtmp-server/issues/79
    // when origin is error, for instance, server is shutdown,
    // then user remove the vhost then reload, the conf is empty.
    // an origin directive without any server is handled the same way,
    // for the modulo of the selection below must not divide by zero.
    if (!conf || conf->args.empty()) {
        ret = ERROR_EDGE_VHOST_REMOVED;
        srs_warn("vhost %s removed. ret=%d", _req->vhost.c_str(), ret);
        return ret;
    }

    // select the origin.
    std::string server = conf->args.at(origin_index % conf->args.size());
    origin_index = (origin_index + 1) % conf->args.size();

    // the origin is "host" or "host:port", use the default rtmp port if not specified.
    std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);
    size_t pos = server.find(":");
    if (pos != std::string::npos) {
        s_port = server.substr(pos + 1);
        server = server.substr(0, pos);
        port = ::atoi(s_port.c_str());
    }

    // output the connected server and port.
    ep_server = server;
    ep_port = s_port;

    // open socket.
    // @remark keep a space between string literal and PRId64, which is required since C++11.
    int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US;
    if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) {
        srs_warn("edge pull failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%" PRId64 ", ret=%d",
            _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
        return ret;
    }

    // drop the client and io of previous connection.
    srs_freep(client);
    srs_freep(io);

    // create the io and client over the new socket.
    srs_assert(stfd);
    io = new SrsStSocket(stfd);
    client = new SrsRtmpClient(io);

    // sample the bytes of the new io.
    kbps->set_io(io, io);

    srs_trace("edge pull connected, can_publish=%d, url=%s/%s, server=%s:%d",
        _source->can_publish(), _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port);

    return ret;
}
378
winlin authored
379 380 381
// create the edge forwarder without any connection to origin.
// the kbps sampler, the forward thread and the message queue are allocated
// here, while the thread is only started by start().
SrsEdgeForwarder::SrsEdgeForwarder()
{
    io = NULL;
    kbps = new SrsKbps();
    client = NULL;

    // set by initialize(). all of them are reset here, including _source,
    // so that no member holds an indeterminate pointer before initialize().
    _source = NULL;
    _edge = NULL;
    _req = NULL;

    origin_index = 0;
    stream_id = 0;
    stfd = NULL;

    pthread = new SrsThread("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US, true);
    queue = new SrsMessageQueue();
    send_error_code = ERROR_SUCCESS;
}

// stop the forwarder, then free the objects allocated by the constructor.
SrsEdgeForwarder::~SrsEdgeForwarder()
{
    // stop() stops the thread and releases the connection objects.
    stop();

    // the thread, the queue and the kbps sampler are owned by this object.
    srs_freep(pthread);
    srs_freep(queue);
    srs_freep(kbps);
}
403 404 405 406 407
// set the size of the queue of messages to forward,
// the queue_size is passed to the queue as-is.
void SrsEdgeForwarder::set_queue_size(double queue_size)
{
    queue->set_queue_size(queue_size);
}
winlin authored
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
// remember the source, the publish edge and the request used by the forwarder.
// the pointers are stored as-is, nothing is copied.
// @return always ERROR_SUCCESS.
int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req)
{
    _req = req;
    _edge = edge;
    _source = source;

    return ERROR_SUCCESS;
}

int SrsEdgeForwarder::start()
{
    int ret = ERROR_SUCCESS;
    
423 424
    send_error_code = ERROR_SUCCESS;
    
425 426
    std::string ep_server, ep_port;
    if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
winlin authored
427 428 429 430
        return ret;
    }
    srs_assert(client);
winlin authored
431 432
    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
winlin authored
433 434 435 436 437 438 439

    SrsRequest* req = _req;
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
440 441
    if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
        srs_error("connect with server failed. ret=%d", ret);
winlin authored
442 443 444 445 446 447 448
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }
    
449
    if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
winlin authored
450 451 452 453 454
        srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d", 
            req->stream.c_str(), stream_id, ret);
        return ret;
    }
    
455
    return pthread->start();
winlin authored
456 457
}
458 459
void SrsEdgeForwarder::stop()
{
460 461
    pthread->stop();
    
462 463 464 465
    close_underlayer_socket();
    
    srs_freep(client);
    srs_freep(io);
466
    kbps->set_io(NULL, NULL);
467 468
}
469
#define SYS_MAX_EDGE_SEND_MSGS 128
470 471 472 473
int SrsEdgeForwarder::cycle()
{
    int ret = ERROR_SUCCESS;
    
winlin authored
474
    client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
475
    
winlin authored
476
    SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_EDGE);
477
    
478
    SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
479 480

    while (pthread->can_loop()) {
481 482 483 484 485 486 487
        if (send_error_code != ERROR_SUCCESS) {
            st_usleep(SRS_EDGE_FORWARDER_ERROR_US);
            continue;
        }

        // read from client.
        if (true) {
488 489
            SrsMessage* msg = NULL;
            ret = client->recv_message(&msg);
490 491 492
            
            srs_verbose("edge loop recv message. ret=%d", ret);
            if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
winlin authored
493
                srs_error("edge push get server control message failed. ret=%d", ret);
494 495 496 497 498 499 500 501
                send_error_code = ret;
                continue;
            }
            
            srs_freep(msg);
        }
        
        // forward all messages.
502
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
503
        int count = 0;
504
        if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
winlin authored
505
            srs_error("get message to push to origin failed. ret=%d", ret);
506 507 508
            return ret;
        }
        
509 510 511 512
        pithy_print.elapse();
        
        // pithy print
        if (pithy_print.can_print()) {
513
            kbps->sample();
514
            srs_trace("-> "SRS_CONSTS_LOG_EDGE_PUBLISH
515 516
                " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", 
                pithy_print.age(), count,
517 518
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
519
        }
520 521 522
        
        // ignore when no messages.
        if (count <= 0) {
winlin authored
523
            srs_verbose("no packets to push.");
524 525
            continue;
        }
526
    
527
        // sendout messages, all messages are freed by send_and_free_messages().
528 529 530
        if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) {
            srs_error("edge publish push message to server failed. ret=%d", ret);
            return ret;
531
        }
532 533 534 535 536
    }
    
    return ret;
}
537
// enqueue a copy of the message from publisher, which is sent to origin
// by the forward thread.
// @param msg the message from publisher, not freed here.
// @return the saved error of the forward thread if any; ERROR_SUCCESS when
//      the message is ignored or enqueued; otherwise the error of copy/enqueue.
int SrsEdgeForwarder::proxy(SrsMessage* msg)
{
    // report the error of the forward thread to the publisher.
    int ret = send_error_code;
    if (ret != ERROR_SUCCESS) {
        srs_error("publish edge proxy thread send error, ret=%d", ret);
        return ret;
    }

    // the msg is auto free by source,
    // so we just ignore, or copy then send it.
    bool ignored = msg->size <= 0
        || msg->header.is_set_chunk_size()
        || msg->header.is_window_ackledgement_size()
        || msg->header.is_ackledgement();
    if (ignored) {
        return ret;
    }

    SrsSharedPtrMessage copy;
    ret = copy.create(msg);
    if (ret != ERROR_SUCCESS) {
        srs_error("initialize the msg failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("initialize shared ptr msg success.");

    // the message is sent over the stream created on origin.
    copy.header.stream_id = stream_id;
    ret = queue->enqueue(copy.copy());
    if (ret != ERROR_SUCCESS) {
        srs_error("enqueue edge publish msg failed. ret=%d", ret);
    }

    return ret;
}

// close the socket to origin, by srs_close_stfd on the stfd.
void SrsEdgeForwarder::close_underlayer_socket()
{
    srs_close_stfd(stfd);
}
576
// connect to the next origin server of the vhost, the origins are selected
// in turn by origin_index, then create the io and rtmp client over the socket.
// @param ep_server output the host of the selected origin.
// @param ep_port output the port (string) of the selected origin.
// @return ERROR_SUCCESS when connected; ERROR_EDGE_VHOST_REMOVED when the
//      vhost has no origin to connect to; otherwise the error of connect.
// @remark the ep_server and ep_port are not modified when no origin selected.
int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port)
{
    int ret = ERROR_SUCCESS;

    // reopen
    close_underlayer_socket();

    SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);

    // return an error instead of abort the server when the origin conf is
    // empty, for instance, the vhost is removed then reload, which is the
    // same as the edge ingester does.
    // an origin directive without any server is handled the same way,
    // for the modulo of the selection below must not divide by zero.
    if (!conf || conf->args.empty()) {
        ret = ERROR_EDGE_VHOST_REMOVED;
        srs_warn("vhost %s removed. ret=%d", _req->vhost.c_str(), ret);
        return ret;
    }

    // select the origin.
    std::string server = conf->args.at(origin_index % conf->args.size());
    origin_index = (origin_index + 1) % conf->args.size();

    // the origin is "host" or "host:port", use the default rtmp port if not specified.
    std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);
    size_t pos = server.find(":");
    if (pos != std::string::npos) {
        s_port = server.substr(pos + 1);
        server = server.substr(0, pos);
        port = ::atoi(s_port.c_str());
    }

    // output the connected server and port.
    ep_server = server;
    ep_port = s_port;

    // open socket.
    // @remark keep a space between string literal and PRId64, which is required since C++11.
    int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US;
    if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) {
        srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%" PRId64 ", ret=%d",
            _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
        return ret;
    }

    // drop the client and io of previous connection.
    srs_freep(client);
    srs_freep(io);

    // create the io and client over the new socket.
    srs_assert(stfd);
    io = new SrsStSocket(stfd);
    client = new SrsRtmpClient(io);

    // sample the bytes of the new io.
    kbps->set_io(io, io);

    srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d",
        _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port);

    return ret;
}
627
// TODO: FIXME: refine the connect_app.
628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650
int SrsEdgeForwarder::connect_app(string ep_server, string ep_port)
{
    int ret = ERROR_SUCCESS;
    
    SrsRequest* req = _req;
    
    // args of request takes the srs info.
    if (req->args == NULL) {
        req->args = SrsAmf0Any::object();
    }
    
    // notify server the edge identity,
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/147
    SrsAmf0Object* data = req->args;
    data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
    data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
    data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
    data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
    data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
    data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
    data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
    data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
    data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
651 652
    data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
    data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671
    // for edge to directly get the id of client.
    data->set("srs_pid", SrsAmf0Any::number(getpid()));
    data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
    
    // local ip of edge
    std::vector<std::string> ips = srs_get_local_ipv4_ips();
    assert(_srs_config->get_stats_network() < (int)ips.size());
    std::string local_ip = ips[_srs_config->get_stats_network()];
    data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
    
    // generate the tcUrl
    std::string param = "";
    std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
    
    // upnode server identity will show in the connect_app of client.
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/160
    // the debug_srs_upnode is config in vhost and default to true.
    bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
    if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
672 673
        srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", 
            tc_url.c_str(), debug_srs_upnode, ret);
674 675 676 677 678 679
        return ret;
    }
    
    return ret;
}
winlin authored
680
// create the play edge in init state, which owns an edge ingester.
SrsPlayEdge::SrsPlayEdge()
{
    ingester = new SrsEdgeIngester();

    // both the edge state and the user state start from init.
    user_state = SrsEdgeUserStateInit;
    state = SrsEdgeStateInit;
}
winlin authored
687
// free the ingester allocated by the constructor.
SrsPlayEdge::~SrsPlayEdge()
{
    srs_freep(ingester);
}
winlin authored
692
// initialize the ingester with the source, this edge and the request.
// @return the result of the ingester initialize.
int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req)
{
    int ret = ingester->initialize(source, this, req);
    return ret;
}
winlin authored
703
int SrsPlayEdge::on_client_play()
704 705
{
    int ret = ERROR_SUCCESS;
winlin authored
706 707
    
    // error state.
708
    if (user_state != SrsEdgeUserStateInit) {
winlin authored
709
        ret = ERROR_RTMP_EDGE_PLAY_STATE;
winlin authored
710
        srs_error("invalid state for client to pull stream on edge. "
711
            "state=%d, user_state=%d, ret=%d", state, user_state, ret);
winlin authored
712 713 714 715 716 717 718 719 720
        return ret;
    }
    
    // start ingest when init state.
    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        return ingester->start();
    }
721 722 723
    return ret;
}
winlin authored
724
void SrsPlayEdge::on_all_client_stop()
725
{
726 727 728
    // when all client disconnected,
    // and edge is ingesting origin stream, abort it.
    if (state == SrsEdgeStatePlay || state == SrsEdgeStateIngestConnected) {
729 730
        ingester->stop();
    
731 732 733 734 735 736
        SrsEdgeState pstate = state;
        state = SrsEdgeStateInit;
        srs_trace("edge change from %d to state %d (init).", pstate, state);
        
        return;
    }
737 738
}
winlin authored
739
int SrsPlayEdge::on_ingest_play()
740 741 742
{
    int ret = ERROR_SUCCESS;
    
743 744 745 746 747 748 749
    // when already connected(for instance, reconnect for error), ignore.
    if (state == SrsEdgeStateIngestConnected) {
        return ret;
    }
    
    srs_assert(state == SrsEdgeStatePlay);
    
750 751
    SrsEdgeState pstate = state;
    state = SrsEdgeStateIngestConnected;
winlin authored
752
    srs_trace("edge change from %d to state %d (pull).", pstate, state);
753 754 755
    
    return ret;
}
winlin authored
756 757 758 759 760 761 762 763 764 765 766 767 768

// create the publish edge in init state, which owns an edge forwarder.
SrsPublishEdge::SrsPublishEdge()
{
    forwarder = new SrsEdgeForwarder();

    // both the edge state and the user state start from init.
    user_state = SrsEdgeUserStateInit;
    state = SrsEdgeStateInit;
}

// free the forwarder allocated by the constructor.
SrsPublishEdge::~SrsPublishEdge()
{
    srs_freep(forwarder);
}
769 770 771 772 773
// set the queue size of the forwarder,
// the queue_size is passed to the forwarder as-is.
void SrsPublishEdge::set_queue_size(double queue_size)
{
    forwarder->set_queue_size(queue_size);
}
winlin authored
774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791
// initialize the forwarder with the source, this edge and the request.
// @return the result of the forwarder initialize.
int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
{
    int ret = forwarder->initialize(source, this, req);
    return ret;
}

int SrsPublishEdge::on_client_publish()
{
    int ret = ERROR_SUCCESS;
    
    // error state.
    if (user_state != SrsEdgeUserStateInit) {
        ret = ERROR_RTMP_EDGE_PUBLISH_STATE;
792
        srs_error("invalid state for client to publish stream on edge. "
winlin authored
793 794 795 796
            "state=%d, user_state=%d, ret=%d", state, user_state, ret);
        return ret;
    }
    
797 798 799 800 801 802
    // error when not init state.
    if (state != SrsEdgeStateInit) {
        ret = ERROR_RTMP_EDGE_PUBLISH_STATE;
        srs_error("invalid state for client to publish stream on edge. "
            "state=%d, user_state=%d, ret=%d", state, user_state, ret);
        return ret;
winlin authored
803
    }
804
    
805 806 807 808 809 810 811
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/180
    // to avoid multiple publish the same stream on the same edge,
    // directly enter the publish stage.
    if (true) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStatePublish;
        srs_trace("edge change from %d to state %d (push).", pstate, state);
812 813
    }
    
814 815 816 817 818 819 820 821 822 823
    // start to forward stream to origin.
    ret = forwarder->start();
    
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/180
    // when failed, revert to init
    if (ret != ERROR_SUCCESS) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStateInit;
        srs_trace("edge revert from %d to state %d (push). ret=%d", pstate, state, ret);
    }
824
    
825
    return ret;
winlin authored
826 827
}
828
// proxy the message of publisher to origin, by the forwarder.
// @return the result of the forwarder proxy.
int SrsPublishEdge::on_proxy_publish(SrsMessage* msg)
{
    int ret = forwarder->proxy(msg);
    return ret;
}

void SrsPublishEdge::on_proxy_unpublish()
{
    if (state == SrsEdgeStatePublish) {
        forwarder->stop();
    }
838 839 840 841
    
    SrsEdgeState pstate = state;
    state = SrsEdgeStateInit;
    srs_trace("edge change from %d to state %d (init).", pstate, state);
winlin authored
842
}
843