winlin

fix #442, support kickoff client.

@@ -41,6 +41,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) @@ -41,6 +41,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
41 manager = cm; 41 manager = cm;
42 stfd = c; 42 stfd = c;
43 disposed = false; 43 disposed = false;
  44 + expired = false;
44 45
45 // the client thread should reap itself, 46 // the client thread should reap itself,
46 // so we never use joinable. 47 // so we never use joinable.
@@ -116,4 +117,9 @@ int SrsConnection::srs_id() @@ -116,4 +117,9 @@ int SrsConnection::srs_id()
116 return id; 117 return id;
117 } 118 }
118 119
  120 +void SrsConnection::expire()
  121 +{
  122 + expired = true;
  123 +}
  124 +
119 125
@@ -88,6 +88,11 @@ protected: @@ -88,6 +88,11 @@ protected:
88 * when disposed, connection should stop cycle and cleanup itself. 88 * when disposed, connection should stop cycle and cleanup itself.
89 */ 89 */
90 bool disposed; 90 bool disposed;
  91 + /**
  92 + * whether connection is expired, application definition.
  93 + * when expired, the connection must never be served and quit ASAP.
  94 + */
  95 + bool expired;
91 public: 96 public:
92 SrsConnection(IConnectionManager* cm, st_netfd_t c); 97 SrsConnection(IConnectionManager* cm, st_netfd_t c);
93 virtual ~SrsConnection(); 98 virtual ~SrsConnection();
@@ -125,6 +130,10 @@ public: @@ -125,6 +130,10 @@ public:
125 * get the srs id which identify the client. 130 * get the srs id which identify the client.
126 */ 131 */
127 virtual int srs_id(); 132 virtual int srs_id();
  133 + /**
  134 + * set connection to expired.
  135 + */
  136 + virtual void expire();
128 protected: 137 protected:
129 /** 138 /**
130 * for concrete connection to do the cycle. 139 * for concrete connection to do the cycle.
@@ -660,22 +660,8 @@ int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -660,22 +660,8 @@ int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
660 srs_error("stream stream_id=%d not found. ret=%d", sid, ret); 660 srs_error("stream stream_id=%d not found. ret=%d", sid, ret);
661 return srs_http_response_code(w, ret); 661 return srs_http_response_code(w, ret);
662 } 662 }
663 -  
664 - if (r->is_http_delete()) {  
665 - srs_assert(stream);  
666 -  
667 - SrsSource* source = SrsSource::fetch(stream->vhost->vhost, stream->app, stream->stream);  
668 - if (!source) {  
669 - ret = ERROR_SOURCE_NOT_FOUND;  
670 - srs_warn("source not found for sid=%d", sid);  
671 - return srs_http_response_code(w, ret);  
672 - }  
673 -  
674 - source->set_expired();  
675 - srs_warn("disconnent stream=%d successfully. vhost=%s, app=%s, stream=%s.",  
676 - sid, stream->vhost->vhost.c_str(), stream->app.c_str(), stream->stream.c_str());  
677 - return srs_http_response_code(w, ret);  
678 - } else if (r->is_http_get()) { 663 +
  664 + if (r->is_http_get()) {
679 std::stringstream data; 665 std::stringstream data;
680 666
681 if (!stream) { 667 if (!stream) {
@@ -726,10 +712,15 @@ int SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -726,10 +712,15 @@ int SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
726 ret = ERROR_RTMP_STREAM_NOT_FOUND; 712 ret = ERROR_RTMP_STREAM_NOT_FOUND;
727 srs_error("stream client_id=%d not found. ret=%d", cid, ret); 713 srs_error("stream client_id=%d not found. ret=%d", cid, ret);
728 return srs_http_response_code(w, ret); 714 return srs_http_response_code(w, ret);
729 -  
730 } 715 }
731 716
732 - if (r->is_http_get()) { 717 + if (r->is_http_delete()) {
  718 + srs_assert(client);
  719 +
  720 + client->conn->expire();
  721 + srs_warn("delete client=%d ok", cid);
  722 + return srs_http_response_code(w, ret);
  723 + } else if (r->is_http_get()) {
733 std::stringstream data; 724 std::stringstream data;
734 725
735 if (!client) { 726 if (!client) {
@@ -751,6 +742,8 @@ int SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -751,6 +742,8 @@ int SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
751 } 742 }
752 743
753 return srs_http_response_json(w, ss.str()); 744 return srs_http_response_json(w, ss.str());
  745 + } else {
  746 + return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed);
754 } 747 }
755 748
756 return ret; 749 return ret;
@@ -495,7 +495,7 @@ int SrsRtmpConn::stream_service_cycle() @@ -495,7 +495,7 @@ int SrsRtmpConn::stream_service_cycle()
495 495
496 // update the statistic when source disconveried. 496 // update the statistic when source disconveried.
497 SrsStatistic* stat = SrsStatistic::instance(); 497 SrsStatistic* stat = SrsStatistic::instance();
498 - if ((ret = stat->on_client(_srs_context->get_id(), req)) != ERROR_SUCCESS) { 498 + if ((ret = stat->on_client(_srs_context->get_id(), req, this)) != ERROR_SUCCESS) {
499 srs_error("stat client failed. ret=%d", ret); 499 srs_error("stat client failed. ret=%d", ret);
500 return ret; 500 return ret;
501 } 501 }
@@ -671,6 +671,13 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe @@ -671,6 +671,13 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
671 while (!disposed) { 671 while (!disposed) {
672 // collect elapse for pithy print. 672 // collect elapse for pithy print.
673 pprint->elapse(); 673 pprint->elapse();
  674 +
  675 + // when source is set to expired, disconnect it.
  676 + if (expired) {
  677 + ret = ERROR_USER_DISCONNECT;
  678 + srs_error("connection expired. ret=%d", ret);
  679 + return ret;
  680 + }
674 681
675 // to use isolate thread to recv, can improve about 33% performance. 682 // to use isolate thread to recv, can improve about 33% performance.
676 // @see: https://github.com/simple-rtmp-server/srs/issues/196 683 // @see: https://github.com/simple-rtmp-server/srs/issues/196
@@ -875,9 +882,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -875,9 +882,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
875 pprint->elapse(); 882 pprint->elapse();
876 883
877 // when source is set to expired, disconnect it. 884 // when source is set to expired, disconnect it.
878 - if (source->expired()) { 885 + if (expired) {
879 ret = ERROR_USER_DISCONNECT; 886 ret = ERROR_USER_DISCONNECT;
880 - srs_error("source is expired. ret=%d", ret); 887 + srs_error("connection expired. ret=%d", ret);
881 return ret; 888 return ret;
882 } 889 }
883 890
@@ -898,7 +898,6 @@ SrsSource::SrsSource() @@ -898,7 +898,6 @@ SrsSource::SrsSource()
898 jitter_algorithm = SrsRtmpJitterAlgorithmOFF; 898 jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
899 mix_correct = false; 899 mix_correct = false;
900 mix_queue = new SrsMixQueue(); 900 mix_queue = new SrsMixQueue();
901 - is_expired = false;  
902 901
903 #ifdef SRS_AUTO_HLS 902 #ifdef SRS_AUTO_HLS
904 hls = new SrsHls(); 903 hls = new SrsHls();
@@ -2101,7 +2100,6 @@ void SrsSource::on_unpublish() @@ -2101,7 +2100,6 @@ void SrsSource::on_unpublish()
2101 2100
2102 _can_publish = true; 2101 _can_publish = true;
2103 _source_id = -1; 2102 _source_id = -1;
2104 - is_expired = false;  
2105 2103
2106 // notify the handler. 2104 // notify the handler.
2107 srs_assert(handler); 2105 srs_assert(handler);
@@ -2260,13 +2258,3 @@ void SrsSource::destroy_forwarders() @@ -2260,13 +2258,3 @@ void SrsSource::destroy_forwarders()
2260 forwarders.clear(); 2258 forwarders.clear();
2261 } 2259 }
2262 2260
2263 -bool SrsSource::expired()  
2264 -{  
2265 - return is_expired;  
2266 -}  
2267 -  
2268 -void SrsSource::set_expired()  
2269 -{  
2270 - is_expired = true;  
2271 -}  
2272 -  
@@ -453,8 +453,6 @@ private: @@ -453,8 +453,6 @@ private:
453 // whether use interlaced/mixed algorithm to correct timestamp. 453 // whether use interlaced/mixed algorithm to correct timestamp.
454 bool mix_correct; 454 bool mix_correct;
455 SrsMixQueue* mix_queue; 455 SrsMixQueue* mix_queue;
456 - // the flag of source expired or not.  
457 - bool is_expired;  
458 // whether stream is monotonically increase. 456 // whether stream is monotonically increase.
459 bool is_monotonically_increase; 457 bool is_monotonically_increase;
460 int64_t last_packet_time; 458 int64_t last_packet_time;
@@ -586,12 +584,6 @@ public: @@ -586,12 +584,6 @@ public:
586 private: 584 private:
587 virtual int create_forwarders(); 585 virtual int create_forwarders();
588 virtual void destroy_forwarders(); 586 virtual void destroy_forwarders();
589 -public:  
590 - virtual bool expired();  
591 - /**  
592 - * set source expired.  
593 - */  
594 - virtual void set_expired();  
595 }; 587 };
596 588
597 #endif 589 #endif
@@ -322,7 +322,7 @@ void SrsStatistic::on_stream_close(SrsRequest* req) @@ -322,7 +322,7 @@ void SrsStatistic::on_stream_close(SrsRequest* req)
322 stream->close(); 322 stream->close();
323 } 323 }
324 324
325 -int SrsStatistic::on_client(int id, SrsRequest* req) 325 +int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn)
326 { 326 {
327 int ret = ERROR_SUCCESS; 327 int ret = ERROR_SUCCESS;
328 328
@@ -341,6 +341,7 @@ int SrsStatistic::on_client(int id, SrsRequest* req) @@ -341,6 +341,7 @@ int SrsStatistic::on_client(int id, SrsRequest* req)
341 } 341 }
342 342
343 // got client. 343 // got client.
  344 + client->conn = conn;
344 stream->nb_clients++; 345 stream->nb_clients++;
345 vhost->nb_clients++; 346 vhost->nb_clients++;
346 347
@@ -112,6 +112,7 @@ struct SrsStatisticClient @@ -112,6 +112,7 @@ struct SrsStatisticClient
112 { 112 {
113 public: 113 public:
114 SrsStatisticStream* stream; 114 SrsStatisticStream* stream;
  115 + SrsConnection* conn;
115 int id; 116 int id;
116 public: 117 public:
117 SrsStatisticClient(); 118 SrsStatisticClient();
@@ -181,8 +182,9 @@ public: @@ -181,8 +182,9 @@ public:
181 * when got a client to publish/play stream, 182 * when got a client to publish/play stream,
182 * @param id, the client srs id. 183 * @param id, the client srs id.
183 * @param req, the client request object. 184 * @param req, the client request object.
  185 + * @param conn, the physical absract connection object.
184 */ 186 */
185 - virtual int on_client(int id, SrsRequest* req); 187 + virtual int on_client(int id, SrsRequest* req, SrsConnection* conn);
186 /** 188 /**
187 * client disconnect 189 * client disconnect
188 * @remark the on_disconnect always call, while the on_client is call when 190 * @remark the on_disconnect always call, while the on_client is call when
@@ -264,6 +264,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -264,6 +264,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
264 #define ERROR_HTTP_REQUEST_EOF 4029 264 #define ERROR_HTTP_REQUEST_EOF 4029
265 265
266 /////////////////////////////////////////////////////// 266 ///////////////////////////////////////////////////////
  267 +// HTTP API error.
  268 +///////////////////////////////////////////////////////
  269 +//#define ERROR_API_METHOD_NOT_ALLOWD
  270 +
  271 +///////////////////////////////////////////////////////
267 // user-define error. 272 // user-define error.
268 /////////////////////////////////////////////////////// 273 ///////////////////////////////////////////////////////
269 #define ERROR_USER_START 9000 274 #define ERROR_USER_START 9000
@@ -120,8 +120,11 @@ string srs_go_http_detect(char* data, int size) @@ -120,8 +120,11 @@ string srs_go_http_detect(char* data, int size)
120 return "application/octet-stream"; // fallback 120 return "application/octet-stream"; // fallback
121 } 121 }
122 122
123 -// Error replies to the request with the specified error message and HTTP code.  
124 -// The error message should be plain text. 123 +int srs_go_http_error(ISrsHttpResponseWriter* w, int code)
  124 +{
  125 + return srs_go_http_error(w, code, srs_generate_http_status_text(code));
  126 +}
  127 +
125 int srs_go_http_error(ISrsHttpResponseWriter* w, int code, string error) 128 int srs_go_http_error(ISrsHttpResponseWriter* w, int code, string error)
126 { 129 {
127 int ret = ERROR_SUCCESS; 130 int ret = ERROR_SUCCESS;
@@ -287,7 +290,7 @@ bool SrsHttpNotFoundHandler::is_not_found() @@ -287,7 +290,7 @@ bool SrsHttpNotFoundHandler::is_not_found()
287 290
288 int SrsHttpNotFoundHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) 291 int SrsHttpNotFoundHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
289 { 292 {
290 - return srs_go_http_error(w, SRS_CONSTS_HTTP_NotFound, SRS_CONSTS_HTTP_NotFound_str); 293 + return srs_go_http_error(w, SRS_CONSTS_HTTP_NotFound);
291 } 294 }
292 295
293 SrsHttpFileServer::SrsHttpFileServer(string root_dir) 296 SrsHttpFileServer::SrsHttpFileServer(string root_dir)
@@ -76,6 +76,11 @@ class ISrsHttpResponseWriter; @@ -76,6 +76,11 @@ class ISrsHttpResponseWriter;
76 #define SRS_CONSTS_HTTP_PUT HTTP_PUT 76 #define SRS_CONSTS_HTTP_PUT HTTP_PUT
77 #define SRS_CONSTS_HTTP_DELETE HTTP_DELETE 77 #define SRS_CONSTS_HTTP_DELETE HTTP_DELETE
78 78
  79 +// Error replies to the request with the specified error message and HTTP code.
  80 +// The error message should be plain text.
  81 +extern int srs_go_http_error(ISrsHttpResponseWriter* w, int code);
  82 +extern int srs_go_http_error(ISrsHttpResponseWriter* w, int code, std::string error);
  83 +
79 // helper function: response in json format. 84 // helper function: response in json format.
80 extern int srs_http_response_json(ISrsHttpResponseWriter* w, std::string data); 85 extern int srs_http_response_json(ISrsHttpResponseWriter* w, std::string data);
81 /** 86 /**