winlin

for #742, refine code for recv thread.

@@ -37,19 +37,27 @@ using namespace std; @@ -37,19 +37,27 @@ using namespace std;
37 // the max small bytes to group 37 // the max small bytes to group
38 #define SRS_MR_SMALL_BYTES 4096 38 #define SRS_MR_SMALL_BYTES 4096
39 39
40 -ISrsMessageHandler::ISrsMessageHandler() 40 +ISrsMessageConsumer::ISrsMessageConsumer()
41 { 41 {
42 } 42 }
43 43
44 -ISrsMessageHandler::~ISrsMessageHandler() 44 +ISrsMessageConsumer::~ISrsMessageConsumer()
45 { 45 {
46 } 46 }
47 47
48 -SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm) 48 +ISrsMessagePumper::ISrsMessagePumper()
49 { 49 {
  50 +}
  51 +
  52 +ISrsMessagePumper::~ISrsMessagePumper()
  53 +{
  54 +}
  55 +
  56 +SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm)
  57 +{
  58 + rtmp = r;
  59 + pumper = p;
50 timeout = tm; 60 timeout = tm;
51 - handler = msg_handler;  
52 - rtmp = rtmp_sdk;  
53 trd = new SrsReusableThread2("recv", this); 61 trd = new SrsReusableThread2("recv", this);
54 } 62 }
55 63
@@ -87,29 +95,29 @@ int SrsRecvThread::cycle() @@ -87,29 +95,29 @@ int SrsRecvThread::cycle()
87 int ret = ERROR_SUCCESS; 95 int ret = ERROR_SUCCESS;
88 96
89 while (!trd->interrupted()) { 97 while (!trd->interrupted()) {
90 - if (!handler->can_handle()) { 98 + // When the pumper is interrupted, wait then retry.
  99 + if (pumper->interrupted()) {
91 st_usleep(timeout * 1000); 100 st_usleep(timeout * 1000);
92 continue; 101 continue;
93 } 102 }
94 103
95 SrsCommonMessage* msg = NULL; 104 SrsCommonMessage* msg = NULL;
96 105
97 - // recv and handle message  
98 - ret = rtmp->recv_message(&msg);  
99 - if (ret == ERROR_SUCCESS) {  
100 - ret = handler->handle(msg); 106 + // Process the received message.
  107 + if ((ret = rtmp->recv_message(&msg)) == ERROR_SUCCESS) {
  108 + ret = pumper->consume(msg);
101 } 109 }
102 110
103 if (ret != ERROR_SUCCESS) { 111 if (ret != ERROR_SUCCESS) {
104 if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { 112 if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
105 - srs_error("thread process message failed. ret=%d", ret); 113 + srs_error("recv thread error. ret=%d", ret);
106 } 114 }
107 115
108 - // we use no timeout to recv, should never got any error. 116 + // Interrupt the receive thread for any error.
109 trd->interrupt(); 117 trd->interrupt();
110 118
111 - // notice the handler got a recv error.  
112 - handler->on_recv_error(ret); 119 + // Notify the pumper to quit for error.
  120 + pumper->interrupt(ret);
113 121
114 return ret; 122 return ret;
115 } 123 }
@@ -128,7 +136,7 @@ void SrsRecvThread::on_thread_start() @@ -128,7 +136,7 @@ void SrsRecvThread::on_thread_start()
128 // @see: https://github.com/ossrs/srs/issues/217 136 // @see: https://github.com/ossrs/srs/issues/217
129 rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS); 137 rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS);
130 138
131 - handler->on_thread_start(); 139 + pumper->on_start();
132 } 140 }
133 141
134 void SrsRecvThread::on_thread_stop() 142 void SrsRecvThread::on_thread_stop()
@@ -136,7 +144,7 @@ void SrsRecvThread::on_thread_stop() @@ -136,7 +144,7 @@ void SrsRecvThread::on_thread_stop()
136 // reset the timeout to pulse mode. 144 // reset the timeout to pulse mode.
137 rtmp->set_recv_timeout(timeout * 1000); 145 rtmp->set_recv_timeout(timeout * 1000);
138 146
139 - handler->on_thread_stop(); 147 + pumper->on_stop();
140 } 148 }
141 149
142 SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms) 150 SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms)
@@ -196,16 +204,7 @@ int SrsQueueRecvThread::error_code() @@ -196,16 +204,7 @@ int SrsQueueRecvThread::error_code()
196 return recv_error_code; 204 return recv_error_code;
197 } 205 }
198 206
199 -bool SrsQueueRecvThread::can_handle()  
200 -{  
201 - // we only recv one message and then process it,  
202 - // for the message may cause the thread to stop,  
203 - // when stop, the thread is freed, so the messages  
204 - // are dropped.  
205 - return empty();  
206 -}  
207 -  
208 -int SrsQueueRecvThread::handle(SrsCommonMessage* msg) 207 +int SrsQueueRecvThread::consume(SrsCommonMessage* msg)
209 { 208 {
210 // put into queue, the send thread will get and process it, 209 // put into queue, the send thread will get and process it,
211 // @see SrsRtmpConn::process_play_control_msg 210 // @see SrsRtmpConn::process_play_control_msg
@@ -218,9 +217,19 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg) @@ -218,9 +217,19 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
218 return ERROR_SUCCESS; 217 return ERROR_SUCCESS;
219 } 218 }
220 219
221 -void SrsQueueRecvThread::on_recv_error(int ret) 220 +bool SrsQueueRecvThread::interrupted()
  221 +{
  222 + // we only recv one message and then process it,
  223 + // for the message may cause the thread to stop,
  224 + // when stop, the thread is freed, so the messages
  225 + // are dropped.
  226 + return !empty();
  227 +}
  228 +
  229 +void SrsQueueRecvThread::interrupt(int ret)
222 { 230 {
223 recv_error_code = ret; 231 recv_error_code = ret;
  232 +
224 #ifdef SRS_PERF_QUEUE_COND_WAIT 233 #ifdef SRS_PERF_QUEUE_COND_WAIT
225 if (_consumer) { 234 if (_consumer) {
226 _consumer->wakeup(); 235 _consumer->wakeup();
@@ -228,14 +237,14 @@ void SrsQueueRecvThread::on_recv_error(int ret) @@ -228,14 +237,14 @@ void SrsQueueRecvThread::on_recv_error(int ret)
228 #endif 237 #endif
229 } 238 }
230 239
231 -void SrsQueueRecvThread::on_thread_start() 240 +void SrsQueueRecvThread::on_start()
232 { 241 {
233 // disable the protocol auto response, 242 // disable the protocol auto response,
234 // for the isolate recv thread should never send any messages. 243 // for the isolate recv thread should never send any messages.
235 rtmp->set_auto_response(false); 244 rtmp->set_auto_response(false);
236 } 245 }
237 246
238 -void SrsQueueRecvThread::on_thread_stop() 247 +void SrsQueueRecvThread::on_stop()
239 { 248 {
240 // enable the protocol auto response, 249 // enable the protocol auto response,
241 // for the isolate recv thread terminated. 250 // for the isolate recv thread terminated.
@@ -325,7 +334,48 @@ void SrsPublishRecvThread::stop() @@ -325,7 +334,48 @@ void SrsPublishRecvThread::stop()
325 trd.stop(); 334 trd.stop();
326 } 335 }
327 336
328 -void SrsPublishRecvThread::on_thread_start() 337 +int SrsPublishRecvThread::consume(SrsCommonMessage* msg)
  338 +{
  339 + int ret = ERROR_SUCCESS;
  340 +
  341 + // when cid changed, change it.
  342 + if (ncid != cid) {
  343 + _srs_context->set_id(ncid);
  344 + cid = ncid;
  345 + }
  346 +
  347 + _nb_msgs++;
  348 +
  349 + // log to show the time of recv thread.
  350 + srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",
  351 + srs_update_system_time_ms(), msg->header.timestamp, msg->size);
  352 +
  353 + // the rtmp connection will handle this message
  354 + ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);
  355 +
  356 + // must always free it,
  357 + // the source will copy it if need to use.
  358 + srs_freep(msg);
  359 +
  360 + return ret;
  361 +}
  362 +
  363 +bool SrsPublishRecvThread::interrupted()
  364 +{
  365 + // Never interrupted, always can handle message.
  366 + return false;
  367 +}
  368 +
  369 +void SrsPublishRecvThread::interrupt(int ret)
  370 +{
  371 + recv_error_code = ret;
  372 +
  373 + // when recv thread error, signal the conn thread to process it.
  374 + // @see https://github.com/ossrs/srs/issues/244
  375 + st_cond_signal(error);
  376 +}
  377 +
  378 +void SrsPublishRecvThread::on_start()
329 { 379 {
330 // we donot set the auto response to false, 380 // we donot set the auto response to false,
331 // for the main thread never send message. 381 // for the main thread never send message.
@@ -342,7 +392,7 @@ void SrsPublishRecvThread::on_thread_start() @@ -342,7 +392,7 @@ void SrsPublishRecvThread::on_thread_start()
342 #endif 392 #endif
343 } 393 }
344 394
345 -void SrsPublishRecvThread::on_thread_stop() 395 +void SrsPublishRecvThread::on_stop()
346 { 396 {
347 // we donot set the auto response to true, 397 // we donot set the auto response to true,
348 // for we donot set to false yet. 398 // for we donot set to false yet.
@@ -360,47 +410,6 @@ void SrsPublishRecvThread::on_thread_stop() @@ -360,47 +410,6 @@ void SrsPublishRecvThread::on_thread_stop()
360 #endif 410 #endif
361 } 411 }
362 412
363 -bool SrsPublishRecvThread::can_handle()  
364 -{  
365 - // publish thread always can handle message.  
366 - return true;  
367 -}  
368 -  
369 -int SrsPublishRecvThread::handle(SrsCommonMessage* msg)  
370 -{  
371 - int ret = ERROR_SUCCESS;  
372 -  
373 - // when cid changed, change it.  
374 - if (ncid != cid) {  
375 - _srs_context->set_id(ncid);  
376 - cid = ncid;  
377 - }  
378 -  
379 - _nb_msgs++;  
380 -  
381 - // log to show the time of recv thread.  
382 - srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",  
383 - srs_update_system_time_ms(), msg->header.timestamp, msg->size);  
384 -  
385 - // the rtmp connection will handle this message  
386 - ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);  
387 -  
388 - // must always free it,  
389 - // the source will copy it if need to use.  
390 - srs_freep(msg);  
391 -  
392 - return ret;  
393 -}  
394 -  
395 -void SrsPublishRecvThread::on_recv_error(int ret)  
396 -{  
397 - recv_error_code = ret;  
398 -  
399 - // when recv thread error, signal the conn thread to process it.  
400 - // @see https://github.com/ossrs/srs/issues/244  
401 - st_cond_signal(error);  
402 -}  
403 -  
404 #ifdef SRS_PERF_MERGED_READ 413 #ifdef SRS_PERF_MERGED_READ
405 void SrsPublishRecvThread::on_read(ssize_t nread) 414 void SrsPublishRecvThread::on_read(ssize_t nread)
406 { 415 {
@@ -45,36 +45,48 @@ class SrsRequest; @@ -45,36 +45,48 @@ class SrsRequest;
45 class SrsConsumer; 45 class SrsConsumer;
46 46
47 /** 47 /**
48 - * for the recv thread to handle the message. 48 + * The message consumer which consume a message.
49 */ 49 */
50 -class ISrsMessageHandler 50 +class ISrsMessageConsumer
51 { 51 {
52 public: 52 public:
53 - ISrsMessageHandler();  
54 - virtual ~ISrsMessageHandler(); 53 + ISrsMessageConsumer();
  54 + virtual ~ISrsMessageConsumer();
55 public: 55 public:
56 /** 56 /**
57 - * whether the handler can handle,  
58 - * for example, when queue recv handler got an message,  
59 - * it wait the user to process it, then the recv thread  
60 - * never recv message util the handler is ok.  
61 - */  
62 - virtual bool can_handle() = 0;  
63 - /**  
64 - * process the received message. 57 + * Consume the received message.
65 * @remark user must free this message. 58 * @remark user must free this message.
66 */ 59 */
67 - virtual int handle(SrsCommonMessage* msg) = 0; 60 + virtual int consume(SrsCommonMessage* msg) = 0;
  61 +};
  62 +
  63 +/**
  64 + * The message pumper to pump messages to processer.
  65 + */
  66 +class ISrsMessagePumper : public ISrsMessageConsumer
  67 +{
  68 +public:
  69 + ISrsMessagePumper();
  70 + virtual ~ISrsMessagePumper();
  71 +public:
68 /** 72 /**
69 - * when recv message error.  
70 - */  
71 - virtual void on_recv_error(int ret) = 0; 73 + * Whether the pumper is interrupted.
  74 + * For example, when pumpter is busy, it's interrupted,
  75 + * please wait for a while then try to feed the pumper.
  76 + */
  77 + virtual bool interrupted() = 0;
72 /** 78 /**
73 - * when thread start or stop,  
74 - * for example, the message handler can set whether auto response.  
75 - */  
76 - virtual void on_thread_start() = 0;  
77 - virtual void on_thread_stop() = 0; 79 + * Interrupt the pumper for a error.
  80 + */
  81 + virtual void interrupt(int error) = 0;
  82 + /**
  83 + * When start the pumper.
  84 + */
  85 + virtual void on_start() = 0;
  86 + /**
  87 + * When stop the pumper.
  88 + */
  89 + virtual void on_stop() = 0;
78 }; 90 };
79 91
80 /** 92 /**
@@ -84,14 +96,14 @@ class SrsRecvThread : public ISrsReusableThread2Handler @@ -84,14 +96,14 @@ class SrsRecvThread : public ISrsReusableThread2Handler
84 { 96 {
85 protected: 97 protected:
86 SrsReusableThread2* trd; 98 SrsReusableThread2* trd;
87 - ISrsMessageHandler* handler; 99 + ISrsMessagePumper* pumper;
88 SrsRtmpServer* rtmp; 100 SrsRtmpServer* rtmp;
89 // The recv timeout in ms. 101 // The recv timeout in ms.
90 int timeout; 102 int timeout;
91 public: 103 public:
92 // Constructor. 104 // Constructor.
93 // @param tm The receive timeout in ms. 105 // @param tm The receive timeout in ms.
94 - SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm); 106 + SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm);
95 virtual ~SrsRecvThread(); 107 virtual ~SrsRecvThread();
96 public: 108 public:
97 virtual int cid(); 109 virtual int cid();
@@ -112,7 +124,7 @@ public: @@ -112,7 +124,7 @@ public:
112 * @see: SrsRtmpConn::playing 124 * @see: SrsRtmpConn::playing
113 * @see: https://github.com/ossrs/srs/issues/217 125 * @see: https://github.com/ossrs/srs/issues/217
114 */ 126 */
115 -class SrsQueueRecvThread : public ISrsMessageHandler 127 +class SrsQueueRecvThread : public ISrsMessagePumper
116 { 128 {
117 private: 129 private:
118 std::vector<SrsCommonMessage*> queue; 130 std::vector<SrsCommonMessage*> queue;
@@ -132,24 +144,23 @@ public: @@ -132,24 +144,23 @@ public:
132 virtual int size(); 144 virtual int size();
133 virtual SrsCommonMessage* pump(); 145 virtual SrsCommonMessage* pump();
134 virtual int error_code(); 146 virtual int error_code();
  147 +// interface ISrsMessagePumper
135 public: 148 public:
136 - virtual bool can_handle();  
137 - virtual int handle(SrsCommonMessage* msg);  
138 - virtual void on_recv_error(int ret);  
139 -public:  
140 - virtual void on_thread_start();  
141 - virtual void on_thread_stop(); 149 + virtual int consume(SrsCommonMessage* msg);
  150 + virtual bool interrupted();
  151 + virtual void interrupt(int ret);
  152 + virtual void on_start();
  153 + virtual void on_stop();
142 }; 154 };
143 155
144 /** 156 /**
145 * the publish recv thread got message and callback the source method to process message. 157 * the publish recv thread got message and callback the source method to process message.
146 * @see: https://github.com/ossrs/srs/issues/237 158 * @see: https://github.com/ossrs/srs/issues/237
147 */ 159 */
148 -class SrsPublishRecvThread : virtual public ISrsMessageHandler 160 +class SrsPublishRecvThread : virtual public ISrsMessagePumper, virtual public ISrsReloadHandler
149 #ifdef SRS_PERF_MERGED_READ 161 #ifdef SRS_PERF_MERGED_READ
150 , virtual public IMergeReadHandler 162 , virtual public IMergeReadHandler
151 #endif 163 #endif
152 - , virtual public ISrsReloadHandler  
153 { 164 {
154 private: 165 private:
155 SrsRecvThread trd; 166 SrsRecvThread trd;
@@ -195,13 +206,13 @@ public: @@ -195,13 +206,13 @@ public:
195 public: 206 public:
196 virtual int start(); 207 virtual int start();
197 virtual void stop(); 208 virtual void stop();
198 - virtual void on_thread_start();  
199 - virtual void on_thread_stop();  
200 -// interface ISrsMessageHandler 209 +// interface ISrsMessagePumper
201 public: 210 public:
202 - virtual bool can_handle();  
203 - virtual int handle(SrsCommonMessage* msg);  
204 - virtual void on_recv_error(int ret); 211 + virtual int consume(SrsCommonMessage* msg);
  212 + virtual bool interrupted();
  213 + virtual void interrupt(int ret);
  214 + virtual void on_start();
  215 + virtual void on_stop();
205 // interface IMergeReadHandler 216 // interface IMergeReadHandler
206 public: 217 public:
207 #ifdef SRS_PERF_MERGED_READ 218 #ifdef SRS_PERF_MERGED_READ