winlin

Merge branch 'srs.master'

@@ -388,7 +388,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then @@ -388,7 +388,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
388 "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" 388 "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config"
389 "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks" 389 "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks"
390 "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge" 390 "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge"
391 - "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_avc_aac") 391 + "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_avc_aac"
  392 + "srs_app_poll")
392 APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh 393 APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh
393 APP_OBJS="${MODULE_OBJS[@]}" 394 APP_OBJS="${MODULE_OBJS[@]}"
394 fi 395 fi
@@ -1502,9 +1502,9 @@ int SrsConfig::check_config() @@ -1502,9 +1502,9 @@ int SrsConfig::check_config()
1502 "total=%d(max_connections=%d, nb_consumed_fds=%d), ret=%d. " 1502 "total=%d(max_connections=%d, nb_consumed_fds=%d), ret=%d. "
1503 "you can change max_connections from %d to %d, or " 1503 "you can change max_connections from %d to %d, or "
1504 "you can login as root and set the limit: ulimit -HSn %d", 1504 "you can login as root and set the limit: ulimit -HSn %d",
1505 - nb_connections, nb_total, max_open_files, 1505 + nb_connections, nb_total + 1, max_open_files,
1506 nb_total, nb_connections, nb_consumed_fds, 1506 nb_total, nb_connections, nb_consumed_fds,
1507 - ret, nb_connections, nb_canbe, nb_total); 1507 + ret, nb_connections, nb_canbe, nb_total + 1);
1508 return ret; 1508 return ret;
1509 } 1509 }
1510 } 1510 }
  1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013-2014 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_app_poll.hpp>
  25 +
  26 +#include <srs_kernel_error.hpp>
  27 +#include <srs_kernel_log.hpp>
  28 +
  29 +// the interval in us to refresh the poll for all fds.
  30 +// for performance refine, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194
  31 +#define SRS_POLL_CYCLE_INTERVAL 10 * 1000 * 1000
  32 +
  33 +SrsPoll::SrsPoll()
  34 +{
  35 + _pds = NULL;
  36 + pthread = new SrsThread(this, 0, false);
  37 +}
  38 +
  39 +SrsPoll::~SrsPoll()
  40 +{
  41 + srs_freep(_pds);
  42 + srs_freep(pthread);
  43 + fds.clear();
  44 +}
  45 +
  46 +int SrsPoll::start()
  47 +{
  48 + return pthread->start();
  49 +}
  50 +
  51 +int SrsPoll::cycle()
  52 +{
  53 + int ret = ERROR_SUCCESS;
  54 +
  55 + if (fds.size() == 0) {
  56 + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  57 + return ret;
  58 + }
  59 +
  60 + int nb_pds = (int)fds.size();
  61 +
  62 + // TODO: FIXME: use more efficient way for the poll.
  63 + srs_freep(_pds);
  64 + _pds = new pollfd[nb_pds];
  65 +
  66 + if (true) {
  67 + int index = 0;
  68 +
  69 + std::map<int, SrsPollFD*>::iterator it;
  70 + for (it = fds.begin(); it != fds.end(); ++it) {
  71 + int fd = it->first;
  72 +
  73 + pollfd& pfd = _pds[index++];
  74 + pfd.fd = fd;
  75 + pfd.events = POLLIN;
  76 + pfd.revents = 0;
  77 + }
  78 +
  79 + srs_assert(index == (int)fds.size());
  80 + }
  81 +
  82 + // Upon successful completion, a non-negative value is returned.
  83 + // A positive value indicates the total number of OS file descriptors in pds that have events.
  84 + // A value of 0 indicates that the call timed out.
  85 + if (st_poll(_pds, nb_pds, SRS_POLL_CYCLE_INTERVAL) < 0) {
  86 + srs_warn("ignore st_poll failed, size=%d", nb_pds);
  87 + return ret;
  88 + }
  89 +
  90 + for (int i = 0; i < nb_pds; i++) {
  91 + if (!(_pds[i].revents & POLLIN)) {
  92 + continue;
  93 + }
  94 +
  95 + int fd = _pds[i].fd;
  96 + if (fds.find(fd) == fds.end()) {
  97 + continue;
  98 + }
  99 +
  100 + SrsPollFD* owner = fds[fd];
  101 + owner->set_active(true);
  102 + }
  103 +
  104 + return ret;
  105 +}
  106 +
  107 +int SrsPoll::add(st_netfd_t stfd, SrsPollFD* owner)
  108 +{
  109 + int ret = ERROR_SUCCESS;
  110 +
  111 + int fd = st_netfd_fileno(stfd);
  112 + if (fds.find(fd) != fds.end()) {
  113 + ret = ERROR_RTMP_POLL_FD_DUPLICATED;
  114 + srs_error("fd exists, fd=%d, ret=%d", fd, ret);
  115 + return ret;
  116 + }
  117 +
  118 + fds[fd] = owner;
  119 +
  120 + return ret;
  121 +}
  122 +
  123 +void SrsPoll::remove(st_netfd_t stfd, SrsPollFD* owner)
  124 +{
  125 + std::map<int, SrsPollFD*>::iterator it;
  126 +
  127 + int fd = st_netfd_fileno(stfd);
  128 + if ((it = fds.find(fd)) != fds.end()) {
  129 + fds.erase(it);
  130 + }
  131 +}
  132 +
  133 +SrsPoll* SrsPoll::_instance = new SrsPoll();
  134 +
  135 +SrsPoll* SrsPoll::instance()
  136 +{
  137 + return _instance;
  138 +}
  139 +
  140 +SrsPollFD::SrsPollFD()
  141 +{
  142 + _stfd = NULL;
  143 + _active = false;
  144 +}
  145 +
  146 +SrsPollFD::~SrsPollFD()
  147 +{
  148 + if (_stfd) {
  149 + SrsPoll* poll = SrsPoll::instance();
  150 + poll->remove(_stfd, this);
  151 + }
  152 +}
  153 +
  154 +int SrsPollFD::initialize(st_netfd_t stfd)
  155 +{
  156 + int ret = ERROR_SUCCESS;
  157 +
  158 + _stfd = stfd;
  159 +
  160 + SrsPoll* poll = SrsPoll::instance();
  161 + if ((ret = poll->add(stfd, this)) != ERROR_SUCCESS) {
  162 + srs_error("add fd to poll failed. ret=%d", ret);
  163 + return ret;
  164 + }
  165 +
  166 + return ret;
  167 +}
  168 +
  169 +bool SrsPollFD::active()
  170 +{
  171 + return _active;
  172 +}
  173 +
  174 +void SrsPollFD::set_active(bool v)
  175 +{
  176 + _active = v;
  177 +}
  178 +
  1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013-2014 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#ifndef SRS_APP_POLL_HPP
  25 +#define SRS_APP_POLL_HPP
  26 +
  27 +/*
  28 +#include <srs_app_poll.hpp>
  29 +*/
  30 +
  31 +#include <srs_core.hpp>
  32 +
  33 +#include <map>
  34 +
  35 +#include <srs_app_st.hpp>
  36 +#include <srs_app_thread.hpp>
  37 +
  38 +class SrsPollFD;
  39 +
  40 +/**
  41 +* the poll for all play clients to finger the active fd out.
  42 +* for performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194
  43 +* the poll is shared by all SrsPollFD, and we start an isolate thread to finger the active fds.
  44 +*/
  45 +class SrsPoll : public ISrsThreadHandler
  46 +{
  47 +private:
  48 + SrsThread* pthread;
  49 + pollfd* _pds;
  50 + std::map<int, SrsPollFD*> fds;
  51 +public:
  52 + SrsPoll();
  53 + virtual ~SrsPoll();
  54 +public:
  55 + /**
  56 + * start the poll thread.
  57 + */
  58 + virtual int start();
  59 + /**
  60 + * start an cycle thread.
  61 + */
  62 + virtual int cycle();
  63 +public:
  64 + /**
  65 + * add the fd to poll.
  66 + */
  67 + virtual int add(st_netfd_t stfd, SrsPollFD* owner);
  68 + /**
  69 + * remove the fd to poll, ignore any error.
  70 + */
  71 + virtual void remove(st_netfd_t stfd, SrsPollFD* owner);
  72 +// singleton
  73 +private:
  74 + static SrsPoll* _instance;
  75 +public:
  76 + static SrsPoll* instance();
  77 +};
  78 +
  79 +/**
  80 +* the poll fd to check whether the specified fd is active.
  81 +*/
  82 +class SrsPollFD
  83 +{
  84 +private:
  85 + st_netfd_t _stfd;
  86 + // whether current fd is active.
  87 + bool _active;
  88 +public:
  89 + SrsPollFD();
  90 + virtual ~SrsPollFD();
  91 +public:
  92 + /**
  93 + * initialize the poll.
  94 + * @param stfd the fd to poll.
  95 + */
  96 + virtual int initialize(st_netfd_t stfd);
  97 + /**
  98 + * whether fd is active.
  99 + */
  100 + virtual bool active();
  101 + /**
  102 + * the poll will set to fd active when got data to read,
  103 + * the connection will set to deactive when data read.
  104 + */
  105 + virtual void set_active(bool v);
  106 +};
  107 +
  108 +#endif
  109 +
@@ -48,6 +48,7 @@ using namespace std; @@ -48,6 +48,7 @@ using namespace std;
48 #include <srs_app_utility.hpp> 48 #include <srs_app_utility.hpp>
49 #include <srs_protocol_msg_array.hpp> 49 #include <srs_protocol_msg_array.hpp>
50 #include <srs_protocol_amf0.hpp> 50 #include <srs_protocol_amf0.hpp>
  51 +#include <srs_app_poll.hpp>
51 52
52 // when stream is busy, for example, streaming is already 53 // when stream is busy, for example, streaming is already
53 // publishing, when a new client to request to publish, 54 // publishing, when a new client to request to publish,
@@ -516,12 +517,20 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -516,12 +517,20 @@ int SrsRtmpConn::playing(SrsSource* source)
516 SrsAutoFree(SrsConsumer, consumer); 517 SrsAutoFree(SrsConsumer, consumer);
517 srs_verbose("consumer created success."); 518 srs_verbose("consumer created success.");
518 519
519 - rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 520 + // use poll fd to manage the connection, read when active.
  521 + SrsPollFD poll_fd;
  522 + if ((ret = poll_fd.initialize(stfd)) != ERROR_SUCCESS) {
  523 + return ret;
  524 + }
520 525
521 - SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); 526 + // TODO: FIXME: remove following.
  527 + //rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  528 + rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT);
  529 + rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT);
522 530
  531 + // initialize other components
  532 + SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
523 SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS); 533 SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS);
524 -  
525 bool user_specified_duration_to_stop = (req->duration > 0); 534 bool user_specified_duration_to_stop = (req->duration > 0);
526 int64_t starttime = -1; 535 int64_t starttime = -1;
527 536
@@ -530,7 +539,9 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -530,7 +539,9 @@ int SrsRtmpConn::playing(SrsSource* source)
530 pithy_print.elapse(); 539 pithy_print.elapse();
531 540
532 // read from client. 541 // read from client.
533 - if (true) { 542 + if (poll_fd.active()) {
  543 + poll_fd.set_active(false);
  544 +
534 SrsMessage* msg = NULL; 545 SrsMessage* msg = NULL;
535 ret = rtmp->recv_message(&msg); 546 ret = rtmp->recv_message(&msg);
536 srs_verbose("play loop recv message. ret=%d", ret); 547 srs_verbose("play loop recv message. ret=%d", ret);
@@ -560,6 +571,13 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -560,6 +571,13 @@ int SrsRtmpConn::playing(SrsSource* source)
560 return ret; 571 return ret;
561 } 572 }
562 573
  574 + // no data, sleep a while.
  575 + // for the poll_fd maybe not active, and no message.
  576 + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/194
  577 + if (count <= 0) {
  578 + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  579 + }
  580 +
563 // reportable 581 // reportable
564 if (pithy_print.can_print()) { 582 if (pithy_print.can_print()) {
565 kbps->sample(); 583 kbps->sample();
@@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
44 #include <srs_app_source.hpp> 44 #include <srs_app_source.hpp>
45 #include <srs_app_utility.hpp> 45 #include <srs_app_utility.hpp>
46 #include <srs_app_heartbeat.hpp> 46 #include <srs_app_heartbeat.hpp>
  47 +#include <srs_app_poll.hpp>
47 48
48 // signal defines. 49 // signal defines.
49 #define SIGNAL_RELOAD SIGHUP 50 #define SIGNAL_RELOAD SIGHUP
@@ -664,6 +665,14 @@ int SrsServer::do_cycle() @@ -664,6 +665,14 @@ int SrsServer::do_cycle()
664 { 665 {
665 int ret = ERROR_SUCCESS; 666 int ret = ERROR_SUCCESS;
666 667
  668 + // start the poll for play clients.
  669 + // performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194
  670 + SrsPoll* poll = SrsPoll::instance();
  671 + if ((ret = poll->start()) != ERROR_SUCCESS) {
  672 + srs_error("start poll failed. ret=%d", ret);
  673 + return ret;
  674 + }
  675 +
667 // find the max loop 676 // find the max loop
668 int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES); 677 int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
669 678
@@ -185,6 +185,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -185,6 +185,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
185 #define ERROR_EDGE_VHOST_REMOVED 3039 185 #define ERROR_EDGE_VHOST_REMOVED 3039
186 #define ERROR_HLS_AVC_TRY_OTHERS 3040 186 #define ERROR_HLS_AVC_TRY_OTHERS 3040
187 #define ERROR_H264_API_NO_PREFIXED 3041 187 #define ERROR_H264_API_NO_PREFIXED 3041
  188 +#define ERROR_RTMP_POLL_FD_DUPLICATED 3042
188 189
189 /** 190 /**
190 * whether the error code is an system control error. 191 * whether the error code is an system control error.
@@ -92,6 +92,8 @@ file @@ -92,6 +92,8 @@ file
92 ..\app\srs_app_kbps.cpp, 92 ..\app\srs_app_kbps.cpp,
93 ..\app\srs_app_log.hpp, 93 ..\app\srs_app_log.hpp,
94 ..\app\srs_app_log.cpp, 94 ..\app\srs_app_log.cpp,
  95 + ..\app\srs_app_poll.hpp,
  96 + ..\app\srs_app_poll.cpp,
95 ..\app\srs_app_refer.hpp, 97 ..\app\srs_app_refer.hpp,
96 ..\app\srs_app_refer.cpp, 98 ..\app\srs_app_refer.cpp,
97 ..\app\srs_app_reload.hpp, 99 ..\app\srs_app_reload.hpp,