winlin

for bug #237, extract a queue recv thread.

@@ -26,69 +26,45 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -26,69 +26,45 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #include <srs_protocol_rtmp.hpp> 26 #include <srs_protocol_rtmp.hpp>
27 #include <srs_protocol_stack.hpp> 27 #include <srs_protocol_stack.hpp>
28 28
29 -SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk)  
30 -{  
31 - rtmp = rtmp_sdk;  
32 - trd = new SrsThread(this, 0, true);  
33 -}  
34 -  
35 -SrsQueueRecvThread::~SrsQueueRecvThread() 29 +ISrsMessageHandler::ISrsMessageHandler()
36 { 30 {
37 - // stop recv thread.  
38 - stop();  
39 -  
40 - // destroy the thread.  
41 - srs_freep(trd);  
42 -  
43 - // clear all messages.  
44 - std::vector<SrsMessage*>::iterator it;  
45 - for (it = queue.begin(); it != queue.end(); ++it) {  
46 - SrsMessage* msg = *it;  
47 - srs_freep(msg);  
48 - }  
49 - queue.clear();  
50 } 31 }
51 32
52 -bool SrsQueueRecvThread::empty() 33 +ISrsMessageHandler::~ISrsMessageHandler()
53 { 34 {
54 - return queue.empty();  
55 } 35 }
56 36
57 -int SrsQueueRecvThread::size() 37 +SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk)
58 { 38 {
59 - return (int)queue.size(); 39 + handler = msg_handler;
  40 + rtmp = rtmp_sdk;
  41 + trd = new SrsThread(this, 0, true);
60 } 42 }
61 43
62 -SrsMessage* SrsQueueRecvThread::pump() 44 +SrsRecvThread::~SrsRecvThread()
63 { 45 {
64 - srs_assert(!queue.empty());  
65 -  
66 - SrsMessage* msg = *queue.begin();  
67 -  
68 - queue.erase(queue.begin()); 46 + // stop recv thread.
  47 + stop();
69 48
70 - return msg; 49 + // destroy the thread.
  50 + srs_freep(trd);
71 } 51 }
72 52
73 -int SrsQueueRecvThread::start() 53 +int SrsRecvThread::start()
74 { 54 {
75 return trd->start(); 55 return trd->start();
76 } 56 }
77 57
78 -void SrsQueueRecvThread::stop() 58 +void SrsRecvThread::stop()
79 { 59 {
80 trd->stop(); 60 trd->stop();
81 } 61 }
82 62
83 -int SrsQueueRecvThread::cycle() 63 +int SrsRecvThread::cycle()
84 { 64 {
85 int ret = ERROR_SUCCESS; 65 int ret = ERROR_SUCCESS;
86 66
87 - // we only recv one message and then process it,  
88 - // for the message may cause the thread to stop,  
89 - // when stop, the thread is freed, so the messages  
90 - // are dropped.  
91 - if (!queue.empty()) { 67 + if (!handler->can_handle()) {
92 st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 68 st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
93 return ret; 69 return ret;
94 } 70 }
@@ -107,14 +83,12 @@ int SrsQueueRecvThread::cycle() @@ -107,14 +83,12 @@ int SrsQueueRecvThread::cycle()
107 } 83 }
108 srs_verbose("play loop recv message. ret=%d", ret); 84 srs_verbose("play loop recv message. ret=%d", ret);
109 85
110 - // put into queue, the send thread will get and process it,  
111 - // @see SrsRtmpConn::process_play_control_msg  
112 - queue.push_back(msg); 86 + handler->handle(msg);
113 87
114 return ret; 88 return ret;
115 } 89 }
116 90
117 -void SrsQueueRecvThread::on_thread_start() 91 +void SrsRecvThread::on_thread_start()
118 { 92 {
119 // the multiple messages writev improve performance large, 93 // the multiple messages writev improve performance large,
120 // but the timeout recv will cause 33% sys call performance, 94 // but the timeout recv will cause 33% sys call performance,
@@ -128,7 +102,7 @@ void SrsQueueRecvThread::on_thread_start() @@ -128,7 +102,7 @@ void SrsQueueRecvThread::on_thread_start()
128 rtmp->set_auto_response(false); 102 rtmp->set_auto_response(false);
129 } 103 }
130 104
131 -void SrsQueueRecvThread::on_thread_stop() 105 +void SrsRecvThread::on_thread_stop()
132 { 106 {
133 // enable the protocol auto response, 107 // enable the protocol auto response,
134 // for the isolate recv thread terminated. 108 // for the isolate recv thread terminated.
@@ -138,3 +112,59 @@ void SrsQueueRecvThread::on_thread_stop() @@ -138,3 +112,59 @@ void SrsQueueRecvThread::on_thread_stop()
138 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 112 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
139 } 113 }
140 114
  115 +SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk)
  116 + : SrsRecvThread(this, rtmp_sdk)
  117 +{
  118 +}
  119 +
  120 +SrsQueueRecvThread::~SrsQueueRecvThread()
  121 +{
  122 + stop();
  123 +
  124 + // clear all messages.
  125 + std::vector<SrsMessage*>::iterator it;
  126 + for (it = queue.begin(); it != queue.end(); ++it) {
  127 + SrsMessage* msg = *it;
  128 + srs_freep(msg);
  129 + }
  130 + queue.clear();
  131 +}
  132 +
  133 +bool SrsQueueRecvThread::empty()
  134 +{
  135 + return queue.empty();
  136 +}
  137 +
  138 +int SrsQueueRecvThread::size()
  139 +{
  140 + return (int)queue.size();
  141 +}
  142 +
  143 +SrsMessage* SrsQueueRecvThread::pump()
  144 +{
  145 + srs_assert(!queue.empty());
  146 +
  147 + SrsMessage* msg = *queue.begin();
  148 +
  149 + queue.erase(queue.begin());
  150 +
  151 + return msg;
  152 +}
  153 +
  154 +bool SrsQueueRecvThread::can_handle()
  155 +{
  156 + // we only recv one message and then process it,
  157 + // for the message may cause the thread to stop,
  158 + // when stop, the thread is freed, so the messages
  159 + // are dropped.
  160 + return empty();
  161 +}
  162 +
  163 +int SrsQueueRecvThread::handle(SrsMessage* msg)
  164 +{
  165 + // put into queue, the send thread will get and process it,
  166 + // @see SrsRtmpConn::process_play_control_msg
  167 + queue.push_back(msg);
  168 +
  169 + return ERROR_SUCCESS;
  170 +}
@@ -38,16 +38,57 @@ class SrsRtmpServer; @@ -38,16 +38,57 @@ class SrsRtmpServer;
38 class SrsMessage; 38 class SrsMessage;
39 39
40 /** 40 /**
  41 + * for the recv thread to handle the message.
  42 + */
  43 +class ISrsMessageHandler
  44 +{
  45 +public:
  46 + ISrsMessageHandler();
  47 + virtual ~ISrsMessageHandler();
  48 +public:
  49 + /**
  50 + * whether the handler can handle,
  51 + * for example, when queue recv handler got an message,
  52 + * it wait the user to process it, then the recv thread
  53 + * never recv message util the handler is ok.
  54 + */
  55 + virtual bool can_handle() = 0;
  56 + /**
  57 + * process the received message.
  58 + */
  59 + virtual int handle(SrsMessage* msg) = 0;
  60 +};
  61 +
  62 +/**
  63 + * the recv thread, use message handler to handle each received message.
  64 + */
  65 +class SrsRecvThread : public ISrsThreadHandler
  66 +{
  67 +protected:
  68 + SrsThread* trd;
  69 + ISrsMessageHandler* handler;
  70 + SrsRtmpServer* rtmp;
  71 +public:
  72 + SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk);
  73 + virtual ~SrsRecvThread();
  74 +public:
  75 + virtual int start();
  76 + virtual void stop();
  77 + virtual int cycle();
  78 +public:
  79 + virtual void on_thread_start();
  80 + virtual void on_thread_stop();
  81 +};
  82 +
  83 +/**
41 * the recv thread used to replace the timeout recv, 84 * the recv thread used to replace the timeout recv,
42 * which hurt performance for the epoll_ctrl is frequently used. 85 * which hurt performance for the epoll_ctrl is frequently used.
43 * @see: SrsRtmpConn::playing 86 * @see: SrsRtmpConn::playing
44 * @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 87 * @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
45 */ 88 */
46 -class SrsQueueRecvThread : public ISrsThreadHandler 89 +class SrsQueueRecvThread : virtual public ISrsMessageHandler, virtual public SrsRecvThread
47 { 90 {
48 private: 91 private:
49 - SrsThread* trd;  
50 - SrsRtmpServer* rtmp;  
51 std::vector<SrsMessage*> queue; 92 std::vector<SrsMessage*> queue;
52 public: 93 public:
53 SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk); 94 SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk);
@@ -57,12 +98,8 @@ public: @@ -57,12 +98,8 @@ public:
57 virtual int size(); 98 virtual int size();
58 virtual SrsMessage* pump(); 99 virtual SrsMessage* pump();
59 public: 100 public:
60 - virtual int start();  
61 - virtual void stop();  
62 - virtual int cycle();  
63 -public:  
64 - virtual void on_thread_start();  
65 - virtual void on_thread_stop(); 101 + virtual bool can_handle();
  102 + virtual int handle(SrsMessage* msg);
66 }; 103 };
67 104
68 #endif 105 #endif