winlin

for bug #237, when recv thread failed, quit the cycle. 2.0.44

@@ -81,6 +81,9 @@ int SrsRecvThread::cycle() @@ -81,6 +81,9 @@ int SrsRecvThread::cycle()
81 // we use no timeout to recv, should never got any error. 81 // we use no timeout to recv, should never got any error.
82 trd->stop_loop(); 82 trd->stop_loop();
83 83
  84 + // notice the handler got a recv error.
  85 + handler->on_recv_error(ret);
  86 +
84 return ret; 87 return ret;
85 } 88 }
86 srs_verbose("play loop recv message. ret=%d", ret); 89 srs_verbose("play loop recv message. ret=%d", ret);
@@ -122,6 +125,7 @@ void SrsRecvThread::on_thread_stop() @@ -122,6 +125,7 @@ void SrsRecvThread::on_thread_stop()
122 SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms) 125 SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms)
123 : trd(this, rtmp_sdk, timeout_ms) 126 : trd(this, rtmp_sdk, timeout_ms)
124 { 127 {
  128 + recv_error_code = ERROR_SUCCESS;
125 } 129 }
126 130
127 SrsQueueRecvThread::~SrsQueueRecvThread() 131 SrsQueueRecvThread::~SrsQueueRecvThread()
@@ -168,6 +172,11 @@ SrsMessage* SrsQueueRecvThread::pump() @@ -168,6 +172,11 @@ SrsMessage* SrsQueueRecvThread::pump()
168 return msg; 172 return msg;
169 } 173 }
170 174
  175 +int SrsQueueRecvThread::error_code()
  176 +{
  177 + return recv_error_code;
  178 +}
  179 +
171 bool SrsQueueRecvThread::can_handle() 180 bool SrsQueueRecvThread::can_handle()
172 { 181 {
173 // we only recv one message and then process it, 182 // we only recv one message and then process it,
@@ -186,6 +195,11 @@ int SrsQueueRecvThread::handle(SrsMessage* msg) @@ -186,6 +195,11 @@ int SrsQueueRecvThread::handle(SrsMessage* msg)
186 return ERROR_SUCCESS; 195 return ERROR_SUCCESS;
187 } 196 }
188 197
  198 +void SrsQueueRecvThread::on_recv_error(int ret)
  199 +{
  200 + recv_error_code = ret;
  201 +}
  202 +
189 SrsPublishRecvThread::SrsPublishRecvThread( 203 SrsPublishRecvThread::SrsPublishRecvThread(
190 SrsRtmpServer* rtmp_sdk, int timeout_ms, 204 SrsRtmpServer* rtmp_sdk, int timeout_ms,
191 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge 205 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
@@ -254,3 +268,8 @@ int SrsPublishRecvThread::handle(SrsMessage* msg) @@ -254,3 +268,8 @@ int SrsPublishRecvThread::handle(SrsMessage* msg)
254 // TODO: FIXME: implements it. 268 // TODO: FIXME: implements it.
255 return ret; 269 return ret;
256 } 270 }
  271 +
  272 +void SrsPublishRecvThread::on_recv_error(int ret)
  273 +{
  274 + recv_error_code = ret;
  275 +}
@@ -59,6 +59,10 @@ public: @@ -59,6 +59,10 @@ public:
59 * process the received message. 59 * process the received message.
60 */ 60 */
61 virtual int handle(SrsMessage* msg) = 0; 61 virtual int handle(SrsMessage* msg) = 0;
  62 + /**
  63 + * when recv message error.
  64 + */
  65 + virtual void on_recv_error(int ret) = 0;
62 }; 66 };
63 67
64 /** 68 /**
@@ -95,6 +99,8 @@ class SrsQueueRecvThread : public ISrsMessageHandler @@ -95,6 +99,8 @@ class SrsQueueRecvThread : public ISrsMessageHandler
95 private: 99 private:
96 std::vector<SrsMessage*> queue; 100 std::vector<SrsMessage*> queue;
97 SrsRecvThread trd; 101 SrsRecvThread trd;
  102 + // the recv thread error code.
  103 + int recv_error_code;
98 public: 104 public:
99 SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms); 105 SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms);
100 virtual ~SrsQueueRecvThread(); 106 virtual ~SrsQueueRecvThread();
@@ -105,9 +111,11 @@ public: @@ -105,9 +111,11 @@ public:
105 virtual bool empty(); 111 virtual bool empty();
106 virtual int size(); 112 virtual int size();
107 virtual SrsMessage* pump(); 113 virtual SrsMessage* pump();
  114 + virtual int error_code();
108 public: 115 public:
109 virtual bool can_handle(); 116 virtual bool can_handle();
110 virtual int handle(SrsMessage* msg); 117 virtual int handle(SrsMessage* msg);
  118 + virtual void on_recv_error(int ret);
111 }; 119 };
112 120
113 /** 121 /**
@@ -139,6 +147,7 @@ public: @@ -139,6 +147,7 @@ public:
139 public: 147 public:
140 virtual bool can_handle(); 148 virtual bool can_handle();
141 virtual int handle(SrsMessage* msg); 149 virtual int handle(SrsMessage* msg);
  150 + virtual void on_recv_error(int ret);
142 }; 151 };
143 152
144 #endif 153 #endif
@@ -558,13 +558,21 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) @@ -558,13 +558,21 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
558 srs_verbose("pump client message to process."); 558 srs_verbose("pump client message to process.");
559 559
560 if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { 560 if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
561 - if (!srs_is_system_control_error(ret)) { 561 + if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
562 srs_error("process play control message failed. ret=%d", ret); 562 srs_error("process play control message failed. ret=%d", ret);
563 } 563 }
564 return ret; 564 return ret;
565 } 565 }
566 } 566 }
567 567
  568 + // quit when recv thread error.
  569 + if ((ret = trd->error_code()) != ERROR_SUCCESS) {
  570 + if (!srs_is_client_gracefully_close(ret)) {
  571 + srs_error("recv thread failed. ret=%d", ret);
  572 + }
  573 + return ret;
  574 + }
  575 +
568 // collect elapse for pithy print. 576 // collect elapse for pithy print.
569 pithy_print.elapse(); 577 pithy_print.elapse();
570 578
@@ -744,6 +752,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -744,6 +752,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
744 752
745 // check the thread error code. 753 // check the thread error code.
746 if ((ret = trd->error_code()) != ERROR_SUCCESS) { 754 if ((ret = trd->error_code()) != ERROR_SUCCESS) {
  755 + if (!srs_is_client_gracefully_close(ret)) {
  756 + srs_error("recv thread failed. ret=%d", ret);
  757 + }
747 return ret; 758 return ret;
748 } 759 }
749 } 760 }
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 43 34 +#define VERSION_REVISION 44
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"