Blame view

trunk/src/app/srs_app_rtmp_conn.cpp 43.9 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
The MIT License (MIT)

Copyright (c) 2013-2014 winlin

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#include <srs_app_rtmp_conn.hpp>

#include <stdlib.h>
27 28 29
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44

using namespace std;

#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_protocol_rtmp.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_source.hpp>
#include <srs_app_server.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_config.hpp>
#include <srs_app_refer.hpp>
#include <srs_app_hls.hpp>
#include <srs_app_http.hpp>
#include <srs_app_bandwidth.hpp>
45
#include <srs_app_st_socket.hpp>
46
#include <srs_app_http_hooks.hpp>
47
#include <srs_app_edge.hpp>
48
#include <srs_app_utility.hpp>
49
#include <srs_protocol_msg_array.hpp>
50
#include <srs_protocol_amf0.hpp>
51
#include <srs_app_recv_thread.hpp>
52
#include <srs_core_performance.hpp>
53
#include <srs_kernel_utility.hpp>
54
winlin authored
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
// when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish,
// sleep a while and close the connection.
#define SRS_STREAM_BUSY_SLEEP_US (int64_t)(3*1000*1000LL)

// the timeout to wait encoder to republish
// if timeout, close the connection.
#define SRS_REPUBLISH_SEND_TIMEOUT_US (int64_t)(3*60*1000*1000LL)
// if timeout, close the connection.
#define SRS_REPUBLISH_RECV_TIMEOUT_US (int64_t)(3*60*1000*1000LL)

// the timeout to wait client data, when client paused
// if timeout, close the connection.
#define SRS_PAUSED_SEND_TIMEOUT_US (int64_t)(30*60*1000*1000LL)
// if timeout, close the connection.
#define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL)
72 73 74
// when edge timeout, retry next.
#define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL)
75
SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd)
76 77 78 79
    : SrsConnection(srs_server, client_stfd)
{
    req = new SrsRequest();
    res = new SrsResponse();
80
    skt = new SrsStSocket(client_stfd);
81 82 83
    rtmp = new SrsRtmpServer(skt);
    refer = new SrsRefer();
    bandwidth = new SrsBandwidth();
84
    duration = 0;
85 86
    kbps = new SrsKbps();
    kbps->set_io(skt, skt);
87
    
88
    mw_sleep = SRS_PERF_MW_SLEEP;
89
    mw_enabled = false;
90
    realtime = SRS_PERF_MIN_LATENCY_ENABLED;
91 92 93 94
    
    _srs_config->subscribe(this);
}
95
SrsRtmpConn::~SrsRtmpConn()
96 97 98 99 100 101 102 103 104
{
    _srs_config->unsubscribe(this);
    
    srs_freep(req);
    srs_freep(res);
    srs_freep(rtmp);
    srs_freep(skt);
    srs_freep(refer);
    srs_freep(bandwidth);
105
    srs_freep(kbps);
106 107
}
108 109 110 111 112
void SrsRtmpConn::kbps_resample()
{
    kbps->sample();
}
113
// TODO: return detail message when error for client.
114
int SrsRtmpConn::do_cycle()
115 116 117
{
    int ret = ERROR_SUCCESS;
    
winlin authored
118
    srs_trace("RTMP client ip=%s", ip.c_str());
119
winlin authored
120 121
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
    
    if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
        srs_error("rtmp handshake failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp handshake success");
    
    if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
        srs_error("rtmp connect vhost/app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp connect app success");
    
    // discovery vhost, resolve the vhost from config
    SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
    if (parsed_vhost) {
        req->vhost = parsed_vhost->arg0();
    }
    
    srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());
    
    if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
        ret = ERROR_RTMP_REQ_TCURL;
        srs_error("discovery tcUrl failed. "
            "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
            req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
        return ret;
    }
    
    // check vhost
    if ((ret = check_vhost()) != ERROR_SUCCESS) {
        srs_error("check vhost failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check vhost success.");
    
winlin authored
159
    srs_trace("connect app, "
160
        "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", 
161 162
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), 
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
163
        req->app.c_str(), (req->args? "(obj)":"null"));
164
    
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;
        
        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }
        
        srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
            srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }
    
194
    ret = service_cycle();
195
    http_hooks_on_close();
196 197 198 199
    
    return ret;
}
200
int SrsRtmpConn::on_reload_vhost_removed(string vhost)
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
{
    int ret = ERROR_SUCCESS;
    
    if (req->vhost != vhost) {
        return ret;
    }
    
    // if the vhost connected is removed, disconnect the client.
    srs_trace("vhost %s removed/disabled, close client url=%s", 
        vhost.c_str(), req->get_stream_url().c_str());
        
    srs_close_stfd(stfd);
    
    return ret;
}
216
217
int SrsRtmpConn::on_reload_vhost_mw(string vhost)
218
{
219 220 221 222 223 224
    int ret = ERROR_SUCCESS;
    
    if (req->vhost != vhost) {
        return ret;
    }
    
225 226 227 228
    int sleep_ms = _srs_config->get_mw_sleep_ms(req->vhost);
    
    // when mw_sleep changed, resize the socket send buffer.
    change_mw_sleep(sleep_ms);
229
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
    return ret;
}

int SrsRtmpConn::on_reload_vhost_realtime(string vhost)
{
    int ret = ERROR_SUCCESS;
    
    if (req->vhost != vhost) {
        return ret;
    }
    
    bool realtime_enabled = _srs_config->get_realtime_enabled(req->vhost);
    srs_trace("realtime changed %d=>%d", realtime, realtime_enabled);
    realtime = realtime_enabled;

    return ret;
246 247
}
248 249 250 251 252 253 254 255 256
int64_t SrsRtmpConn::get_send_bytes_delta()
{
    return kbps->get_send_bytes_delta();
}

int64_t SrsRtmpConn::get_recv_bytes_delta()
{
    return kbps->get_recv_bytes_delta();
}
257
    
258
int SrsRtmpConn::service_cycle()
259 260 261
{    
    int ret = ERROR_SUCCESS;
    
262
    if ((ret = rtmp->set_window_ack_size((int)(2.5 * 1000 * 1000))) != ERROR_SUCCESS) {
263 264 265 266 267
        srs_error("set window acknowledgement size failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("set window acknowledgement size success");
        
268
    if ((ret = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != ERROR_SUCCESS) {
269 270 271 272 273
        srs_error("set peer bandwidth failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("set peer bandwidth success");
274 275 276
    // get the ip which client connected.
    std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd));
    
277 278
    // do bandwidth test if connect to the vhost which is for bandwidth check.
    if (_srs_config->get_bw_check_enabled(req->vhost)) {
279
        return bandwidth->bandwidth_check(rtmp, skt, req, local_ip);
280 281
    }
    
tufang14 authored
282
    // do token traverse before serve it.
winlin authored
283
    // @see https://github.com/winlinvip/simple-rtmp-server/pull/239
tufang14 authored
284 285 286 287 288 289 290 291 292
    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
    bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
    if (vhost_is_edge && edge_traverse) {
        if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
            srs_warn("token auth failed, ret=%d", ret);
            return ret;
        }
    }
    
winlin authored
293
    // response the client connect ok.
294
    if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) {
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
        srs_error("response connect app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("response connect app success");
        
    if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) {
        srs_error("on_bw_done failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("on_bw_done success");
    
    while (true) {
        ret = stream_service_cycle();
        
        // stream service must terminated with error, never success.
        srs_assert(ret != ERROR_SUCCESS);
        
        // when not system control error, fatal error, return.
        if (!srs_is_system_control_error(ret)) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("stream service cycle failed. ret=%d", ret);
            }
            return ret;
        }
        
        // for republish, continue service
        if (ret == ERROR_CONTROL_REPUBLISH) {
            // set timeout to a larger value, wait for encoder to republish.
            rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);
            rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);
            
            srs_trace("control message(unpublish) accept, retry stream service.");
            continue;
        }
        
        // for "some" system control error, 
        // logical accept and retry stream service.
        if (ret == ERROR_CONTROL_RTMP_CLOSE) {
winlin authored
333 334
            // TODO: FIXME: use ping message to anti-death of socket.
            // @see: https://github.com/winlinvip/simple-rtmp-server/issues/39
335 336 337 338 339 340 341 342 343 344 345 346 347 348
            // set timeout to a larger value, for user paused.
            rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);
            rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);
            
            srs_trace("control message(close) accept, retry stream service.");
            continue;
        }
        
        // for other system control message, fatal error.
        srs_error("control message(%d) reject as error. ret=%d", ret, ret);
        return ret;
    }
}
349
int SrsRtmpConn::stream_service_cycle()
350 351 352
{
    int ret = ERROR_SUCCESS;
        
353
    SrsRtmpConnType type;
354
    if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {
winlin authored
355 356 357
        if (!srs_is_client_gracefully_close(ret)) {
            srs_error("identify client failed. ret=%d", ret);
        }
358 359
        return ret;
    }
360
    req->strip();
winlin authored
361
    srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f", 
362
        srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration);
363 364

    // client is identified, set the timeout to service timeout.
winlin authored
365 366
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
367 368 369 370 371 372 373
    
    // set chunk size to larger.
    int chunk_size = _srs_config->get_chunk_size(req->vhost);
    if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {
        srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret);
        return ret;
    }
374
    srs_info("set chunk_size=%d success", chunk_size);
375
    
376 377
    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
    
378
    // find a source to serve.
379 380 381 382
    SrsSource* source = NULL;
    if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) {
        return ret;
    }
383 384
    srs_assert(source != NULL);
    
385 386 387 388 389
    // check ASAP, to fail it faster if invalid.
    if (type != SrsRtmpConnPlay && !vhost_is_edge) {
        // check publish available
        // for edge, never check it, for edge use proxy mode.
        if (!source->can_publish()) {
390 391 392 393 394 395 396
            ret = ERROR_SYSTEM_STREAM_BUSY;
            srs_warn("stream %s is already publishing. ret=%d", 
                req->get_stream_url().c_str(), ret);
            // to delay request
            st_usleep(SRS_STREAM_BUSY_SLEEP_US);
            return ret;
        }
397 398 399
    }
    
    bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
400
    srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
401 402
        req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, 
        source->source_id(), source->source_id());
403 404 405
    source->set_cache(enabled_cache);
    
    switch (type) {
406
        case SrsRtmpConnPlay: {
407 408
            srs_verbose("start to play stream %s.", req->stream.c_str());
            
winlin authored
409
            if (vhost_is_edge) {
410
                // notice edge to start for the first client.
411 412
                if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) {
                    srs_error("notice edge start play stream failed. ret=%d", ret);
winlin authored
413 414 415 416
                    return ret;
                }
            }
            
417
            // response connection start play
418 419 420 421
            if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to play stream failed. ret=%d", ret);
                return ret;
            }
422
            if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
423 424 425
                srs_error("http hook on_play failed. ret=%d", ret);
                return ret;
            }
426
            
427 428
            srs_info("start to play stream %s success", req->stream.c_str());
            ret = playing(source);
429
            http_hooks_on_stop();
430
            
431 432
            return ret;
        }
433
        case SrsRtmpConnFMLEPublish: {
434 435
            srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
            
winlin authored
436 437 438 439 440 441 442
            if (vhost_is_edge) {
                if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
                    srs_error("notice edge start publish stream failed. ret=%d", ret);
                    return ret;
                }
            }
            
443 444 445 446
            if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to publish stream failed. ret=%d", ret);
                return ret;
            }
447
            
448 449 450 451
            if (!vhost_is_edge) {
                if ((ret = source->acquire_publish()) != ERROR_SUCCESS) {
                    return ret;
                }
452
            }
453
            
454
            ret = fmle_publishing(source);
455 456 457
            
            if (!vhost_is_edge) {
                source->release_publish();
458
            }
459
            
460 461
            return ret;
        }
462
        case SrsRtmpConnFlashPublish: {
463 464
            srs_verbose("flash start to publish stream %s.", req->stream.c_str());
            
465 466 467 468 469 470 471
            if (vhost_is_edge) {
                if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
                    srs_error("notice edge start publish stream failed. ret=%d", ret);
                    return ret;
                }
            }
            
472 473 474 475
            if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("flash start to publish stream failed. ret=%d", ret);
                return ret;
            }
476
            
477 478 479 480
            if (!vhost_is_edge) {
                if ((ret = source->acquire_publish()) != ERROR_SUCCESS) {
                    return ret;
                }
481
            }
482
            
483
            ret = flash_publishing(source);
484 485 486
            
            if (!vhost_is_edge) {
                source->release_publish();
487 488
            }
            
489 490 491 492 493 494 495 496
            return ret;
        }
        default: {
            ret = ERROR_SYSTEM_CLIENT_INVALID;
            srs_info("invalid client type=%d. ret=%d", type, ret);
            return ret;
        }
    }
497
498 499 500
    return ret;
}
501
int SrsRtmpConn::check_vhost()
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530
{
    int ret = ERROR_SUCCESS;
    
    srs_assert(req != NULL);
    
    SrsConfDirective* vhost = _srs_config->get_vhost(req->vhost);
    if (vhost == NULL) {
        ret = ERROR_RTMP_VHOST_NOT_FOUND;
        srs_error("vhost %s not found. ret=%d", req->vhost.c_str(), ret);
        return ret;
    }
    
    if (!_srs_config->get_vhost_enabled(req->vhost)) {
        ret = ERROR_RTMP_VHOST_NOT_FOUND;
        srs_error("vhost %s disabled. ret=%d", req->vhost.c_str(), ret);
        return ret;
    }
    
    if (req->vhost != vhost->arg0()) {
        srs_trace("vhost change from %s to %s", req->vhost.c_str(), vhost->arg0().c_str());
        req->vhost = vhost->arg0();
    }
    
    if ((ret = refer->check(req->pageUrl, _srs_config->get_refer(req->vhost))) != ERROR_SUCCESS) {
        srs_error("check refer failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check refer success.");
    
531
    if ((ret = http_hooks_on_connect()) != ERROR_SUCCESS) {
532 533 534 535 536 537
        return ret;
    }
    
    return ret;
}
538
int SrsRtmpConn::playing(SrsSource* source)
539 540 541
{
    int ret = ERROR_SUCCESS;
    
542
    // use isolate thread to recv, 
543
    // @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
544
    SrsQueueRecvThread trd(rtmp, SRS_PERF_MW_SLEEP);
545 546 547 548 549 550 551 552 553 554 555 556 557
    
    // start isolate recv thread.
    if ((ret = trd.start()) != ERROR_SUCCESS) {
        srs_error("start isolate recv thread failed. ret=%d", ret);
        return ret;
    }
    
    // delivery messages for clients playing stream.
    ret = do_playing(source, &trd);
    
    // stop isolate recv thread
    trd.stop();
    
558 559 560 561 562
    // warn for the message is dropped.
    if (!trd.empty()) {
        srs_warn("drop the received %d messages", trd.size());
    }
    
563 564 565
    return ret;
}
566
int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
567 568 569
{
    int ret = ERROR_SUCCESS;
    
570 571 572 573 574 575 576 577 578 579 580 581 582
    if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) {
        srs_error("check play_refer failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check play_refer success.");
    
    SrsConsumer* consumer = NULL;
    if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) {
        srs_error("create consumer failed. ret=%d", ret);
        return ret;
    }
    
    srs_assert(consumer != NULL);
583
    SrsAutoFree(SrsConsumer, consumer);
584
    trd->set_consumer(consumer);
585 586
    srs_verbose("consumer created success.");
    
587 588
    // initialize other components
    SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
589
    SrsMessageArray msgs(SRS_PERF_MW_MSGS);
590
    bool user_specified_duration_to_stop = (req->duration > 0);
591
    int64_t starttime = -1;
592
    
593 594
    // setup the realtime.
    realtime = _srs_config->get_realtime_enabled(req->vhost);
595 596 597 598 599
    // setup the mw config.
    // when mw_sleep changed, resize the socket send buffer.
    mw_enabled = true;
    change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
    
600
    while (true) {
601
        // to use isolate thread to recv, can improve about 33% performance.
winlin authored
602
        // @see: https://github.com/winlinvip/simple-rtmp-server/issues/196
603 604
        // @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
        while (!trd->empty()) {
605
            SrsCommonMessage* msg = trd->pump();
606
            srs_verbose("pump client message to process.");
607
            
608
            if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
609
                if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
610
                    srs_error("process play control message failed. ret=%d", ret);
611 612 613 614 615
                }
                return ret;
            }
        }
        
616 617 618 619 620 621 622 623
        // quit when recv thread error.
        if ((ret = trd->error_code()) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("recv thread failed. ret=%d", ret);
            }
            return ret;
        }
        
624 625 626
        // collect elapse for pithy print.
        pithy_print.elapse();
        
627
#ifdef SRS_PERF_QUEUE_COND_WAIT
628 629 630
        // for send wait time debug
        srs_verbose("send thread now=%"PRId64"us, wait %dms", srs_update_system_time_ms(), mw_sleep);
        
631 632
        // wait for message to incoming.
        // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
633 634 635 636 637 638 639 640
        // @see https://github.com/winlinvip/simple-rtmp-server/issues/257
        if (realtime) {
            // for realtime, min required msgs is 0, send when got one+ msgs.
            consumer->wait(0, mw_sleep);
        } else {
            // for no-realtime, got some msgs then send.
            consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
        }
641 642 643
        
        // for send wait time debug
        srs_verbose("send thread now=%"PRId64"us wakeup", srs_update_system_time_ms());
644 645
#endif
        
646
        // get messages from consumer.
647
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
648
        int count = 0;
649
        if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
650 651 652 653 654 655 656 657
            srs_error("get messages from consumer failed. ret=%d", ret);
            return ret;
        }

        // reportable
        if (pithy_print.can_print()) {
            kbps->sample();
            srs_trace("-> "SRS_CONSTS_LOG_PLAY
658
                " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
659 660
                pithy_print.age(), count,
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
661 662
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
                mw_sleep
663
            );
664 665
        }
        
666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688
        // we use wait timeout to get messages,
        // for min latency event no message incoming,
        // so the count maybe zero.
        if (count > 0) {
            srs_verbose("mw wait %dms and got %d msgs %d(%"PRId64"-%"PRId64")ms", 
                mw_sleep, count, 
                (count > 0? msgs.msgs[count - 1]->timestamp - msgs.msgs[0]->timestamp : 0),
                (count > 0? msgs.msgs[0]->timestamp : 0), 
                (count > 0? msgs.msgs[count - 1]->timestamp : 0));
        }
        
        if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
            srs_info("mw sleep %dms for no msg", mw_sleep);
            st_usleep(mw_sleep * 1000);
#else
            srs_verbose("mw wait %dms and got nothing.", mw_sleep);
#endif
            // ignore when nothing got.
            continue;
        }
        srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep);
        
689 690 691 692
        // only when user specifies the duration, 
        // we start to collect the durations for each message.
        if (user_specified_duration_to_stop) {
            for (int i = 0; i < count; i++) {
693
                SrsSharedPtrMessage* msg = msgs.msgs[i];
694
                
695 696
                // foreach msg, collect the duration.
                // @remark: never use msg when sent it, for the protocol sdk will free it.
697 698
                if (starttime < 0 || starttime > msg->timestamp) {
                    starttime = msg->timestamp;
699
                }
700 701
                duration += msg->timestamp - starttime;
                starttime = msg->timestamp;
702
            }
703 704
        }
        
705
        // sendout messages, all messages are freed by send_and_free_messages().
706 707 708 709
        // no need to assert msg, for the rtmp will assert it.
        if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("send messages to client failed. ret=%d", ret);
710
            }
711
            return ret;
712 713 714 715
        }
        
        // if duration specified, and exceed it, stop play live.
        // @see: https://github.com/winlinvip/simple-rtmp-server/issues/45
716 717 718 719 720 721
        if (user_specified_duration_to_stop) {
            if (duration >= (int64_t)req->duration) {
                ret = ERROR_RTMP_DURATION_EXCEED;
                srs_trace("stop live for duration exceed. ret=%d", ret);
                return ret;
            }
722 723 724 725 726 727
        }
    }
    
    return ret;
}
728
int SrsRtmpConn::fmle_publishing(SrsSource* source)
729 730 731
{
    int ret = ERROR_SUCCESS;
    
732 733 734 735 736 737 738
    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
            
    if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
        srs_error("http hook on_publish failed. ret=%d", ret);
        return ret;
    }
739 740
    // use isolate thread to recv,
    // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
741 742
    SrsPublishRecvThread trd(rtmp, req, 
        st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge);
743
744
    srs_info("start to publish stream %s success", req->stream.c_str());
745 746 747 748
    ret = do_publishing(source, &trd);

    // stop isolate recv thread
    trd.stop();
749 750 751 752 753 754 755 756 757 758 759 760 761 762

    // when edge, notice edge to change state.
    // when origin, notice all service to unpublish.
    if (vhost_is_edge) {
        source->on_edge_proxy_unpublish();
    } else {
        source->on_unpublish();
    }

    http_hooks_on_unpublish();
    
    return ret;
}
763
int SrsRtmpConn::flash_publishing(SrsSource* source)
764 765
{
    int ret = ERROR_SUCCESS;
766
767
    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
768
769 770 771 772
    if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
        srs_error("http hook on_publish failed. ret=%d", ret);
        return ret;
    }
773 774 775

    // use isolate thread to recv,
    // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
776 777
    SrsPublishRecvThread trd(rtmp, req, 
        st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge);
778 779 780 781 782 783

    srs_info("start to publish stream %s success", req->stream.c_str());
    ret = do_publishing(source, &trd);

    // stop isolate recv thread
    trd.stop();
784 785 786 787 788 789 790 791

    // when edge, notice edge to change state.
    // when origin, notice all service to unpublish.
    if (vhost_is_edge) {
        source->on_edge_proxy_unpublish();
    } else {
        source->on_unpublish();
    }
792
793
    http_hooks_on_unpublish();
794
795 796 797
    return ret;
}
798
int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
799 800
{
    int ret = ERROR_SUCCESS;
801
802
    if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
803
        srs_error("check publish_refer failed. ret=%d", ret);
804 805
        return ret;
    }
806 807
    srs_verbose("check publish_refer success.");
winlin authored
808
    SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PUBLISH_USER);
809
810
    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
811
812
    // when edge, ignore the publish event, directly proxy it.
813
    if (!vhost_is_edge) {
814 815
        // notify the hls to prepare when publish start.
        if ((ret = source->on_publish()) != ERROR_SUCCESS) {
816
            srs_error("hls on_publish failed. ret=%d", ret);
817 818
            return ret;
        }
819
        srs_verbose("hls on_publish success.");
820
    }
821 822 823 824 825 826 827 828

    // start isolate recv thread.
    if ((ret = trd->start()) != ERROR_SUCCESS) {
        srs_error("start isolate recv thread failed. ret=%d", ret);
        return ret;
    }

    int64_t nb_msgs = 0;
829
    while (true) {
830 831
        // cond wait for error.
        trd->wait(SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000);
832
833 834 835 836
        // check the thread error code.
        if ((ret = trd->error_code()) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("recv thread failed. ret=%d", ret);
837
            }
838
            return ret;
839 840
        }
841 842 843 844 845 846 847 848 849
        // when not got any messages, timeout.
        if (trd->nb_msgs() <= nb_msgs) {
            ret = ERROR_SOCKET_TIMEOUT;
            srs_warn("publish timeout %"PRId64"us, nb_msgs=%"PRId64", ret=%d",
                     SRS_CONSTS_RTMP_RECV_TIMEOUT_US, nb_msgs, ret);
            break;
        }
        nb_msgs = trd->nb_msgs();
850
        pithy_print.elapse();
851 852 853

        // reportable
        if (pithy_print.can_print()) {
854
            kbps->sample();
855 856
            bool mr = _srs_config->get_mr_enabled(req->vhost);
            int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
857
            srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH
858
                " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d", pithy_print.age(),
859
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
860 861 862
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
                mr, mr_sleep
            );
863
        }
864 865 866 867 868
    }

    return ret;
}
869
int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge)
870 871
{
    int ret = ERROR_SUCCESS;
872
    
873 874 875 876 877 878 879 880 881 882 883 884
    // process publish event.
    if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
        SrsPacket* pkt = NULL;
        if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("fmle decode unpublish message failed. ret=%d", ret);
            return ret;
        }

        SrsAutoFree(SrsPacket, pkt);

        // for flash, any packet is republish.
        if (!is_fmle) {
885 886 887 888 889 890
            // flash unpublish.
            // TODO: maybe need to support republish.
            srs_trace("flash flash publish finished.");
            return ERROR_CONTROL_REPUBLISH;
        }
891 892 893 894 895 896 897
        // for fmle, drop others except the fmle start packet.
        if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
            SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
            if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) {
                return ret;
            }
            return ERROR_CONTROL_REPUBLISH;
898
        }
899 900 901 902 903 904 905 906 907

        srs_trace("fmle ignore AMF0/AMF3 command message.");
        return ret;
    }

    // video, audio, data message
    if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) {
        srs_error("fmle process publish message failed. ret=%d", ret);
        return ret;
908 909 910 911 912
    }
    
    return ret;
}
913
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge)
914 915 916
{
    int ret = ERROR_SUCCESS;
    
917 918
    // for edge, directly proxy message to origin.
    if (vhost_is_edge) {
919 920 921 922 923
        if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) {
            srs_error("edge publish proxy msg failed. ret=%d", ret);
            return ret;
        }
        return ret;
924 925
    }
    
926 927 928 929 930 931
    // process audio packet
    if (msg->header.is_audio()) {
        if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
            return ret;
        }
winlin authored
932
        return ret;
933 934 935 936 937 938 939
    }
    // process video packet
    if (msg->header.is_video()) {
        if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
            return ret;
        }
winlin authored
940
        return ret;
941 942
    }
    
943 944 945 946 947 948 949 950 951
    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }
    
952 953
    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
954
        SrsPacket* pkt = NULL;
955
        if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
956 957 958
            srs_error("decode onMetaData message failed. ret=%d", ret);
            return ret;
        }
959
        SrsAutoFree(SrsPacket, pkt);
960 961 962 963 964 965 966
    
        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
                srs_error("source process onMetaData message failed. ret=%d", ret);
                return ret;
            }
967
            srs_info("process onMetaData message success.");
968 969 970
            return ret;
        }
        
971
        srs_info("ignore AMF0/AMF3 data message.");
972 973 974 975 976 977
        return ret;
    }
    
    return ret;
}
978
int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
979 980 981 982 983 984 985
{
    int ret = ERROR_SUCCESS;
    
    if (!msg) {
        srs_verbose("ignore all empty message.");
        return ret;
    }
986
    SrsAutoFree(SrsCommonMessage, msg);
987 988 989 990 991 992
    
    if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
        srs_info("ignore all message except amf0/amf3 command.");
        return ret;
    }
    
993
    SrsPacket* pkt = NULL;
994
    if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
995 996 997 998 999
        srs_error("decode the amf0/amf3 command packet failed. ret=%d", ret);
        return ret;
    }
    srs_info("decode the amf0/amf3 command packet success.");
    
1000
    SrsAutoFree(SrsPacket, pkt);
1001
    
1002 1003
    // for jwplayer/flowplayer, which send close as pause message.
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/6
1004
    SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(pkt);
1005 1006 1007 1008 1009 1010
    if (close) {
        ret = ERROR_CONTROL_RTMP_CLOSE;
        srs_trace("system control message: rtmp close stream. ret=%d", ret);
        return ret;
    }
    
1011 1012 1013
    // call msg,
    // support response null first,
    // @see https://github.com/winlinvip/simple-rtmp-server/issues/106
1014
    // TODO: FIXME: response in right way, or forward in edge mode.
1015
    SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
winlin authored
1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026
    if (call) {
        // only response it when transaction id not zero,
        // for the zero means donot need response.
        if (call->transaction_id > 0) {
            SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);
            res->command_object = SrsAmf0Any::null();
            res->response = SrsAmf0Any::null();
            if ((ret = rtmp->send_and_free_packet(res, 0)) != ERROR_SUCCESS) {
                srs_warn("response call failed. ret=%d", ret);
                return ret;
            }
1027 1028 1029 1030 1031
        }
        return ret;
    }
    
    // pause or other msg.
1032
    SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);
1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
    if (!pause) {
        srs_info("ignore all amf0/amf3 command except pause.");
        return ret;
    }
    
    if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) {
        srs_error("rtmp process play client pause failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = consumer->on_play_client_pause(pause->is_pause)) != ERROR_SUCCESS) {
        srs_error("consumer process play client pause failed. ret=%d", ret);
        return ret;
    }
    srs_info("process pause success, is_pause=%d, time=%d.", pause->is_pause, pause->time_ms);
    
    return ret;
}
1052 1053 1054 1055 1056 1057
void SrsRtmpConn::change_mw_sleep(int sleep_ms)
{
    if (!mw_enabled) {
        return;
    }
    
1058 1059 1060 1061 1062 1063 1064
    // get the sock buffer size.
    int fd = st_netfd_fileno(stfd);
    int onb_sbuf = 0;
    socklen_t sock_buf_size = sizeof(int);
    getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size);
    
#ifdef SRS_PERF_MW_SO_SNDBUF    
1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
    // the bytes:
    //      4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
    //      128KB=131072, 256KB=262144, 512KB=524288
    // the buffer should set to sleep*kbps/8,
    // for example, your system delivery stream in 1000kbps,
    // sleep 800ms for small bytes, the buffer should set to:
    //      800*1000/8=100000B(about 128KB).
    // other examples:
    //      2000*3000/8=750000B(about 732KB).
    //      2000*5000/8=1250000B(about 1220KB).
    int kbps = 5000;
    int socket_buffer_size = sleep_ms * kbps / 8;

    // socket send buffer, system will double it.
    int nb_sbuf = socket_buffer_size / 2;
    
    // set the socket send buffer when required larger buffer
    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, sock_buf_size) < 0) {
        srs_warn("set sock SO_SENDBUF=%d failed.", nb_sbuf);
    }
    getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, &sock_buf_size);
    
1087
    srs_trace("mw changed sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d, realtime=%d", 
1088
        mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, socket_buffer_size,
1089
        onb_sbuf, nb_sbuf, realtime);
1090
#else
1091 1092
    srs_trace("mw changed sleep %d=>%d, max_msgs=%d, sbuf %d, realtime=%d", 
        mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, onb_sbuf, realtime);
1093
#endif
1094 1095 1096 1097
        
    mw_sleep = sleep_ms;
}
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116
int SrsRtmpConn::check_edge_token_traverse_auth()
{
    int ret = ERROR_SUCCESS;
    
    srs_assert(req);
    
    st_netfd_t stsock = NULL;
    SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
    for (int i = 0; i < (int)conf->args.size(); i++) {
        if ((ret = connect_server(i, &stsock)) == ERROR_SUCCESS) {
            break;
        }
    }
    if (ret != ERROR_SUCCESS) {
        srs_warn("token traverse connect failed. ret=%d", ret);
        return ret;
    }
    
    srs_assert(stsock);
1117
    SrsStSocket* io = new SrsStSocket(stsock);
1118 1119
    SrsRtmpClient* client = new SrsRtmpClient(io);
    
1120
    ret = do_token_traverse_auth(client);
1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139

    srs_freep(client);
    srs_freep(io);
    srs_close_stfd(stsock);

    return ret;
}

int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock)
{
    int ret = ERROR_SUCCESS;
    
    SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
    srs_assert(conf);
    
    // select the origin.
    std::string server = conf->args.at(origin_index % conf->args.size());
    origin_index = (origin_index + 1) % conf->args.size();
    
1140 1141
    std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
    int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);
1142 1143 1144 1145 1146 1147 1148 1149
    size_t pos = server.find(":");
    if (pos != std::string::npos) {
        s_port = server.substr(pos + 1);
        server = server.substr(0, pos);
        port = ::atoi(s_port.c_str());
    }
    
    // open socket.
1150 1151 1152 1153 1154
    st_netfd_t stsock = NULL;
    int64_t timeout = SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US;
    if ((ret = srs_socket_connect(server, port, timeout, &stsock)) != ERROR_SUCCESS) {
        srs_warn("edge token traverse failed, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
            req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
1155 1156 1157 1158 1159 1160 1161 1162
        return ret;
    }
    srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port);
    
    *pstsock = stsock;
    return ret;
}
1163
int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client)
1164 1165 1166 1167 1168
{
    int ret = ERROR_SUCCESS;
    
    srs_assert(client);
winlin authored
1169 1170
    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
1171 1172 1173 1174 1175
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
1176 1177 1178
    
    // for token tranverse, always take the debug info(which carries token).
    if ((ret = client->connect_app(req->app, req->tcUrl, req, true)) != ERROR_SUCCESS) {
1179 1180 1181 1182 1183 1184 1185 1186 1187
        srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret);
        return ret;
    }
    
    srs_trace("edge token auth ok, tcUrl=%s", req->tcUrl.c_str());
    
    return ret;
}
1188
int SrsRtmpConn::http_hooks_on_connect()
1189 1190 1191
{
    int ret = ERROR_SUCCESS;
    
1192
#ifdef SRS_AUTO_HTTP_CALLBACK
1193
    if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
1194 1195 1196 1197
        // HTTP: on_connect 
        SrsConfDirective* on_connect = _srs_config->get_vhost_on_connect(req->vhost);
        if (!on_connect) {
            srs_info("ignore the empty http callback: on_connect");
1198 1199
            return ret;
        }
1200 1201 1202 1203 1204 1205 1206 1207 1208
        
        int connection_id = _srs_context->get_id();
        for (int i = 0; i < (int)on_connect->args.size(); i++) {
            std::string url = on_connect->args.at(i);
            if ((ret = SrsHttpHooks::on_connect(url, connection_id, ip, req)) != ERROR_SUCCESS) {
                srs_error("hook client on_connect failed. url=%s, ret=%d", url.c_str(), ret);
                return ret;
            }
        }
1209 1210 1211 1212 1213 1214
    }
#endif

    return ret;
}
1215
void SrsRtmpConn::http_hooks_on_close()
1216
{
1217
#ifdef SRS_AUTO_HTTP_CALLBACK
1218
    if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231
        // whatever the ret code, notify the api hooks.
        // HTTP: on_close 
        SrsConfDirective* on_close = _srs_config->get_vhost_on_close(req->vhost);
        if (!on_close) {
            srs_info("ignore the empty http callback: on_close");
            return;
        }
        
        int connection_id = _srs_context->get_id();
        for (int i = 0; i < (int)on_close->args.size(); i++) {
            std::string url = on_close->args.at(i);
            SrsHttpHooks::on_close(url, connection_id, ip, req);
        }
1232 1233 1234 1235
    }
#endif
}
1236
int SrsRtmpConn::http_hooks_on_publish()
1237 1238 1239
{
    int ret = ERROR_SUCCESS;
    
1240
#ifdef SRS_AUTO_HTTP_CALLBACK
1241
    if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
1242 1243 1244 1245
        // HTTP: on_publish 
        SrsConfDirective* on_publish = _srs_config->get_vhost_on_publish(req->vhost);
        if (!on_publish) {
            srs_info("ignore the empty http callback: on_publish");
1246 1247
            return ret;
        }
1248 1249 1250 1251 1252 1253 1254 1255 1256
        
        int connection_id = _srs_context->get_id();
        for (int i = 0; i < (int)on_publish->args.size(); i++) {
            std::string url = on_publish->args.at(i);
            if ((ret = SrsHttpHooks::on_publish(url, connection_id, ip, req)) != ERROR_SUCCESS) {
                srs_error("hook client on_publish failed. url=%s, ret=%d", url.c_str(), ret);
                return ret;
            }
        }
1257 1258 1259 1260 1261 1262
    }
#endif

    return ret;
}
1263
void SrsRtmpConn::http_hooks_on_unpublish()
1264
{
1265
#ifdef SRS_AUTO_HTTP_CALLBACK
1266
    if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279
        // whatever the ret code, notify the api hooks.
        // HTTP: on_unpublish 
        SrsConfDirective* on_unpublish = _srs_config->get_vhost_on_unpublish(req->vhost);
        if (!on_unpublish) {
            srs_info("ignore the empty http callback: on_unpublish");
            return;
        }
        
        int connection_id = _srs_context->get_id();
        for (int i = 0; i < (int)on_unpublish->args.size(); i++) {
            std::string url = on_unpublish->args.at(i);
            SrsHttpHooks::on_unpublish(url, connection_id, ip, req);
        }
1280 1281 1282 1283
    }
#endif
}
1284
int SrsRtmpConn::http_hooks_on_play()
1285 1286 1287
{
    int ret = ERROR_SUCCESS;
    
1288
#ifdef SRS_AUTO_HTTP_CALLBACK
1289
    if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
1290 1291 1292 1293
        // HTTP: on_play 
        SrsConfDirective* on_play = _srs_config->get_vhost_on_play(req->vhost);
        if (!on_play) {
            srs_info("ignore the empty http callback: on_play");
1294 1295
            return ret;
        }
1296 1297 1298 1299 1300 1301 1302 1303 1304
        
        int connection_id = _srs_context->get_id();
        for (int i = 0; i < (int)on_play->args.size(); i++) {
            std::string url = on_play->args.at(i);
            if ((ret = SrsHttpHooks::on_play(url, connection_id, ip, req)) != ERROR_SUCCESS) {
                srs_error("hook client on_play failed. url=%s, ret=%d", url.c_str(), ret);
                return ret;
            }
        }
1305 1306 1307 1308 1309 1310
    }
#endif

    return ret;
}
1311
void SrsRtmpConn::http_hooks_on_stop()
1312
{
1313
#ifdef SRS_AUTO_HTTP_CALLBACK
1314
    if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327
        // whatever the ret code, notify the api hooks.
        // HTTP: on_stop 
        SrsConfDirective* on_stop = _srs_config->get_vhost_on_stop(req->vhost);
        if (!on_stop) {
            srs_info("ignore the empty http callback: on_stop");
            return;
        }
        
        int connection_id = _srs_context->get_id();
        for (int i = 0; i < (int)on_stop->args.size(); i++) {
            std::string url = on_stop->args.at(i);
            SrsHttpHooks::on_stop(url, connection_id, ip, req);
        }
1328 1329 1330 1331 1332
    }
#endif

    return;
}
1333