Blame view

trunk/src/app/srs_app_server.cpp 27.5 KB
winlin authored
1 2 3
/*
The MIT License (MIT)
4
Copyright (c) 2013-2014 winlin
winlin authored
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
24
#include <srs_app_server.hpp>
winlin authored
25 26 27 28 29

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <signal.h>
winlin authored
30 31 32
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
winlin authored
33 34 35

#include <algorithm>
36
#include <srs_kernel_log.hpp>
37
#include <srs_kernel_error.hpp>
38
#include <srs_app_rtmp_conn.hpp>
39
#include <srs_app_config.hpp>
40
#include <srs_kernel_utility.hpp>
41 42
#include <srs_app_http_api.hpp>
#include <srs_app_http_conn.hpp>
winlin authored
43
#include <srs_app_ingest.hpp>
44
#include <srs_app_source.hpp>
45
#include <srs_app_utility.hpp>
46
#include <srs_app_heartbeat.hpp>
winlin authored
47
48 49 50
// signal defines.
#define SIGNAL_RELOAD SIGHUP
winlin authored
51
// nginx also set to 512
52
#define SERVER_LISTEN_BACKLOG 512
winlin authored
53
54
// system interval in ms,
winlin authored
55 56 57 58
// all resolution times should be multiples of each other,
// for example, system-time is 3(300ms),
// then rusage can be 3*x, for instance, 3*10=30(3s),
// the meminfo can be 30*x, for instance, 30*2=60(6s)
59
// for performance refine, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194
60
// @remark, recommended to be 1000ms.
61
#define SRS_SYS_CYCLE_INTERVAL 1000
winlin authored
62 63 64

// update time interval:
//      SRS_SYS_CYCLE_INTERVAL * SRS_SYS_TIME_RESOLUTION_MS_TIMES
65
// @see SYS_TIME_RESOLUTION_US
winlin authored
66 67 68 69
#define SRS_SYS_TIME_RESOLUTION_MS_TIMES 3

// update rusage interval:
//      SRS_SYS_CYCLE_INTERVAL * SRS_SYS_RUSAGE_RESOLUTION_TIMES
70
#define SRS_SYS_RUSAGE_RESOLUTION_TIMES 3
winlin authored
71
72 73
// update rtmp server network info interval:
//      SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES
74
#define SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES 3
75
winlin authored
76 77
// update cpu stat interval:
//      SRS_SYS_CYCLE_INTERVAL * SRS_SYS_CPU_STAT_RESOLUTION_TIMES
78
#define SRS_SYS_CPU_STAT_RESOLUTION_TIMES 3
winlin authored
79
80 81
// update the disk iops interval:
//      SRS_SYS_CYCLE_INTERVAL * SRS_SYS_DISK_STAT_RESOLUTION_TIMES
82
#define SRS_SYS_DISK_STAT_RESOLUTION_TIMES 6
83
winlin authored
84 85
// update meminfo interval:
//      SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES
86
#define SRS_SYS_MEMINFO_RESOLUTION_TIMES 6
winlin authored
87
88 89
// update platform info interval:
//      SRS_SYS_CYCLE_INTERVAL * SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES
90
#define SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES 9
91
92 93
// update network devices info interval:
//      SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES
94
#define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 9
95
96
/**
* Build a listener owned by the given server for one listener type.
* No socket is opened here; fd and stfd stay unset until listen() is called.
*/
SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
{
    // who owns us and which kind of clients we accept.
    _server = server;
    _type = type;
    _port = 0;

    // nothing is opened yet.
    stfd = NULL;
    fd = -1;

    // the accept thread is created here, and started by listen().
    pthread = new SrsThread(this, 0, true);
}

/**
* Release the st descriptor, stop and free the accept thread,
* then close the raw socket when one was created.
*/
SrsListener::~SrsListener()
{
    srs_close_stfd(stfd);

    pthread->stop();
    srs_freep(pthread);

    // st does not close it sometimes,
    // close it manually.
    // fd is still -1 when listen() never created a socket,
    // so skip the close() for that case.
    if (fd >= 0) {
        close(fd);
        fd = -1;
    }
}
120 121 122 123 124 125
/**
* @return the listener type given to the constructor.
*/
SrsListenerType SrsListener::type()
{
    return this->_type;
}

int SrsListener::listen(int port)
winlin authored
126
{
127 128
    int ret = ERROR_SUCCESS;
    
129
    _port = port;
130 131
    
    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
winlin authored
132
        ret = ERROR_SOCKET_CREATE;
winlin authored
133
        srs_error("create linux socket error. port=%d, ret=%d", port, ret);
winlin authored
134
        return ret;
135
    }
winlin authored
136
    srs_verbose("create linux socket success. port=%d, fd=%d", port, fd);
winlin authored
137 138 139 140
    
    int reuse_socket = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
        ret = ERROR_SOCKET_SETREUSE;
winlin authored
141
        srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
winlin authored
142 143
        return ret;
    }
winlin authored
144
    srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, fd);
winlin authored
145 146 147
    
    sockaddr_in addr;
    addr.sin_family = AF_INET;
148
    addr.sin_port = htons(_port);
winlin authored
149 150 151
    addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
        ret = ERROR_SOCKET_BIND;
winlin authored
152
        srs_error("bind socket error. port=%d, ret=%d", port, ret);
winlin authored
153 154
        return ret;
    }
winlin authored
155
    srs_verbose("bind socket success. port=%d, fd=%d", port, fd);
winlin authored
156 157 158
    
    if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) {
        ret = ERROR_SOCKET_LISTEN;
winlin authored
159
        srs_error("listen socket error. port=%d, ret=%d", port, ret);
winlin authored
160 161
        return ret;
    }
winlin authored
162
    srs_verbose("listen socket success. port=%d, fd=%d", port, fd);
winlin authored
163 164 165
    
    if ((stfd = st_netfd_open_socket(fd)) == NULL){
        ret = ERROR_ST_OPEN_SOCKET;
winlin authored
166
        srs_error("st_netfd_open_socket open socket failed. port=%d, ret=%d", port, ret);
winlin authored
167 168
        return ret;
    }
winlin authored
169
    srs_verbose("st open socket success. port=%d, fd=%d", port, fd);
winlin authored
170
    
171
    if ((ret = pthread->start()) != ERROR_SUCCESS) {
winlin authored
172
        srs_error("st_thread_create listen thread error. port=%d, ret=%d", port, ret);
winlin authored
173 174
        return ret;
    }
winlin authored
175
    srs_verbose("create st listen thread success, port=%d", port);
winlin authored
176
    
winlin authored
177
    srs_trace("listen thread cid=%d, current_cid=%d, "
winlin authored
178 179
        "listen at port=%d, type=%d, fd=%d started success, port=%d", 
        pthread->cid(), _srs_context->get_id(), _port, _type, fd, port);
180 181
    
    return ret;
winlin authored
182 183
}
184
void SrsListener::on_thread_start()
winlin authored
185
{
186
    srs_trace("listen cycle start, port=%d, type=%d, fd=%d", _port, _type, fd);
winlin authored
187 188
}
189
int SrsListener::cycle()
winlin authored
190
{
191 192
    int ret = ERROR_SUCCESS;
    
193 194 195 196
    st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
    
    if(client_stfd == NULL){
        // ignore error.
197
        srs_error("ignore accept thread stoppped for accept client error");
198 199 200
        return ret;
    }
    srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
201
    
202
    if ((ret = _server->accept_client(_type, client_stfd)) != ERROR_SUCCESS) {
203 204 205 206 207
        srs_warn("accept client error. ret=%d", ret);
        return ret;
    }
    
    return ret;
winlin authored
208 209
}
210 211 212 213 214 215 216 217
// the manager which the static signal handler writes to.
SrsSignalManager* SrsSignalManager::instance = NULL;

/**
* Register this object as the instance used by sig_catcher() and
* create (but do not start) the signal processing thread.
* The pipe is created later, by start().
*/
SrsSignalManager::SrsSignalManager(SrsServer* server)
{
    SrsSignalManager::instance = this;
    _server = server;

    // no pipe and no st descriptor yet.
    sig_pipe[0] = -1;
    sig_pipe[1] = -1;
    signal_read_stfd = NULL;

    pthread = new SrsThread(this, 0, true);
}

/**
* Stop and free the signal thread, release the st descriptor
* and close both ends of the signal pipe when they were created.
*/
SrsSignalManager::~SrsSignalManager()
{
    pthread->stop();
    srs_freep(pthread);

    srs_close_stfd(signal_read_stfd);

    // the constructor marks an unopened end with -1, and 0 is a valid
    // descriptor, so compare against 0 inclusively.
    if (sig_pipe[0] >= 0) {
        ::close(sig_pipe[0]);
        sig_pipe[0] = -1;
    }
    if (sig_pipe[1] >= 0) {
        ::close(sig_pipe[1]);
        sig_pipe[1] = -1;
    }
}

/**
* Nothing to initialize for now.
* @return always ERROR_SUCCESS.
*/
int SrsSignalManager::initialize()
{
    return ERROR_SUCCESS;
}

int SrsSignalManager::start()
{
    int ret = ERROR_SUCCESS;
    
    /**
    * Note that if multiple processes are used (see below), 
    * the signal pipe should be initialized after the fork(2) call 
    * so that each process has its own private pipe.
    */
    struct sigaction sa;
    
    /* Create signal pipe */
    if (pipe(sig_pipe) < 0) {
        ret = ERROR_SYSTEM_CREATE_PIPE;
        srs_error("create signal manager pipe failed. ret=%d", ret);
        return ret;
    }
    
    /* Install sig_catcher() as a signal handler */
    sa.sa_handler = SrsSignalManager::sig_catcher;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGNAL_RELOAD, &sa, NULL);
    
    sa.sa_handler = SrsSignalManager::sig_catcher;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGTERM, &sa, NULL);
    
    sa.sa_handler = SrsSignalManager::sig_catcher;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGINT, &sa, NULL);
    
277 278 279 280 281 282 283
    sa.sa_handler = SrsSignalManager::sig_catcher;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGUSR2, &sa, NULL);
    
    srs_trace("signal installed");
    
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
    return pthread->start();
}

int SrsSignalManager::cycle()
{
    int ret = ERROR_SUCCESS;
    
    if (signal_read_stfd == NULL) {
        signal_read_stfd = st_netfd_open(sig_pipe[0]);
    }

    int signo;
    
    /* Read the next signal from the pipe */
    st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT);
    
    /* Process signal synchronously */
    _server->on_signal(signo);
    
    return ret;
}

/**
* The signal handler: forward the signal number to the write end of
* the pipe of the registered instance, keeping errno unchanged.
*/
void SrsSignalManager::sig_catcher(int signo)
{
    /* Save errno to restore it after the write() */
    const int saved_errno = errno;

    /* write() is reentrant/async-safe */
    write(SrsSignalManager::instance->sig_pipe[1], &signo, sizeof(int));

    errno = saved_errno;
}
winlin authored
320 321
/**
* Reset every member to its empty state.
* All helper objects are created later by initialize().
*/
SrsServer::SrsServer()
{
    // no pid file is held, and no signal is pending.
    pid_fd = -1;
    signal_reload = false;
    signal_gmc_stop = false;

    // donot new object in constructor,
    // for some global instance is not ready now,
    // new these objects in initialize instead.
    kbps = NULL;
    signal_manager = NULL;

#ifdef SRS_AUTO_INGEST
    ingester = NULL;
#endif
#ifdef SRS_AUTO_HTTP_PARSER
    http_heartbeat = NULL;
#endif
#ifdef SRS_AUTO_HTTP_SERVER
    http_stream_handler = NULL;
#endif
#ifdef SRS_AUTO_HTTP_API
    http_api_handler = NULL;
#endif
}

/**
* All cleanup is delegated to destroy().
*/
SrsServer::~SrsServer()
{
    this->destroy();
}

void SrsServer::destroy()
{
353
    srs_warn("start destroy server");
354
    
355
    _srs_config->unsubscribe(this);
356
    
357 358 359
    close_listeners(SrsListenerRtmpStream);
    close_listeners(SrsListenerHttpApi);
    close_listeners(SrsListenerHttpStream);
360 361 362 363

#ifdef SRS_AUTO_INGEST
    ingester->stop();
#endif
364
    
365
#ifdef SRS_AUTO_HTTP_API
winlin authored
366 367 368
    srs_freep(http_api_handler);
#endif
369
#ifdef SRS_AUTO_HTTP_SERVER
winlin authored
370 371
    srs_freep(http_stream_handler);
#endif
372 373 374 375 376

#ifdef SRS_AUTO_HTTP_PARSER
    srs_freep(http_heartbeat);
#endif
377
#ifdef SRS_AUTO_INGEST
378 379
    srs_freep(ingester);
#endif
380 381 382 383 384 385 386
    
    if (pid_fd > 0) {
        ::close(pid_fd);
        pid_fd = -1;
    }
    
    srs_freep(signal_manager);
387
    srs_freep(kbps);
388
    
winlin authored
389 390
    // @remark never destroy the connections, 
    // for it's still alive.
391
winlin authored
392
    // @remark never destroy the source, 
393 394
    // when we free all sources, the fmle publish may retry
    // and segment fault.
winlin authored
395 396 397 398
}

int SrsServer::initialize()
{
399
    int ret = ERROR_SUCCESS;
winlin authored
400
    
401 402 403
    // ensure the time is ok.
    srs_update_system_time_ms();
    
winlin authored
404
    // for the main objects(server, config, log, context),
405 406 407 408 409
    // never subscribe handler in constructor,
    // instead, subscribe handler in initialize method.
    srs_assert(_srs_config);
    _srs_config->subscribe(this);
    
410 411 412 413 414 415 416
    srs_assert(!signal_manager);
    signal_manager = new SrsSignalManager(this);
    
    srs_assert(!kbps);
    kbps = new SrsKbps();
    kbps->set_io(NULL, NULL);
    
417
#ifdef SRS_AUTO_HTTP_API
418 419 420
    srs_assert(!http_api_handler);
    http_api_handler = SrsHttpHandler::create_http_api();
#endif
421
#ifdef SRS_AUTO_HTTP_SERVER
422 423 424
    srs_assert(!http_stream_handler);
    http_stream_handler = SrsHttpHandler::create_http_stream();
#endif
425 426 427 428
#ifdef SRS_AUTO_HTTP_PARSER
    srs_assert(!http_heartbeat);
    http_heartbeat = new SrsHttpHeartbeat();
#endif
429
#ifdef SRS_AUTO_INGEST
430 431 432 433
    srs_assert(!ingester);
    ingester = new SrsIngester();
#endif
    
434
#ifdef SRS_AUTO_HTTP_API
winlin authored
435 436 437 438 439
    if ((ret = http_api_handler->initialize()) != ERROR_SUCCESS) {
        return ret;
    }
#endif
440
#ifdef SRS_AUTO_HTTP_SERVER
winlin authored
441 442 443 444 445
    if ((ret = http_stream_handler->initialize()) != ERROR_SUCCESS) {
        return ret;
    }
#endif
446
    return ret;
winlin authored
447 448
}
449 450 451 452 453
/**
* Initialize the signal manager created by initialize().
* @return the result of the signal manager.
*/
int SrsServer::initialize_signal()
{
    int ret = signal_manager->initialize();
    return ret;
}
winlin authored
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
int SrsServer::acquire_pid_file()
{
    int ret = ERROR_SUCCESS;
    
    std::string pid_file = _srs_config->get_pid_file();
    
    // -rw-r--r-- 
    // 644
    int mode = S_IRUSR | S_IWUSR |  S_IRGRP | S_IROTH;
    
    int fd;
    // open pid file
    if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) < 0) {
        ret = ERROR_SYSTEM_PID_ACQUIRE;
        srs_error("open pid file %s error, ret=%#x", pid_file.c_str(), ret);
        return ret;
    }
    
    // require write lock
473
    struct flock lock;
winlin authored
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524

    lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK
    lock.l_start = 0; // type offset, relative to l_whence
    lock.l_whence = SEEK_SET;  // SEEK_SET, SEEK_CUR, SEEK_END
    lock.l_len = 0;
    
    if (fcntl(fd, F_SETLK, &lock) < 0) {
        if(errno == EACCES || errno == EAGAIN) {
            ret = ERROR_SYSTEM_PID_ALREADY_RUNNING;
            srs_error("srs is already running! ret=%#x", ret);
            return ret;
        }
        
        ret = ERROR_SYSTEM_PID_LOCK;
        srs_error("require lock for file %s error! ret=%#x", pid_file.c_str(), ret);
        return ret;
    }

    // truncate file
    if (ftruncate(fd, 0) < 0) {
        ret = ERROR_SYSTEM_PID_TRUNCATE_FILE;
        srs_error("truncate pid file %s error! ret=%#x", pid_file.c_str(), ret);
        return ret;
    }

    int pid = (int)getpid();
    
    // write the pid
    char buf[512];
    snprintf(buf, sizeof(buf), "%d", pid);
    if (write(fd, buf, strlen(buf)) != (int)strlen(buf)) {
        ret = ERROR_SYSTEM_PID_WRITE_FILE;
        srs_error("write our pid error! pid=%d file=%s ret=%#x", pid, pid_file.c_str(), ret);
        return ret;
    }

    // auto close when fork child process.
    int val;
    if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
        ret = ERROR_SYSTEM_PID_GET_FILE_INFO;
        srs_error("fnctl F_GETFD error! file=%s ret=%#x", pid_file.c_str(), ret);
        return ret;
    }
    val |= FD_CLOEXEC;
    if (fcntl(fd, F_SETFD, val) < 0) {
        ret = ERROR_SYSTEM_PID_SET_FILE_INFO;
        srs_error("fcntl F_SETFD error! file=%s ret=%#x", pid_file.c_str(), ret);
        return ret;
    }
    
    srs_trace("write pid=%d to %s success!", pid, pid_file.c_str());
winlin authored
525
    pid_fd = fd;
winlin authored
526 527 528 529
    
    return ret;
}
530 531 532 533
int SrsServer::initialize_st()
{
    int ret = ERROR_SUCCESS;
    
534 535 536
    // init st
    if ((ret = srs_init_st()) != ERROR_SUCCESS) {
        srs_error("init st failed. ret=%d", ret);
537 538 539
        return ret;
    }
    
540 541
    // @remark, st alloc segment use mmap, which only support 32757 threads,
    // if need to support more, for instance, 100k threads, define the macro MALLOC_STACK.
542
    // TODO: FIXME: maybe can use "sysctl vm.max_map_count" to refine.
543 544 545 546 547 548 549 550
    if (_srs_config->get_max_connections() > 32756) {
        ret = ERROR_ST_EXCEED_THREADS;
        srs_error("st mmap for stack allocation must <= %d threads, "
            "@see Makefile of st for MALLOC_STACK, please build st manually by "
            "\"make EXTRA_CFLAGS=-DMALLOC_STACK linux-debug\", ret=%d", ret);
        return ret;
    }
    
551 552
    // set current log id.
    _srs_context->generate_id();
winlin authored
553
    srs_trace("server main cid=%d", _srs_context->get_id());
554 555 556 557
    
    return ret;
}
winlin authored
558 559
int SrsServer::listen()
{
560 561
    int ret = ERROR_SUCCESS;
    
562 563
    if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
        return ret;
564
    }
565 566 567
    
    if ((ret = listen_http_api()) != ERROR_SUCCESS) {
        return ret;
568 569
    }
    
570 571
    if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
        return ret;
572 573 574
    }
    
    return ret;
winlin authored
575 576
}
577 578 579 580 581 582
/**
* Install the signal handlers and start the signal process thread.
* @return the result of the signal manager.
*/
int SrsServer::register_signal()
{
    int ret = signal_manager->start();
    return ret;
}
winlin authored
583
int SrsServer::ingest()
winlin authored
584
{
585 586
    int ret = ERROR_SUCCESS;
    
587
#ifdef SRS_AUTO_INGEST
588 589
    if ((ret = ingester->start()) != ERROR_SUCCESS) {
        srs_error("start ingest streams failed. ret=%d", ret);
590 591
        return ret;
    }
592
#endif
winlin authored
593 594 595 596 597 598 599

    return ret;
}

int SrsServer::cycle()
{
    int ret = ERROR_SUCCESS;
600 601 602

    ret = do_cycle();
603
#ifdef SRS_AUTO_GPERF_MC
604
    destroy();
605
    
winlin authored
606
    // remark, for gmc, never invoke the exit().
607 608 609
    srs_warn("sleep a long time for system st-threads to cleanup.");
    st_usleep(3 * 1000 * 1000);
    srs_warn("system quit");
610 611 612
#else
    srs_warn("main cycle terminated, system quit normally.");
    exit(0);
613 614
#endif
    
615 616 617 618 619 620 621
    return ret;
}

/**
* Remove the connection from the server, resample its kbps and free it.
* A connection which is not managed by the server is ignored.
*/
void SrsServer::remove(SrsConnection* conn)
{
    std::vector<SrsConnection*>::iterator pos = std::find(conns.begin(), conns.end(), conn);

    if (pos != conns.end()) {
        conns.erase(pos);
        srs_info("conn removed. conns=%d", (int)conns.size());

        // resample the resource of specified connection.
        resample_kbps(conn);

        // all connections are created by server,
        // so we free it here.
        srs_freep(conn);
        return;
    }

    // removed by destroy, ignore.
    srs_warn("server moved connection, ignore.");
}

/**
* Handle a signal delivered by the signal manager:
* the reload signal only sets a flag consumed by the main loop;
* SIGINT/SIGUSR2 stop the main loop for gmc, or exit the process;
* SIGTERM always exits the process. Other signals are ignored.
*/
void SrsServer::on_signal(int signo)
{
    switch (signo) {
        case SIGNAL_RELOAD:
            signal_reload = true;
            break;

        case SIGINT:
        case SIGUSR2:
#ifdef SRS_AUTO_GPERF_MC
            srs_trace("gmc is on, main cycle will terminate normally.");
            signal_gmc_stop = true;
#else
            srs_trace("user terminate program");
            exit(0);
#endif
            break;

        case SIGTERM:
            srs_trace("user terminate program");
            exit(0);
            break;

        default:
            break;
    }
}

int SrsServer::do_cycle()
{
    int ret = ERROR_SUCCESS;
668
    
winlin authored
669 670
    // find the max loop
    int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
671
    
672
#ifdef SRS_AUTO_STAT
winlin authored
673 674
    max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
    max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
675
    max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
winlin authored
676
    max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
677
    max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
678
    max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
679
    max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
680
#endif
winlin authored
681
    
682 683
    // the deamon thread, update the time cache
    while (true) {
684
        // the interval in config.
685
        int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);
686 687 688 689 690
        
        // dynamic fetch the max.
        int __max = max;
        __max = srs_max(__max, heartbeat_max_resolution);
        
691
        for (int i = 0; i < __max; i++) {
winlin authored
692
            st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
693
        
694 695 696
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
697 698
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
699
#ifdef SRS_AUTO_GPERF_MC
winlin authored
700
            if (signal_gmc_stop) {
701
                srs_warn("gmc got singal to stop server.");
702
                return ret;
winlin authored
703
            }
704
#endif
705
        
winlin authored
706 707 708 709 710 711 712 713 714 715
            if (signal_reload) {
                signal_reload = false;
                srs_info("get signal reload, to reload the config.");
                
                if ((ret = _srs_config->reload()) != ERROR_SUCCESS) {
                    srs_error("reload config failed. ret=%d", ret);
                    return ret;
                }
                srs_trace("reload config success.");
            }
716
            
winlin authored
717
            // update the cache time or rusage.
winlin authored
718
            if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
719
                srs_info("update current time cache.");
winlin authored
720 721
                srs_update_system_time_ms();
            }
722
            
723
#ifdef SRS_AUTO_STAT
winlin authored
724
            if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
725
                srs_info("update resource info, rss.");
winlin authored
726 727
                srs_update_system_rusage();
            }
winlin authored
728
            if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
729
                srs_info("update cpu info, cpu usage.");
730
                srs_update_proc_stat();
731
            }
732 733 734 735
            if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
                srs_info("update disk info, disk iops.");
                srs_update_disk_stat();
            }
winlin authored
736
            if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
737
                srs_info("update memory info, usage/free.");
winlin authored
738 739
                srs_update_meminfo();
            }
740
            if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
741
                srs_info("update platform info, uptime/load.");
742 743
                srs_update_platform_info();
            }
744
            if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
745
                srs_info("update network devices info.");
746 747
                srs_update_network_devices();
            }
748 749 750
            if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
                srs_info("update network rtmp server info.");
                resample_kbps(NULL);
751
                srs_update_rtmp_server((int)conns.size(), kbps);
752
            }
753
    #ifdef SRS_AUTO_HTTP_PARSER
754 755
            if (_srs_config->get_heartbeat_enabled()) {
                if ((i % heartbeat_max_resolution) == 0) {
756
                    srs_info("do http heartbeat, for internal server to report.");
757 758 759
                    http_heartbeat->heartbeat();
                }
            }
760
    #endif
761
#endif
762
            srs_info("server main thread loop");
763 764
        }
    }
765
766
    return ret;
winlin authored
767 768
}
769 770 771 772 773
int SrsServer::listen_rtmp()
{
    int ret = ERROR_SUCCESS;
    
    // stream service port.
774 775
    std::vector<std::string> ports = _srs_config->get_listen();
    srs_assert((int)ports.size() > 0);
776 777 778
    
    close_listeners(SrsListenerRtmpStream);
    
779
    for (int i = 0; i < (int)ports.size(); i++) {
780 781 782
        SrsListener* listener = new SrsListener(this, SrsListenerRtmpStream);
        listeners.push_back(listener);
        
783
        int port = ::atoi(ports[i].c_str());
784 785 786 787 788 789 790 791 792 793 794 795 796
        if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
            srs_error("RTMP stream listen at port %d failed. ret=%d", port, ret);
            return ret;
        }
    }
    
    return ret;
}

/**
* When http api is compiled in: close the current http api listeners,
* and listen at the configured port when http api is enabled.
* @return ERROR_SUCCESS, or the error of the listener.
*/
int SrsServer::listen_http_api()
{
    int ret = ERROR_SUCCESS;

#ifdef SRS_AUTO_HTTP_API
    close_listeners(SrsListenerHttpApi);

    // nothing more to do when disabled in config.
    if (!_srs_config->get_http_api_enabled()) {
        return ret;
    }

    SrsListener* listener = new SrsListener(this, SrsListenerHttpApi);
    listeners.push_back(listener);

    int port = _srs_config->get_http_api_listen();
    ret = listener->listen(port);
    if (ret != ERROR_SUCCESS) {
        srs_error("HTTP api listen at port %d failed. ret=%d", port, ret);
        return ret;
    }
#endif

    return ret;
}

/**
* When http server is compiled in: close the current http stream listeners,
* and listen at the configured port when http stream is enabled.
* @return ERROR_SUCCESS, or the error of the listener.
*/
int SrsServer::listen_http_stream()
{
    int ret = ERROR_SUCCESS;

#ifdef SRS_AUTO_HTTP_SERVER
    close_listeners(SrsListenerHttpStream);

    // nothing more to do when disabled in config.
    if (!_srs_config->get_http_stream_enabled()) {
        return ret;
    }

    SrsListener* listener = new SrsListener(this, SrsListenerHttpStream);
    listeners.push_back(listener);

    int port = _srs_config->get_http_stream_listen();
    ret = listener->listen(port);
    if (ret != ERROR_SUCCESS) {
        srs_error("HTTP stream listen at port %d failed. ret=%d", port, ret);
        return ret;
    }
#endif

    return ret;
}

/**
* Free and remove every listener of the specified type,
* listeners of other types are kept.
*/
void SrsServer::close_listeners(SrsListenerType type)
{
    std::vector<SrsListener*>::iterator it = listeners.begin();

    while (it != listeners.end()) {
        SrsListener* listener = *it;

        if (listener->type() == type) {
            srs_freep(listener);
            // erase returns the iterator of the next listener.
            it = listeners.erase(it);
        } else {
            ++it;
        }
    }
}
851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871
/**
* Resample the kbps of one connection, or of all connections.
* @param conn the connection to resample; NULL to resample all connections.
* @param do_resample whether to sample the server kbps after adding
*       the delta of the connection; not used when conn is NULL,
*       where the server is always sampled once at the end.
*/
void SrsServer::resample_kbps(SrsConnection* conn, bool do_resample)
{
    // resample for connection.
    if (conn) {
        conn->kbps_resample();
        kbps->add_delta(conn);

        // resample for server.
        if (do_resample) {
            kbps->sample();
        }
        return;
    }

    // resample all when conn is NULL.
    std::vector<SrsConnection*>::iterator it;
    for (it = conns.begin(); it != conns.end(); ++it) {
        SrsConnection* client = *it;
        srs_assert(client);

        // only resample, do resample when all finished.
        resample_kbps(client, false);
    }

    kbps->sample();
}
winlin authored
878 879
/**
* Create and start the connection for an accepted client.
* The client is dropped (closed) when the max connections is reached,
* or when the feature of its listener type is not compiled in.
* @param type the type of listener which accepted the client.
* @param client_stfd the st descriptor of the client.
* @return ERROR_SUCCESS when the client is served or dropped,
*       or the error of starting the connection.
*/
int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
{
    int ret = ERROR_SUCCESS;

    const int max_connections = _srs_config->get_max_connections();
    const int nb_conns = (int)conns.size();
    if (nb_conns >= max_connections) {
        int fd = st_netfd_fileno(client_stfd);

        srs_error("exceed the max connections, drop client: "
            "clients=%d, max=%d, fd=%d", nb_conns, max_connections, fd);

        srs_close_stfd(client_stfd);

        return ret;
    }

    // create the connection which matches the listener type.
    SrsConnection* conn = NULL;
    switch (type) {
        case SrsListenerRtmpStream:
            conn = new SrsRtmpConn(this, client_stfd);
            break;

        case SrsListenerHttpApi:
#ifdef SRS_AUTO_HTTP_API
            conn = new SrsHttpApi(this, client_stfd, http_api_handler);
            break;
#else
            srs_warn("close http client for server not support http-api");
            srs_close_stfd(client_stfd);
            return ret;
#endif

        case SrsListenerHttpStream:
#ifdef SRS_AUTO_HTTP_SERVER
            conn = new SrsHttpConn(this, client_stfd, http_stream_handler);
            break;
#else
            srs_warn("close http client for server not support http-server");
            srs_close_stfd(client_stfd);
            return ret;
#endif

        default:
            // TODO: FIXME: handler others
            break;
    }
    srs_assert(conn);

    // directly enqueue, the cycle thread will remove the client.
    conns.push_back(conn);
    srs_verbose("add conn to vector.");

    // cycle will start process thread and when finished remove the client.
    // @remark never use the conn, for it maybe destroyed.
    ret = conn->start();
    if (ret != ERROR_SUCCESS) {
        return ret;
    }
    srs_verbose("conn started success.");

    srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);

    return ret;
}

int SrsServer::on_reload_listen()
{
936
    return listen();
winlin authored
937
}
winlin authored
938 939 940 941 942 943 944 945 946 947

/**
* Reload handler of the pid file config: close the pid file
* which is held now, then acquire the configured one.
* @return the result of acquire_pid_file().
*/
int SrsServer::on_reload_pid()
{
    // -1 marks "no pid file", and 0 is a valid descriptor.
    if (pid_fd >= 0) {
        ::close(pid_fd);
        pid_fd = -1;
    }

    return acquire_pid_file();
}
winlin authored
948
winlin authored
949 950 951 952
/**
* Reload handler when a vhost is added: rebuild the http stream handler
* when http server is compiled in and http is enabled for the vhost.
* @return ERROR_SUCCESS, or the error of rebuilding the handler.
*/
int SrsServer::on_reload_vhost_added(std::string vhost)
{
#ifdef SRS_AUTO_HTTP_SERVER
    if (_srs_config->get_vhost_http_enabled(vhost)) {
        return on_reload_vhost_http_updated();
    }
#endif

    return ERROR_SUCCESS;
}
966
int SrsServer::on_reload_vhost_removed(std::string /*vhost*/)
winlin authored
967 968 969
{
    int ret = ERROR_SUCCESS;
    
970
#ifdef SRS_AUTO_HTTP_SERVER
winlin authored
971 972 973 974 975 976 977 978 979 980 981 982
    if ((ret = on_reload_vhost_http_updated()) != ERROR_SUCCESS) {
        return ret;
    }
#endif

    return ret;
}

/**
* Free the http stream handler, then create and initialize a new one,
* when http server is compiled in.
* @return ERROR_SUCCESS, or the error of initializing the new handler.
*/
int SrsServer::on_reload_vhost_http_updated()
{
    int ret = ERROR_SUCCESS;

#ifdef SRS_AUTO_HTTP_SERVER
    // replace the handler by a fresh one.
    srs_freep(http_stream_handler);
    http_stream_handler = SrsHttpHandler::create_http_stream();

    ret = http_stream_handler->initialize();
#endif

    return ret;
}
winlin authored
995 996
int SrsServer::on_reload_http_api_enabled()
{
winlin authored
997 998
    int ret = ERROR_SUCCESS;
    
999
#ifdef SRS_AUTO_HTTP_API
winlin authored
1000 1001 1002 1003
    ret = listen_http_api();
#endif
    
    return ret;
winlin authored
1004 1005 1006 1007 1008 1009
}

/**
* Reload handler when http api is disabled: close the http api
* listeners, when http api is compiled in.
* @return always ERROR_SUCCESS.
*/
int SrsServer::on_reload_http_api_disabled()
{
#ifdef SRS_AUTO_HTTP_API
    close_listeners(SrsListenerHttpApi);
#endif

    return ERROR_SUCCESS;
}

/**
* Reload handler when http stream is enabled: listen for http stream
* again, when http server is compiled in.
* @return ERROR_SUCCESS, or the error of the listener.
*/
int SrsServer::on_reload_http_stream_enabled()
{
#ifdef SRS_AUTO_HTTP_SERVER
    return listen_http_stream();
#else
    return ERROR_SUCCESS;
#endif
}

/**
* Reload handler when http stream is disabled: close the http stream
* listeners, when http server is compiled in.
* @return always ERROR_SUCCESS.
*/
int SrsServer::on_reload_http_stream_disabled()
{
#ifdef SRS_AUTO_HTTP_SERVER
    close_listeners(SrsListenerHttpStream);
#endif

    return ERROR_SUCCESS;
}

/**
* Reload handler when the http stream config is updated: listen for
* http stream again, then rebuild the http stream handler,
* when http server is compiled in.
* @return ERROR_SUCCESS, or the error of the first step which failed.
*/
int SrsServer::on_reload_http_stream_updated()
{
#ifdef SRS_AUTO_HTTP_SERVER
    int ret = on_reload_http_stream_enabled();
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    return on_reload_vhost_http_updated();
#else
    return ERROR_SUCCESS;
#endif
}
1055