winlin

support http callback hooks: on_connect

@@ -49,14 +49,53 @@ if len(sys.argv) <= 1: @@ -49,14 +49,53 @@ if len(sys.argv) <= 1:
49 print "See also: https://github.com/winlinvip/simple-rtmp-server" 49 print "See also: https://github.com/winlinvip/simple-rtmp-server"
50 sys.exit(1) 50 sys.exit(1)
51 51
52 -import datetime, cherrypy 52 +import json, datetime, cherrypy
53 53
54 def trace(msg): 54 def trace(msg):
55 date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") 55 date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
56 print "[%s][trace] %s"%(date, msg) 56 print "[%s][trace] %s"%(date, msg)
57 57
  58 +def enable_crossdomain():
  59 + cherrypy.response.headers["Access-Control-Allow-Origin"] = "*"
  60 + cherrypy.response.headers["Access-Control-Allow-Methods"] = "GET, POST, HEAD, PUT, DELETE"
  61 + # generate allow headers for crossdomain.
  62 + allow_headers = ["Cache-Control", "X-Proxy-Authorization", "X-Requested-With", "Content-Type"]
  63 + cherrypy.response.headers["Access-Control-Allow-Headers"] = ",".join(allow_headers)
  64 +
  65 +class Error:
  66 + # ok, success, completed.
  67 + success = 0
  68 + # error when parse json
  69 + system_parse_json = 100
  70 +
  71 +'''
  72 +handle the clients requests:
  73 +POST: create new client, handle the SRS on_connect callback.
  74 +'''
58 class RESTClients(object): 75 class RESTClients(object):
59 - pass; 76 + exposed = True
  77 + def GET(self):
  78 + enable_crossdomain();
  79 +
  80 + clients = {};
  81 + return json.dumps(clients);
  82 +
  83 + def POST(self):
  84 + enable_crossdomain();
  85 +
  86 + req = cherrypy.request.body.read();
  87 + trace("post to clients, req=%s"%(req));
  88 + try:
  89 + json_req = json.loads(req)
  90 + except Exception, ex:
  91 + trace("parse the request to json failed, req=%s, ex=%s"%(req, ex))
  92 + return str(Error.system_parse_json);
  93 +
  94 + trace("valid clients post request success.")
  95 + return str(Error.success);
  96 +
  97 + def OPTIONS(self):
  98 + enable_crossdomain()
60 99
61 class Root(object): 100 class Root(object):
62 def __init__(self): 101 def __init__(self):
@@ -144,5 +144,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -144,5 +144,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
144 144
145 #define ERROR_HTTP_PARSE_URI 800 145 #define ERROR_HTTP_PARSE_URI 800
146 #define ERROR_HTTP_DATA_INVLIAD 801 146 #define ERROR_HTTP_DATA_INVLIAD 801
  147 +#define ERROR_HTTP_PARSE_HEADER 802
147 148
148 #endif 149 #endif
@@ -23,13 +23,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -23,13 +23,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 23
24 #include <srs_core_http.hpp> 24 #include <srs_core_http.hpp>
25 25
  26 +#ifdef SRS_HTTP
  27 +
  28 +#include <sstream>
  29 +
26 #include <stdlib.h> 30 #include <stdlib.h>
  31 +#include <sys/socket.h>
  32 +#include <netinet/in.h>
  33 +#include <arpa/inet.h>
27 34
28 #include <srs_core_error.hpp> 35 #include <srs_core_error.hpp>
29 #include <srs_core_rtmp.hpp> 36 #include <srs_core_rtmp.hpp>
30 #include <srs_core_log.hpp> 37 #include <srs_core_log.hpp>
31 -  
32 -#ifdef SRS_HTTP 38 +#include <srs_core_socket.hpp>
33 39
34 #define SRS_DEFAULT_HTTP_PORT 80 40 #define SRS_DEFAULT_HTTP_PORT 80
35 #define SRS_HTTP_RESPONSE_OK "0" 41 #define SRS_HTTP_RESPONSE_OK "0"
@@ -97,6 +103,11 @@ int SrsHttpUri::get_port() @@ -97,6 +103,11 @@ int SrsHttpUri::get_port()
97 return port; 103 return port;
98 } 104 }
99 105
  106 +const char* SrsHttpUri::get_path()
  107 +{
  108 + return path.c_str();
  109 +}
  110 +
100 std::string SrsHttpUri::get_uri_field(std::string uri, http_parser_url* hp_u, http_parser_url_fields field) 111 std::string SrsHttpUri::get_uri_field(std::string uri, http_parser_url* hp_u, http_parser_url_fields field)
101 { 112 {
102 if((hp_u->field_set & (1 << field)) == 0){ 113 if((hp_u->field_set & (1 << field)) == 0){
@@ -117,18 +128,270 @@ std::string SrsHttpUri::get_uri_field(std::string uri, http_parser_url* hp_u, ht @@ -117,18 +128,270 @@ std::string SrsHttpUri::get_uri_field(std::string uri, http_parser_url* hp_u, ht
117 128
118 SrsHttpClient::SrsHttpClient() 129 SrsHttpClient::SrsHttpClient()
119 { 130 {
  131 + connected = false;
  132 + stfd = NULL;
120 } 133 }
121 134
122 SrsHttpClient::~SrsHttpClient() 135 SrsHttpClient::~SrsHttpClient()
123 { 136 {
  137 + disconnect();
124 } 138 }
125 139
126 int SrsHttpClient::post(SrsHttpUri* uri, std::string req, std::string& res) 140 int SrsHttpClient::post(SrsHttpUri* uri, std::string req, std::string& res)
127 { 141 {
128 int ret = ERROR_SUCCESS; 142 int ret = ERROR_SUCCESS;
  143 +
  144 + if ((ret = connect(uri)) != ERROR_SUCCESS) {
  145 + srs_error("http connect server failed. ret=%d", ret);
  146 + return ret;
  147 + }
  148 +
  149 + // send POST request to uri
  150 + // POST %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s
  151 + std::stringstream ss;
  152 + ss << "POST " << uri->get_path() << " "
  153 + << "HTTP/1.1\r\n"
  154 + << "Host: " << uri->get_host() << "\r\n"
  155 + << "Connection: Keep-Alive" << "\r\n"
  156 + << "Content-Length: " << std::dec << req.length() << "\r\n"
  157 + << "User-Agent: " << RTMP_SIG_SRS_NAME << RTMP_SIG_SRS_VERSION << "\r\n"
  158 + << "Content-Type: text/html" << "\r\n"
  159 + << "\r\n"
  160 + << req;
  161 +
  162 + SrsSocket skt(stfd);
  163 +
  164 + std::string data = ss.str();
  165 + ssize_t nwrite;
  166 + if ((ret = skt.write(data.c_str(), data.length(), &nwrite)) != ERROR_SUCCESS) {
  167 + // disconnect when error.
  168 + disconnect();
  169 +
  170 + srs_error("write http post failed. ret=%d", ret);
  171 + return ret;
  172 + }
  173 +
  174 + if ((ret = parse_response(uri, &skt, &res)) != ERROR_SUCCESS) {
  175 + srs_error("parse http post response failed. ret=%d", ret);
  176 + return ret;
  177 + }
  178 + srs_info("parse http post response success.");
  179 +
129 return ret; 180 return ret;
130 } 181 }
131 182
  183 +void SrsHttpClient::disconnect()
  184 +{
  185 + connected = false;
  186 +
  187 + if (stfd) {
  188 + int fd = st_netfd_fileno(stfd);
  189 + st_netfd_close(stfd);
  190 + stfd = NULL;
  191 +
  192 + // st does not close it sometimes,
  193 + // close it manually.
  194 + ::close(fd);
  195 + }
  196 +}
  197 +
  198 +int SrsHttpClient::connect(SrsHttpUri* uri)
  199 +{
  200 + int ret = ERROR_SUCCESS;
  201 +
  202 + if (connected) {
  203 + return ret;
  204 + }
  205 +
  206 + disconnect();
  207 +
  208 + std::string ip = srs_dns_resolve(uri->get_host());
  209 + if (ip.empty()) {
  210 + ret = ERROR_SYSTEM_IP_INVALID;
  211 + srs_error("dns resolve server error, ip empty. ret=%d", ret);
  212 + return ret;
  213 + }
  214 +
  215 + int sock = socket(AF_INET, SOCK_STREAM, 0);
  216 + if(sock == -1){
  217 + ret = ERROR_SOCKET_CREATE;
  218 + srs_error("create socket error. ret=%d", ret);
  219 + return ret;
  220 + }
  221 +
  222 + stfd = st_netfd_open_socket(sock);
  223 + if(stfd == NULL){
  224 + ret = ERROR_ST_OPEN_SOCKET;
  225 + srs_error("st_netfd_open_socket failed. ret=%d", ret);
  226 + return ret;
  227 + }
  228 +
  229 + sockaddr_in addr;
  230 + addr.sin_family = AF_INET;
  231 + addr.sin_port = htons(uri->get_port());
  232 + addr.sin_addr.s_addr = inet_addr(ip.c_str());
  233 +
  234 + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
  235 + ret = ERROR_ST_CONNECT;
  236 + srs_error("connect to server error. "
  237 + "ip=%s, port=%d, ret=%d", ip.c_str(), uri->get_port(), ret);
  238 + return ret;
  239 + }
  240 + srs_info("connect to server success. "
  241 + "http url=%s, server=%s, ip=%s, port=%d",
  242 + uri->get_url(), uri->get_host(), ip.c_str(), uri->get_port());
  243 +
  244 + connected = true;
  245 +
  246 + return ret;
  247 +}
  248 +
  249 +int SrsHttpClient::parse_response(SrsHttpUri* uri, SrsSocket* skt, std::string* response)
  250 +{
  251 + int ret = ERROR_SUCCESS;
  252 +
  253 + int body_received = 0;
  254 + if ((ret = parse_response_header(skt, response, body_received)) != ERROR_SUCCESS) {
  255 + srs_error("parse response header failed. ret=%d", ret);
  256 + return ret;
  257 + }
  258 +
  259 + if ((ret = parse_response_body(uri, skt, response, body_received)) != ERROR_SUCCESS) {
  260 + srs_error("parse response body failed. ret=%d", ret);
  261 + return ret;
  262 + }
  263 +
  264 + srs_info("url %s download, body size=%"PRId64, uri->get_url(), http_header.content_length);
  265 +
  266 + return ret;
  267 +}
  268 +
  269 +int SrsHttpClient::parse_response_header(SrsSocket* skt, std::string* response, int& body_received)
  270 +{
  271 + int ret = ERROR_SUCCESS;
  272 +
  273 + http_parser_settings settings;
  274 +
  275 + memset(&settings, 0, sizeof(settings));
  276 + settings.on_headers_complete = on_headers_complete;
  277 +
  278 + http_parser parser;
  279 + http_parser_init(&parser, HTTP_RESPONSE);
  280 + // callback object ptr.
  281 + parser.data = (void*)this;
  282 +
  283 + // reset response header.
  284 + memset(&http_header, 0, sizeof(http_header));
  285 +
  286 + // parser header.
  287 + char buf[SRS_HTTP_HEADER_BUFFER];
  288 + for (;;) {
  289 + ssize_t nread;
  290 + if ((ret = skt->read(buf, (size_t)sizeof(buf), &nread)) != ERROR_SUCCESS) {
  291 + srs_error("read body from server failed. ret=%d", ret);
  292 + return ret;
  293 + }
  294 +
  295 + ssize_t nparsed = http_parser_execute(&parser, &settings, buf, nread);
  296 + srs_info("read_size=%d, nparsed=%d", (int)nread, (int)nparsed);
  297 +
  298 + // check header size.
  299 + if (http_header.nread != 0) {
  300 + body_received = nread - nparsed;
  301 +
  302 + srs_info("http header parsed, size=%d, content-length=%"PRId64", body-received=%d",
  303 + http_header.nread, http_header.content_length, body_received);
  304 +
  305 + if(response != NULL && body_received > 0){
  306 + response->append(buf + nparsed, body_received);
  307 + }
  308 +
  309 + return ret;
  310 + }
  311 +
  312 + if (nparsed != nread) {
  313 + ret = ERROR_HTTP_PARSE_HEADER;
  314 + srs_error("parse response error, parsed(%d)!=read(%d), ret=%d", (int)nparsed, (int)nread, ret);
  315 + return ret;
  316 + }
  317 + }
  318 +
  319 + return ret;
  320 +}
  321 +
  322 +int SrsHttpClient::parse_response_body(SrsHttpUri* uri, SrsSocket* skt, std::string* response, int body_received)
  323 +{
  324 + int ret = ERROR_SUCCESS;
  325 +
  326 + srs_assert(uri != NULL);
  327 +
  328 + uint64_t body_left = http_header.content_length - body_received;
  329 +
  330 + if (body_left <= 0) {
  331 + return ret;
  332 + }
  333 +
  334 + if (response != NULL) {
  335 + char buf[SRS_HTTP_BODY_BUFFER];
  336 +
  337 + return parse_response_body_data(
  338 + uri, skt, response, (size_t)body_left,
  339 + (const void*)buf, (size_t)SRS_HTTP_BODY_BUFFER
  340 + );
  341 + } else {
  342 + // if ignore response, use shared fast memory.
  343 + static char buf[SRS_HTTP_BODY_BUFFER];
  344 +
  345 + return parse_response_body_data(
  346 + uri, skt, response, (size_t)body_left,
  347 + (const void*)buf, (size_t)SRS_HTTP_BODY_BUFFER
  348 + );
  349 + }
  350 +
  351 + return ret;
  352 +}
  353 +
  354 +int SrsHttpClient::parse_response_body_data(SrsHttpUri* uri, SrsSocket* skt, std::string* response, size_t body_left, const void* buf, size_t size)
  355 +{
  356 + int ret = ERROR_SUCCESS;
  357 +
  358 + srs_assert(uri != NULL);
  359 +
  360 + while (body_left > 0) {
  361 + ssize_t nread;
  362 + int size_to_read = srs_min(size, body_left);
  363 + if ((ret = skt->read(buf, size_to_read, &nread)) != ERROR_SUCCESS) {
  364 + srs_error("read header from server failed. ret=%d", ret);
  365 + return ret;
  366 + }
  367 +
  368 + if (response != NULL && nread > 0) {
  369 + response->append((char*)buf, nread);
  370 + }
  371 +
  372 + body_left -= nread;
  373 + srs_info("read url(%s) content partial %"PRId64"/%"PRId64"",
  374 + uri->get_url(), http_header.content_length - body_left, http_header.content_length);
  375 + }
  376 +
  377 + return ret;
  378 +}
  379 +
  380 +int SrsHttpClient::on_headers_complete(http_parser* parser)
  381 +{
  382 + SrsHttpClient* obj = (SrsHttpClient*)parser->data;
  383 + obj->comple_header(parser);
  384 +
  385 + // see http_parser.c:1570, return 1 to skip body.
  386 + return 1;
  387 +}
  388 +
  389 +void SrsHttpClient::comple_header(http_parser* parser)
  390 +{
  391 + // save the parser status when header parse completed.
  392 + memcpy(&http_header, parser, sizeof(http_header));
  393 +}
  394 +
132 SrsHttpHooks::SrsHttpHooks() 395 SrsHttpHooks::SrsHttpHooks()
133 { 396 {
134 } 397 }
@@ -148,33 +411,33 @@ int SrsHttpHooks::on_connect(std::string url, std::string ip, SrsRequest* req) @@ -148,33 +411,33 @@ int SrsHttpHooks::on_connect(std::string url, std::string ip, SrsRequest* req)
148 return ret; 411 return ret;
149 } 412 }
150 413
151 - std::string res;  
152 - std::string data;  
153 /** 414 /**
154 { 415 {
155 "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", 416 "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
156 "pageUrl": "http://www.test.com/live.html" 417 "pageUrl": "http://www.test.com/live.html"
157 } 418 }
158 */ 419 */
159 - data += "{";  
160 - // ip  
161 - data += "\"ip\":";  
162 - data += "\"" + ip + "\"";  
163 - data += ",";  
164 - // vhost  
165 - data += "\"vhost\":";  
166 - data += "\"" + req->vhost + "\"";  
167 - data += ",";  
168 - data += ",";  
169 - // app  
170 - data += "\"vhost\":";  
171 - data += "\"" + req->app + "\"";  
172 - data += ",";  
173 - // pageUrl  
174 - data += "\"vhost\":";  
175 - data += "\"" + req->pageUrl + "\"";  
176 - //data += ",";  
177 - data += "}"; 420 + std::stringstream ss;
  421 + ss << "{"
  422 + // ip
  423 + << '"' << "ip" << '"' << ':'
  424 + << '"' << ip << '"'
  425 + << ','
  426 + // vhost
  427 + << '"' << "vhost" << '"' << ':'
  428 + << '"' << req->vhost << '"'
  429 + << ','
  430 + // app
  431 + << '"' << "app" << '"' << ':'
  432 + << '"' << req->app << '"'
  433 + << ','
  434 + // pageUrl
  435 + << '"' << "pageUrl" << '"' << ':'
  436 + << '"' << req->pageUrl << '"'
  437 + //<< ','
  438 + << "}";
  439 + std::string data = ss.str();
  440 + std::string res;
178 441
179 SrsHttpClient http; 442 SrsHttpClient http;
180 if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { 443 if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) {
@@ -29,14 +29,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -29,14 +29,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 */ 29 */
30 #include <srs_core.hpp> 30 #include <srs_core.hpp>
31 31
  32 +#ifdef SRS_HTTP
  33 +
32 class SrsRequest; 34 class SrsRequest;
  35 +class SrsSocket;
33 36
34 #include <string> 37 #include <string>
35 38
36 -#ifdef SRS_HTTP  
37 - 39 +#include <st.h>
38 #include <http_parser.h> 40 #include <http_parser.h>
39 41
  42 +#define SRS_HTTP_HEADER_BUFFER 1024
  43 +#define SRS_HTTP_BODY_BUFFER 32 * 1024
  44 +
40 /** 45 /**
41 * used to resolve the http uri. 46 * used to resolve the http uri.
42 */ 47 */
@@ -61,6 +66,7 @@ public: @@ -61,6 +66,7 @@ public:
61 virtual const char* get_schema(); 66 virtual const char* get_schema();
62 virtual const char* get_host(); 67 virtual const char* get_host();
63 virtual int get_port(); 68 virtual int get_port();
  69 + virtual const char* get_path();
64 private: 70 private:
65 /** 71 /**
66 * get the parsed url field. 72 * get the parsed url field.
@@ -74,6 +80,11 @@ private: @@ -74,6 +80,11 @@ private:
74 */ 80 */
75 class SrsHttpClient 81 class SrsHttpClient
76 { 82 {
  83 +private:
  84 + bool connected;
  85 + st_netfd_t stfd;
  86 +private:
  87 + http_parser http_header;
77 public: 88 public:
78 SrsHttpClient(); 89 SrsHttpClient();
79 virtual ~SrsHttpClient(); 90 virtual ~SrsHttpClient();
@@ -84,6 +95,17 @@ public: @@ -84,6 +95,17 @@ public:
84 * @param res the response data from server. 95 * @param res the response data from server.
85 */ 96 */
86 virtual int post(SrsHttpUri* uri, std::string req, std::string& res); 97 virtual int post(SrsHttpUri* uri, std::string req, std::string& res);
  98 +private:
  99 + virtual void disconnect();
  100 + virtual int connect(SrsHttpUri* uri);
  101 +private:
  102 + virtual int parse_response(SrsHttpUri* uri, SrsSocket* skt, std::string* response);
  103 + virtual int parse_response_header(SrsSocket* skt, std::string* response, int& body_received);
  104 + virtual int parse_response_body(SrsHttpUri* uri, SrsSocket* skt, std::string* response, int body_received);
  105 + virtual int parse_response_body_data(SrsHttpUri* uri, SrsSocket* skt, std::string* response, size_t body_left, const void* buf, size_t size);
  106 +private:
  107 + static int on_headers_complete(http_parser* parser);
  108 + virtual void comple_header(http_parser* parser);
87 }; 109 };
88 110
89 /** 111 /**