lovacat

fix #442: HTTP API kickoff client.

@@ -26,6 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -26,6 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #ifdef SRS_AUTO_HTTP_API 26 #ifdef SRS_AUTO_HTTP_API
27 27
28 #include <sstream> 28 #include <sstream>
  29 +#include <stdlib.h>
29 using namespace std; 30 using namespace std;
30 31
31 #include <srs_kernel_log.hpp> 32 #include <srs_kernel_log.hpp>
@@ -39,6 +40,7 @@ using namespace std; @@ -39,6 +40,7 @@ using namespace std;
39 #include <srs_rtmp_stack.hpp> 40 #include <srs_rtmp_stack.hpp>
40 #include <srs_app_dvr.hpp> 41 #include <srs_app_dvr.hpp>
41 #include <srs_app_config.hpp> 42 #include <srs_app_config.hpp>
  43 +#include <srs_app_source.hpp>
42 #include <srs_app_http_conn.hpp> 44 #include <srs_app_http_conn.hpp>
43 45
44 SrsGoApiRoot::SrsGoApiRoot() 46 SrsGoApiRoot::SrsGoApiRoot()
@@ -459,21 +461,67 @@ SrsGoApiStreams::~SrsGoApiStreams() @@ -459,21 +461,67 @@ SrsGoApiStreams::~SrsGoApiStreams()
459 461
460 int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) 462 int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
461 { 463 {
462 - std::stringstream data; 464 + int ret = ERROR_SUCCESS;
463 SrsStatistic* stat = SrsStatistic::instance(); 465 SrsStatistic* stat = SrsStatistic::instance();
464 - int ret = stat->dumps_streams(data);  
465 -  
466 std::stringstream ss; 466 std::stringstream ss;
467 -  
468 - ss << SRS_JOBJECT_START  
469 - << SRS_JFIELD_ERROR(ret) << SRS_JFIELD_CONT  
470 - << SRS_JFIELD_ORG("server", stat->server_id()) << SRS_JFIELD_CONT  
471 - << SRS_JFIELD_ORG("streams", data.str())  
472 - << SRS_JOBJECT_END;  
473 -  
474 - return srs_http_response_json(w, ss.str()); 467 +
  468 + if (r->is_http_delete()) {
  469 + // path: {pattern}{stream_id}
  470 + // e.g. /api/v1/streams/100 pattern= /api/v1/streams/, stream_id=100
  471 + string sid = r->path().substr((int)entry->pattern.length());
  472 + if (sid.empty()) {
  473 + ret = ERROR_REQUEST_DATA;
  474 + srs_error("invalid param, stream_id=%s. ret=%d", sid.c_str(), ret);
  475 +
  476 + ss << SRS_JOBJECT_START
  477 + << SRS_JFIELD_ERROR(ret)
  478 + << SRS_JOBJECT_END;
  479 +
  480 + return srs_http_response_json(w, ss.str());
  481 + }
  482 +
  483 + int stream_id = ::atoi(sid.c_str());
  484 + SrsStatisticStream* stream = stat->find_stream(stream_id);
  485 + if (stream == NULL) {
  486 + ret = ERROR_RTMP_STREAM_NOT_FOUND;
  487 + srs_error("stream stream_id=%s not found. ret=%d", sid.c_str(), ret);
  488 +
  489 + ss << SRS_JOBJECT_START
  490 + << SRS_JFIELD_ERROR(ret)
  491 + << SRS_JOBJECT_END;
  492 +
  493 + return srs_http_response_json(w, ss.str());
  494 + }
  495 +
  496 + SrsSource* source = SrsSource::fetch(stream->vhost->vhost, stream->app, stream->stream);
  497 + if (source) {
  498 + source->set_expired();
  499 + srs_warn("disconnent stream=%d successfully. vhost=%s, app=%s, stream=%s.",
  500 + stream_id, stream->vhost->vhost.c_str(), stream->app.c_str(), stream->stream.c_str());
  501 + } else {
  502 + ret = ERROR_SOURCE_NOT_FOUND;
  503 + }
  504 +
  505 + ss << SRS_JOBJECT_START
  506 + << SRS_JFIELD_ERROR(ret)
  507 + << SRS_JOBJECT_END;
  508 +
  509 + return srs_http_response_json(w, ss.str());
  510 + } else {
  511 + std::stringstream data;
  512 + int ret = stat->dumps_streams(data);
  513 +
  514 + ss << SRS_JOBJECT_START
  515 + << SRS_JFIELD_ERROR(ret) << SRS_JFIELD_CONT
  516 + << SRS_JFIELD_ORG("server", stat->server_id()) << SRS_JFIELD_CONT
  517 + << SRS_JFIELD_ORG("streams", data.str())
  518 + << SRS_JOBJECT_END;
  519 +
  520 + return srs_http_response_json(w, ss.str());
  521 + }
475 } 522 }
476 523
  524 +
477 SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) 525 SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
478 : SrsConnection(cm, fd) 526 : SrsConnection(cm, fd)
479 { 527 {
@@ -104,11 +104,11 @@ int SrsHttpResponseWriter::write(char* data, int size) @@ -104,11 +104,11 @@ int SrsHttpResponseWriter::write(char* data, int size)
104 104
105 if (!header_wrote) { 105 if (!header_wrote) {
106 write_header(SRS_CONSTS_HTTP_OK); 106 write_header(SRS_CONSTS_HTTP_OK);
107 -  
108 - if ((ret = send_header(data, size)) != ERROR_SUCCESS) {  
109 - srs_error("http: send header failed. ret=%d", ret);  
110 - return ret;  
111 - } 107 + }
  108 +
  109 + if ((ret = send_header(data, size)) != ERROR_SUCCESS) {
  110 + srs_error("http: send header failed. ret=%d", ret);
  111 + return ret;
112 } 112 }
113 113
114 // check the bytes send and content length. 114 // check the bytes send and content length.
@@ -773,6 +773,13 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -773,6 +773,13 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
773 int64_t nb_msgs = 0; 773 int64_t nb_msgs = 0;
774 while (!disposed) { 774 while (!disposed) {
775 pprint->elapse(); 775 pprint->elapse();
  776 +
  777 + // when source is set to expired, disconnect it.
  778 + if (source->expired()) {
  779 + ret = ERROR_USER_DISCONNECT;
  780 + srs_error("source is expired. ret=%d", ret);
  781 + return ret;
  782 + }
776 783
777 // cond wait for timeout. 784 // cond wait for timeout.
778 if (nb_msgs == 0) { 785 if (nb_msgs == 0) {
@@ -806,7 +806,7 @@ int SrsServer::http_handle() @@ -806,7 +806,7 @@ int SrsServer::http_handle()
806 if ((ret = http_api_mux->handle("/api/v1/vhosts", new SrsGoApiVhosts())) != ERROR_SUCCESS) { 806 if ((ret = http_api_mux->handle("/api/v1/vhosts", new SrsGoApiVhosts())) != ERROR_SUCCESS) {
807 return ret; 807 return ret;
808 } 808 }
809 - if ((ret = http_api_mux->handle("/api/v1/streams", new SrsGoApiStreams())) != ERROR_SUCCESS) { 809 + if ((ret = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != ERROR_SUCCESS) {
810 return ret; 810 return ret;
811 } 811 }
812 #endif 812 #endif
@@ -890,6 +890,7 @@ SrsSource::SrsSource() @@ -890,6 +890,7 @@ SrsSource::SrsSource()
890 jitter_algorithm = SrsRtmpJitterAlgorithmOFF; 890 jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
891 mix_correct = false; 891 mix_correct = false;
892 mix_queue = new SrsMixQueue(); 892 mix_queue = new SrsMixQueue();
  893 + is_expired = false;
893 894
894 #ifdef SRS_AUTO_HLS 895 #ifdef SRS_AUTO_HLS
895 hls = new SrsHls(); 896 hls = new SrsHls();
@@ -2071,6 +2072,7 @@ void SrsSource::on_unpublish() @@ -2071,6 +2072,7 @@ void SrsSource::on_unpublish()
2071 2072
2072 _can_publish = true; 2073 _can_publish = true;
2073 _source_id = -1; 2074 _source_id = -1;
  2075 + is_expired = false;
2074 2076
2075 // notify the handler. 2077 // notify the handler.
2076 srs_assert(handler); 2078 srs_assert(handler);
@@ -2229,3 +2231,13 @@ void SrsSource::destroy_forwarders() @@ -2229,3 +2231,13 @@ void SrsSource::destroy_forwarders()
2229 forwarders.clear(); 2231 forwarders.clear();
2230 } 2232 }
2231 2233
  2234 +bool SrsSource::expired()
  2235 +{
  2236 + return is_expired;
  2237 +}
  2238 +
  2239 +void SrsSource::set_expired()
  2240 +{
  2241 + is_expired = true;
  2242 +}
  2243 +
@@ -452,6 +452,8 @@ private: @@ -452,6 +452,8 @@ private:
452 // whether use interlaced/mixed algorithm to correct timestamp. 452 // whether use interlaced/mixed algorithm to correct timestamp.
453 bool mix_correct; 453 bool mix_correct;
454 SrsMixQueue* mix_queue; 454 SrsMixQueue* mix_queue;
  455 + // the flag of source expired or not.
  456 + bool is_expired;
455 // whether stream is monotonically increase. 457 // whether stream is monotonically increase.
456 bool is_monotonically_increase; 458 bool is_monotonically_increase;
457 int64_t last_packet_time; 459 int64_t last_packet_time;
@@ -583,6 +585,12 @@ public: @@ -583,6 +585,12 @@ public:
583 private: 585 private:
584 virtual int create_forwarders(); 586 virtual int create_forwarders();
585 virtual void destroy_forwarders(); 587 virtual void destroy_forwarders();
  588 +public:
  589 + virtual bool expired();
  590 + /**
  591 + * set source expired.
  592 + */
  593 + virtual void set_expired();
586 }; 594 };
587 595
588 #endif 596 #endif
@@ -134,6 +134,20 @@ SrsStatistic* SrsStatistic::instance() @@ -134,6 +134,20 @@ SrsStatistic* SrsStatistic::instance()
134 return _instance; 134 return _instance;
135 } 135 }
136 136
  137 +SrsStatisticStream* SrsStatistic::find_stream(int stream_id)
  138 +{
  139 + std::map<int, SrsStatisticClient*>::iterator it;
  140 + for (it = clients.begin(); it != clients.end(); it++) {
  141 + SrsStatisticClient* client = it->second;
  142 + SrsStatisticStream* stream = client->stream;
  143 +
  144 + if (stream_id == stream->id) {
  145 + return stream;
  146 + }
  147 + }
  148 + return NULL;
  149 +}
  150 +
137 int SrsStatistic::on_video_info(SrsRequest* req, 151 int SrsStatistic::on_video_info(SrsRequest* req,
138 SrsCodecVideo vcodec, SrsAvcProfile avc_profile, SrsAvcLevel avc_level 152 SrsCodecVideo vcodec, SrsAvcProfile avc_profile, SrsAvcLevel avc_level
139 ) { 153 ) {
@@ -131,6 +131,7 @@ private: @@ -131,6 +131,7 @@ private:
131 public: 131 public:
132 static SrsStatistic* instance(); 132 static SrsStatistic* instance();
133 public: 133 public:
  134 + virtual SrsStatisticStream* find_stream(int stream_id);
134 /** 135 /**
135 * when got video info for stream. 136 * when got video info for stream.
136 */ 137 */
@@ -149,6 +149,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -149,6 +149,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
149 #define ERROR_RTP_TYPE96_CORRUPT 2045 149 #define ERROR_RTP_TYPE96_CORRUPT 2045
150 #define ERROR_RTP_TYPE97_CORRUPT 2046 150 #define ERROR_RTP_TYPE97_CORRUPT 2046
151 #define ERROR_RTSP_AUDIO_CONFIG 2047 151 #define ERROR_RTSP_AUDIO_CONFIG 2047
  152 +#define ERROR_RTMP_STREAM_NOT_FOUND 2048
152 // 153 //
153 // system control message, 154 // system control message,
154 // not an error, but special control logic. 155 // not an error, but special control logic.
@@ -225,6 +226,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -225,6 +226,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
225 #define ERROR_HLS_NO_STREAM 3062 226 #define ERROR_HLS_NO_STREAM 3062
226 #define ERROR_JSON_LOADS 3063 227 #define ERROR_JSON_LOADS 3063
227 #define ERROR_RESPONSE_CODE 3064 228 #define ERROR_RESPONSE_CODE 3064
  229 +#define ERROR_RESPONSE_DATA 3065
  230 +#define ERROR_REQUEST_DATA 3066
228 231
229 /////////////////////////////////////////////////////// 232 ///////////////////////////////////////////////////////
230 // HTTP/StreamCaster protocol error. 233 // HTTP/StreamCaster protocol error.
@@ -246,7 +246,17 @@ SrsHttpRedirectHandler::~SrsHttpRedirectHandler() @@ -246,7 +246,17 @@ SrsHttpRedirectHandler::~SrsHttpRedirectHandler()
246 int SrsHttpRedirectHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) 246 int SrsHttpRedirectHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
247 { 247 {
248 int ret = ERROR_SUCCESS; 248 int ret = ERROR_SUCCESS;
249 - // TODO: FIXME: implements it. 249 + string msg = "Moved Permsanently";
  250 +
  251 + w->header()->set_content_type("text/plain; charset=utf-8");
  252 + w->header()->set_content_length(msg.length());
  253 + w->header()->set("Location", url);
  254 + w->write_header(code);
  255 +
  256 + w->write((char*)msg.data(), (int)msg.length());
  257 + w->final_request();
  258 +
  259 + srs_info("redirect to %s.", url.c_str());
250 return ret; 260 return ret;
251 } 261 }
252 262