winlin

for bug #194, add fds poll, just sleep to send without recv.

@@ -24,14 +24,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -24,14 +24,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 #include <srs_app_poll.hpp> 24 #include <srs_app_poll.hpp>
25 25
26 #include <srs_kernel_error.hpp> 26 #include <srs_kernel_error.hpp>
  27 +#include <srs_kernel_log.hpp>
27 28
28 SrsPoll::SrsPoll() 29 SrsPoll::SrsPoll()
29 { 30 {
  31 + _pds = NULL;
30 pthread = new SrsThread(this, 0, false); 32 pthread = new SrsThread(this, 0, false);
31 } 33 }
32 34
33 SrsPoll::~SrsPoll() 35 SrsPoll::~SrsPoll()
34 { 36 {
  37 + srs_freep(_pds);
35 srs_freep(pthread); 38 srs_freep(pthread);
36 fds.clear(); 39 fds.clear();
37 } 40 }
@@ -44,10 +47,84 @@ int SrsPoll::start() @@ -44,10 +47,84 @@ int SrsPoll::start()
44 int SrsPoll::cycle() 47 int SrsPoll::cycle()
45 { 48 {
46 int ret = ERROR_SUCCESS; 49 int ret = ERROR_SUCCESS;
47 - // TODO: FIXME: implements it. 50 +
  51 + if (fds.size() == 0) {
  52 + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  53 + return ret;
  54 + }
  55 +
  56 + int nb_pds = (int)fds.size();
  57 +
  58 +st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  59 +return ret;
  60 +
  61 + srs_freep(_pds);
  62 + _pds = new pollfd[nb_pds];
  63 +
  64 + if (true) {
  65 + int index = 0;
  66 +
  67 + std::map<int, SrsPollFD*>::iterator it;
  68 + for (it = fds.begin(); it != fds.end(); ++it) {
  69 + int fd = it->first;
  70 +
  71 + pollfd& pfd = _pds[index++];
  72 + pfd.fd = fd;
  73 + pfd.events = POLLIN;
  74 + pfd.revents = 0;
  75 + }
  76 +
  77 + srs_assert(index == (int)fds.size());
  78 + }
  79 +
  80 + if (st_poll(_pds, nb_pds, ST_UTIME_NO_TIMEOUT) <= 0) {
  81 + srs_warn("ignore st_poll failed, size=%d", nb_pds);
  82 + return ret;
  83 + }
  84 +
  85 + for (int i = 0; i < nb_pds; i++) {
  86 + if (!(_pds[i].revents & POLLIN)) {
  87 + continue;
  88 + }
  89 +
  90 + int fd = _pds[i].fd;
  91 + if (fds.find(fd) == fds.end()) {
  92 + continue;
  93 + }
  94 +
  95 + SrsPollFD* owner = fds[fd];
  96 + owner->set_active(true);
  97 + }
  98 +
  99 + return ret;
  100 +}
  101 +
  102 +int SrsPoll::add(st_netfd_t stfd, SrsPollFD* owner)
  103 +{
  104 + int ret = ERROR_SUCCESS;
  105 +
  106 + int fd = st_netfd_fileno(stfd);
  107 + if (fds.find(fd) != fds.end()) {
  108 + ret = ERROR_RTMP_POLL_FD_DUPLICATED;
  109 + srs_error("fd exists, fd=%d, ret=%d", fd, ret);
  110 + return ret;
  111 + }
  112 +
  113 + fds[fd] = owner;
  114 +
48 return ret; 115 return ret;
49 } 116 }
50 117
  118 +void SrsPoll::remove(st_netfd_t stfd, SrsPollFD* owner)
  119 +{
  120 + std::map<int, SrsPollFD*>::iterator it;
  121 +
  122 + int fd = st_netfd_fileno(stfd);
  123 + if ((it = fds.find(fd)) != fds.end()) {
  124 + fds.erase(it);
  125 + }
  126 +}
  127 +
51 SrsPoll* SrsPoll::_instance = new SrsPoll(); 128 SrsPoll* SrsPoll::_instance = new SrsPoll();
52 129
53 SrsPoll* SrsPoll::instance() 130 SrsPoll* SrsPoll::instance()
@@ -58,10 +135,15 @@ SrsPoll* SrsPoll::instance() @@ -58,10 +135,15 @@ SrsPoll* SrsPoll::instance()
58 SrsPollFD::SrsPollFD() 135 SrsPollFD::SrsPollFD()
59 { 136 {
60 _stfd = NULL; 137 _stfd = NULL;
  138 + _active = false;
61 } 139 }
62 140
63 SrsPollFD::~SrsPollFD() 141 SrsPollFD::~SrsPollFD()
64 { 142 {
  143 + if (_stfd) {
  144 + SrsPoll* poll = SrsPoll::instance();
  145 + poll->remove(_stfd, this);
  146 + }
65 } 147 }
66 148
67 int SrsPollFD::initialize(st_netfd_t stfd) 149 int SrsPollFD::initialize(st_netfd_t stfd)
@@ -70,6 +152,22 @@ int SrsPollFD::initialize(st_netfd_t stfd) @@ -70,6 +152,22 @@ int SrsPollFD::initialize(st_netfd_t stfd)
70 152
71 _stfd = stfd; 153 _stfd = stfd;
72 154
  155 + SrsPoll* poll = SrsPoll::instance();
  156 + if ((ret = poll->add(stfd, this)) != ERROR_SUCCESS) {
  157 + srs_error("add fd to poll failed. ret=%d", ret);
  158 + return ret;
  159 + }
  160 +
73 return ret; 161 return ret;
74 } 162 }
75 163
  164 +bool SrsPollFD::active()
  165 +{
  166 + return _active;
  167 +}
  168 +
  169 +void SrsPollFD::set_active(bool v)
  170 +{
  171 + _active = v;
  172 +}
  173 +
@@ -46,7 +46,8 @@ class SrsPoll : public ISrsThreadHandler @@ -46,7 +46,8 @@ class SrsPoll : public ISrsThreadHandler
46 { 46 {
47 private: 47 private:
48 SrsThread* pthread; 48 SrsThread* pthread;
49 - std::map<st_netfd_t, SrsPollFD*> fds; 49 + pollfd* _pds;
  50 + std::map<int, SrsPollFD*> fds;
50 public: 51 public:
51 SrsPoll(); 52 SrsPoll();
52 virtual ~SrsPoll(); 53 virtual ~SrsPoll();
@@ -59,6 +60,15 @@ public: @@ -59,6 +60,15 @@ public:
59 * start an cycle thread. 60 * start an cycle thread.
60 */ 61 */
61 virtual int cycle(); 62 virtual int cycle();
  63 +public:
  64 + /**
  65 + * add the fd to poll.
  66 + */
  67 + virtual int add(st_netfd_t stfd, SrsPollFD* owner);
  68 + /**
  69 + * remove the fd to poll, ignore any error.
  70 + */
  71 + virtual void remove(st_netfd_t stfd, SrsPollFD* owner);
62 // singleton 72 // singleton
63 private: 73 private:
64 static SrsPoll* _instance; 74 static SrsPoll* _instance;
@@ -73,6 +83,8 @@ class SrsPollFD @@ -73,6 +83,8 @@ class SrsPollFD
73 { 83 {
74 private: 84 private:
75 st_netfd_t _stfd; 85 st_netfd_t _stfd;
  86 + // whether current fd is active.
  87 + bool _active;
76 public: 88 public:
77 SrsPollFD(); 89 SrsPollFD();
78 virtual ~SrsPollFD(); 90 virtual ~SrsPollFD();
@@ -82,6 +94,15 @@ public: @@ -82,6 +94,15 @@ public:
82 * @param stfd the fd to poll. 94 * @param stfd the fd to poll.
83 */ 95 */
84 virtual int initialize(st_netfd_t stfd); 96 virtual int initialize(st_netfd_t stfd);
  97 + /**
  98 + * whether fd is active.
  99 + */
  100 + virtual bool active();
  101 + /**
  102 + * the poll will set to fd active when got data to read,
  103 + * the connection will set to deactive when data read.
  104 + */
  105 + virtual void set_active(bool v);
85 }; 106 };
86 107
87 #endif 108 #endif
@@ -517,26 +517,31 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -517,26 +517,31 @@ int SrsRtmpConn::playing(SrsSource* source)
517 SrsAutoFree(SrsConsumer, consumer); 517 SrsAutoFree(SrsConsumer, consumer);
518 srs_verbose("consumer created success."); 518 srs_verbose("consumer created success.");
519 519
520 - rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 520 + // use poll fd to manage the connection, read when active.
  521 + SrsPollFD poll_fd;
  522 + if ((ret = poll_fd.initialize(stfd)) != ERROR_SUCCESS) {
  523 + return ret;
  524 + }
521 525
522 - SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); 526 + // TODO: FIXME: remove following.
  527 + //rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  528 + rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT);
  529 + rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT);
523 530
  531 + // initialize other components
  532 + SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
524 SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS); 533 SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS);
525 -  
526 bool user_specified_duration_to_stop = (req->duration > 0); 534 bool user_specified_duration_to_stop = (req->duration > 0);
527 int64_t starttime = -1; 535 int64_t starttime = -1;
528 536
529 - SrsPollFD poll;  
530 - if ((ret = poll.initialize(stfd)) != ERROR_SUCCESS) {  
531 - return ret;  
532 - }  
533 -  
534 while (true) { 537 while (true) {
535 // collect elapse for pithy print. 538 // collect elapse for pithy print.
536 pithy_print.elapse(); 539 pithy_print.elapse();
537 540
538 // read from client. 541 // read from client.
539 - if (true) { 542 + if (poll_fd.active()) {
  543 + poll_fd.set_active(false);
  544 +
540 SrsMessage* msg = NULL; 545 SrsMessage* msg = NULL;
541 ret = rtmp->recv_message(&msg); 546 ret = rtmp->recv_message(&msg);
542 srs_verbose("play loop recv message. ret=%d", ret); 547 srs_verbose("play loop recv message. ret=%d", ret);
@@ -565,6 +570,13 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -565,6 +570,13 @@ int SrsRtmpConn::playing(SrsSource* source)
565 srs_error("get messages from consumer failed. ret=%d", ret); 570 srs_error("get messages from consumer failed. ret=%d", ret);
566 return ret; 571 return ret;
567 } 572 }
  573 +
  574 + // no data, sleep a while.
  575 + // for the poll_fd maybe not active, and no message.
  576 + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/194
  577 + if (count <= 0) {
  578 + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  579 + }
568 580
569 // reportable 581 // reportable
570 if (pithy_print.can_print()) { 582 if (pithy_print.can_print()) {
@@ -185,6 +185,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -185,6 +185,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
185 #define ERROR_EDGE_VHOST_REMOVED 3039 185 #define ERROR_EDGE_VHOST_REMOVED 3039
186 #define ERROR_HLS_AVC_TRY_OTHERS 3040 186 #define ERROR_HLS_AVC_TRY_OTHERS 3040
187 #define ERROR_H264_API_NO_PREFIXED 3041 187 #define ERROR_H264_API_NO_PREFIXED 3041
  188 +#define ERROR_RTMP_POLL_FD_DUPLICATED 3042
188 189
189 /** 190 /**
190 * whether the error code is an system control error. 191 * whether the error code is an system control error.