winlin

Merge pull request #259 from zhengfl/master

merge from feilong: 用户连接没有断开
@@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #include <srs_kernel_utility.hpp> 30 #include <srs_kernel_utility.hpp>
31 #include <srs_core_performance.hpp> 31 #include <srs_core_performance.hpp>
32 #include <srs_app_config.hpp> 32 #include <srs_app_config.hpp>
  33 +#include <srs_app_source.hpp>
33 34
34 using namespace std; 35 using namespace std;
35 36
@@ -138,11 +139,12 @@ SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms) @@ -138,11 +139,12 @@ SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms)
138 { 139 {
139 rtmp = rtmp_sdk; 140 rtmp = rtmp_sdk;
140 recv_error_code = ERROR_SUCCESS; 141 recv_error_code = ERROR_SUCCESS;
  142 + _consumer = NULL;
141 } 143 }
142 144
143 SrsQueueRecvThread::~SrsQueueRecvThread() 145 SrsQueueRecvThread::~SrsQueueRecvThread()
144 { 146 {
145 - trd.stop(); 147 + stop();
146 148
147 // clear all messages. 149 // clear all messages.
148 std::vector<SrsCommonMessage*>::iterator it; 150 std::vector<SrsCommonMessage*>::iterator it;
@@ -160,6 +162,7 @@ int SrsQueueRecvThread::start() @@ -160,6 +162,7 @@ int SrsQueueRecvThread::start()
160 162
161 void SrsQueueRecvThread::stop() 163 void SrsQueueRecvThread::stop()
162 { 164 {
  165 + _consumer = NULL;
163 trd.stop(); 166 trd.stop();
164 } 167 }
165 168
@@ -203,13 +206,22 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg) @@ -203,13 +206,22 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
203 // put into queue, the send thread will get and process it, 206 // put into queue, the send thread will get and process it,
204 // @see SrsRtmpConn::process_play_control_msg 207 // @see SrsRtmpConn::process_play_control_msg
205 queue.push_back(msg); 208 queue.push_back(msg);
206 - 209 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  210 + if (_consumer) {
  211 + _consumer->on_dispose();
  212 + }
  213 +#endif
207 return ERROR_SUCCESS; 214 return ERROR_SUCCESS;
208 } 215 }
209 216
210 void SrsQueueRecvThread::on_recv_error(int ret) 217 void SrsQueueRecvThread::on_recv_error(int ret)
211 { 218 {
212 recv_error_code = ret; 219 recv_error_code = ret;
  220 +#ifdef SRS_PERF_QUEUE_COND_WAIT
  221 + if (_consumer) {
  222 + _consumer->on_dispose();
  223 + }
  224 +#endif
213 } 225 }
214 226
215 void SrsQueueRecvThread::on_thread_start() 227 void SrsQueueRecvThread::on_thread_start()
@@ -226,6 +238,11 @@ void SrsQueueRecvThread::on_thread_stop() @@ -226,6 +238,11 @@ void SrsQueueRecvThread::on_thread_stop()
226 rtmp->set_auto_response(true); 238 rtmp->set_auto_response(true);
227 } 239 }
228 240
  241 +void SrsQueueRecvThread::set_consumer(SrsConsumer *consumer)
  242 +{
  243 + _consumer = consumer;
  244 +}
  245 +
229 SrsPublishRecvThread::SrsPublishRecvThread( 246 SrsPublishRecvThread::SrsPublishRecvThread(
230 SrsRtmpServer* rtmp_sdk, 247 SrsRtmpServer* rtmp_sdk,
231 SrsRequest* _req, int mr_sock_fd, int timeout_ms, 248 SrsRequest* _req, int mr_sock_fd, int timeout_ms,
@@ -42,6 +42,7 @@ class SrsCommonMessage; @@ -42,6 +42,7 @@ class SrsCommonMessage;
42 class SrsRtmpConn; 42 class SrsRtmpConn;
43 class SrsSource; 43 class SrsSource;
44 class SrsRequest; 44 class SrsRequest;
  45 +class SrsConsumer;
45 46
46 /** 47 /**
47 * for the recv thread to handle the message. 48 * for the recv thread to handle the message.
@@ -112,6 +113,7 @@ private: @@ -112,6 +113,7 @@ private:
112 SrsRtmpServer* rtmp; 113 SrsRtmpServer* rtmp;
113 // the recv thread error code. 114 // the recv thread error code.
114 int recv_error_code; 115 int recv_error_code;
  116 + SrsConsumer *_consumer;
115 public: 117 public:
116 SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms); 118 SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms);
117 virtual ~SrsQueueRecvThread(); 119 virtual ~SrsQueueRecvThread();
@@ -130,6 +132,8 @@ public: @@ -130,6 +132,8 @@ public:
130 public: 132 public:
131 virtual void on_thread_start(); 133 virtual void on_thread_start();
132 virtual void on_thread_stop(); 134 virtual void on_thread_stop();
  135 +public:
  136 + virtual void set_consumer(SrsConsumer *consumer);
133 }; 137 };
134 138
135 /** 139 /**
@@ -595,6 +595,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) @@ -595,6 +595,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
595 // when mw_sleep changed, resize the socket send buffer. 595 // when mw_sleep changed, resize the socket send buffer.
596 mw_enabled = true; 596 mw_enabled = true;
597 change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost)); 597 change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
  598 + trd->set_consumer(consumer);
598 599
599 while (true) { 600 while (true) {
600 // to use isolate thread to recv, can improve about 33% performance. 601 // to use isolate thread to recv, can improve about 33% performance.
@@ -515,6 +515,14 @@ void SrsConsumer::wait(int nb_msgs, int duration) @@ -515,6 +515,14 @@ void SrsConsumer::wait(int nb_msgs, int duration)
515 // use cond block wait for high performance mode. 515 // use cond block wait for high performance mode.
516 st_cond_wait(mw_wait); 516 st_cond_wait(mw_wait);
517 } 517 }
  518 +
  519 +void SrsConsumer::on_dispose()
  520 +{
  521 + if (mw_waiting) {
  522 + st_cond_signal(mw_wait);
  523 + mw_waiting = false;
  524 + }
  525 +}
518 #endif 526 #endif
519 527
520 int SrsConsumer::on_play_client_pause(bool is_pause) 528 int SrsConsumer::on_play_client_pause(bool is_pause)
@@ -246,6 +246,10 @@ public: @@ -246,6 +246,10 @@ public:
246 * @param duration the messgae duration to wait. 246 * @param duration the messgae duration to wait.
247 */ 247 */
248 virtual void wait(int nb_msgs, int duration); 248 virtual void wait(int nb_msgs, int duration);
  249 + /**
  250 + * when waiting, a message incomming, we rouse it
  251 + */
  252 + virtual void on_dispose();
249 #endif 253 #endif
250 /** 254 /**
251 * when client send the pause message. 255 * when client send the pause message.