winlin

refine the http request reader.

... ... @@ -74,6 +74,13 @@ function Ubuntu_prepare()
echo "install patch success"
fi
unzip --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then
echo "install unzip"
require_sudoer "sudo apt-get install -y --force-yes unzip"
sudo apt-get install -y --force-yes unzip; ret=$?; if [[ 0 -ne $ret ]]; then return $ret; fi
echo "install unzip success"
fi
if [ $SRS_FFMPEG_TOOL = YES ]; then
autoconf --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then
echo "install autoconf"
... ... @@ -170,6 +177,13 @@ function Centos_prepare()
echo "install patch success"
fi
unzip --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then
echo "install unzip"
require_sudoer "sudo yum install -y unzip"
sudo yum install -y unzip; ret=$?; if [[ 0 -ne $ret ]]; then return $ret; fi
echo "install unzip success"
fi
if [ $SRS_FFMPEG_TOOL = YES ]; then
automake --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then
echo "install automake"
... ...
... ... @@ -43,6 +43,7 @@ using namespace std;
#define SRS_DEFAULT_HTTP_PORT 80
#define SRS_HTTP_HEADER_BUFFER 1024
#define SRS_HTTP_BODY_BUFFER 1024
// for http parser macros
#define SRS_CONSTS_HTTP_OPTIONS HTTP_OPTIONS
... ... @@ -227,6 +228,22 @@ ISrsHttpResponseWriter::~ISrsHttpResponseWriter()
{
}
ISrsHttpResponseReader::ISrsHttpResponseReader()
{
}
ISrsHttpResponseReader::~ISrsHttpResponseReader()
{
}
ISrsHttpResponseAppender::ISrsHttpResponseAppender()
{
}
ISrsHttpResponseAppender::~ISrsHttpResponseAppender()
{
}
ISrsHttpHandler::ISrsHttpHandler()
{
entry = NULL;
... ... @@ -833,11 +850,63 @@ int SrsHttpResponseWriter::send_header(char* data, int size)
return skt->write((void*)buf.c_str(), buf.length(), NULL);
}
SrsHttpMessage::SrsHttpMessage()
SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io)
{
skt = io;
owner = msg;
cache = new SrsSimpleBuffer();
}
SrsHttpResponseReader::~SrsHttpResponseReader()
{
srs_freep(cache);
}
int SrsHttpResponseReader::read(int max, std::string& data)
{
int ret = ERROR_SUCCESS;
// read from cache first.
if (cache->length() > 0) {
int nb_bytes = srs_min(cache->length(), max);
data.append(cache->bytes(), nb_bytes);
cache->erase(nb_bytes);
return ret;
}
// read some from underlayer.
int left = srs_max(SRS_HTTP_BODY_BUFFER, max);
// read from io.
char* buf = new char[left];
SrsAutoFree(char, buf);
ssize_t nread = 0;
if ((ret = skt->read(buf, left, &nread)) != ERROR_SUCCESS) {
return ret;
}
if (nread) {
data.append(buf, nread);
}
return ret;
}
int SrsHttpResponseReader::append(char* data, int size)
{
int ret = ERROR_SUCCESS;
cache->append(data, size);
return ret;
}
SrsHttpMessage::SrsHttpMessage(SrsStSocket* io)
{
_body = new SrsSimpleBuffer();
_state = SrsHttpParseStateInit;
_uri = new SrsHttpUri();
_body = new SrsHttpResponseReader(this, io);
_http_ts_send_buffer = new char[__SRS_HTTP_TS_SEND_BUFFER_SIZE];
}
... ... @@ -898,18 +967,6 @@ char* SrsHttpMessage::http_ts_send_buffer()
return _http_ts_send_buffer;
}
void SrsHttpMessage::reset()
{
_state = SrsHttpParseStateInit;
_body->erase(_body->length());
_url = "";
}
bool SrsHttpMessage::is_complete()
{
return _state == SrsHttpParseStateComplete;
}
u_int8_t SrsHttpMessage::method()
{
return (u_int8_t)_header.method;
... ... @@ -993,25 +1050,49 @@ string SrsHttpMessage::path()
return _uri->get_path();
}
string SrsHttpMessage::body()
void SrsHttpMessage::set(string url, http_parser* header, string body, vector<SrsHttpHeaderField>& headers)
{
std::string b;
if (_body && _body->length() > 0) {
b.append(_body->bytes(), _body->length());
}
return b;
}
_url = url;
_header = *header;
_headers = headers;
char* SrsHttpMessage::body_raw()
{
return _body? _body->bytes() : NULL;
if (!body.empty()) {
_body->append((char*)body.data(), (int)body.length());
}
}
int64_t SrsHttpMessage::body_size()
int SrsHttpMessage::body_read_all(string body)
{
return (int64_t)_body->length();
int ret = ERROR_SUCCESS;
// when content length specified, read specified length.
if ((int64_t)_header.content_length > 0) {
int left = (int)(int64_t)_header.content_length;
while (left > 0) {
int nb_read = (int)body.length();
if ((ret = _body->read(left, body)) != ERROR_SUCCESS) {
return ret;
}
left -= (int)body.length() - nb_read;
}
return ret;
}
// chunked encoding, read util got size=0 chunk.
for (;;) {
int nb_read = (int)body.length();
if ((ret = _body->read(0, body)) != ERROR_SUCCESS) {
return ret;
}
// eof.
if (nb_read == (int)body.length()) {
break;
}
}
return ret;
}
int64_t SrsHttpMessage::content_length()
... ... @@ -1019,26 +1100,6 @@ int64_t SrsHttpMessage::content_length()
return _header.content_length;
}
void SrsHttpMessage::set_url(string url)
{
_url = url;
}
void SrsHttpMessage::set_state(SrsHttpParseState state)
{
_state = state;
}
void SrsHttpMessage::set_header(http_parser* header)
{
memcpy(&_header, header, sizeof(http_parser));
}
void SrsHttpMessage::append_body(const char* body, int length)
{
_body->append(body, length);
}
string SrsHttpMessage::query_get(string key)
{
std::string v;
... ... @@ -1052,33 +1113,28 @@ string SrsHttpMessage::query_get(string key)
int SrsHttpMessage::request_header_count()
{
return (int)headers.size();
return (int)_headers.size();
}
string SrsHttpMessage::request_header_key_at(int index)
{
srs_assert(index < request_header_count());
SrsHttpHeaderField item = headers[index];
SrsHttpHeaderField item = _headers[index];
return item.first;
}
string SrsHttpMessage::request_header_value_at(int index)
{
srs_assert(index < request_header_count());
SrsHttpHeaderField item = headers[index];
SrsHttpHeaderField item = _headers[index];
return item.second;
}
void SrsHttpMessage::set_request_header(string key, string value)
{
headers.push_back(std::make_pair(key, value));
}
string SrsHttpMessage::get_request_header(string name)
{
std::vector<SrsHttpHeaderField>::iterator it;
for (it = headers.begin(); it != headers.end(); ++it) {
for (it = _headers.begin(); it != _headers.end(); ++it) {
SrsHttpHeaderField& elem = *it;
std::string key = elem.first;
std::string value = elem.second;
... ... @@ -1092,12 +1148,10 @@ string SrsHttpMessage::get_request_header(string name)
SrsHttpParser::SrsHttpParser()
{
msg = NULL;
}
SrsHttpParser::~SrsHttpParser()
{
srs_freep(msg);
}
int SrsHttpParser::initialize(enum http_parser_type type)
... ... @@ -1125,26 +1179,31 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg)
*ppmsg = NULL;
int ret = ERROR_SUCCESS;
// the msg must be always NULL
srs_assert(msg == NULL);
msg = new SrsHttpMessage();
// reset request data.
filed_name = "";
// reset response header.
msg->reset();
field_value = "";
expect_filed_name = true;
state = SrsHttpParseStateInit;
header = http_parser();
url = "";
headers.clear();
body = "";
// do parse
if ((ret = parse_message_imp(skt)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("parse http msg failed. ret=%d", ret);
}
srs_freep(msg);
return ret;
}
// create msg
SrsHttpMessage* msg = new SrsHttpMessage(skt);
// dumps the header and body read.
msg->set(url, &header, body, headers);
// initalize http msg, parse url.
if ((ret = msg->initialize()) != ERROR_SUCCESS) {
srs_error("initialize http msg failed. ret=%d", ret);
... ... @@ -1154,7 +1213,6 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg)
// parse ok, return the msg.
*ppmsg = msg;
msg = NULL;
return ret;
}
... ... @@ -1163,13 +1221,14 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt)
{
int ret = ERROR_SUCCESS;
// the msg should never be NULL
srs_assert(msg != NULL);
ssize_t nread = 0;
ssize_t nparsed = 0;
char* buf = new char[SRS_HTTP_HEADER_BUFFER];
SrsAutoFree(char, buf);
// parser header.
char buf[SRS_HTTP_HEADER_BUFFER];
for (;;) {
ssize_t nread;
if ((ret = skt->read(buf, (size_t)sizeof(buf), &nread)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
... ... @@ -1177,20 +1236,28 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt)
return ret;
}
ssize_t nparsed = http_parser_execute(&parser, &settings, buf, nread);
nparsed = http_parser_execute(&parser, &settings, buf, nread);
srs_info("read_size=%d, nparsed=%d", (int)nread, (int)nparsed);
// check header size.
if (msg->is_complete()) {
return ret;
// ok atleast header completed,
// never wait for body completed, for maybe chunked.
if (state == SrsHttpParseStateHeaderComplete || state == SrsHttpParseStateMessageComplete) {
break;
}
// when not complete, the parser should consume all bytes.
if (nparsed != nread) {
ret = ERROR_HTTP_PARSE_HEADER;
srs_error("parse response error, parsed(%d)!=read(%d), ret=%d", (int)nparsed, (int)nread, ret);
return ret;
}
}
// when parse completed, cache the left body.
if (nread && nparsed < nread) {
body.append(buf + nparsed, nread - nparsed);
srs_info("cache %d bytes read body.", nread - nparsed);
}
return ret;
}
... ... @@ -1198,7 +1265,9 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt)
int SrsHttpParser::on_message_begin(http_parser* parser)
{
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
obj->msg->set_state(SrsHttpParseStateStart);
srs_assert(obj);
obj->state = SrsHttpParseStateStart;
srs_info("***MESSAGE BEGIN***");
... ... @@ -1208,7 +1277,11 @@ int SrsHttpParser::on_message_begin(http_parser* parser)
int SrsHttpParser::on_headers_complete(http_parser* parser)
{
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
obj->msg->set_header(parser);
srs_assert(obj);
obj->header = *parser;
// save the parser when header parse completed.
obj->state = SrsHttpParseStateHeaderComplete;
srs_info("***HEADERS COMPLETE***");
... ... @@ -1219,8 +1292,10 @@ int SrsHttpParser::on_headers_complete(http_parser* parser)
int SrsHttpParser::on_message_complete(http_parser* parser)
{
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
// save the parser when header parse completed.
obj->msg->set_state(SrsHttpParseStateComplete);
srs_assert(obj);
// save the parser when body parse completed.
obj->state = SrsHttpParseStateMessageComplete;
srs_info("***MESSAGE COMPLETE***\n");
... ... @@ -1230,13 +1305,10 @@ int SrsHttpParser::on_message_complete(http_parser* parser)
int SrsHttpParser::on_url(http_parser* parser, const char* at, size_t length)
{
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
srs_assert(obj);
if (length > 0) {
std::string url;
url.append(at, (int)length);
obj->msg->set_url(url);
obj->url.append(at, (int)length);
}
srs_info("Method: %d, Url: %.*s", parser->method, (int)length, at);
... ... @@ -1247,44 +1319,47 @@ int SrsHttpParser::on_url(http_parser* parser, const char* at, size_t length)
int SrsHttpParser::on_header_field(http_parser* parser, const char* at, size_t length)
{
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
srs_assert(obj);
// field value=>name, reap the field.
if (!obj->expect_filed_name) {
obj->headers.push_back(std::make_pair(obj->filed_name, obj->field_value));
// reset the field name when value parsed.
obj->filed_name = "";
obj->field_value = "";
}
obj->expect_filed_name = true;
if (length > 0) {
srs_assert(obj);
obj->filed_name.append(at, (int)length);
}
srs_info("Header field: %.*s", (int)length, at);
srs_trace("Header field: %.*s", (int)length, at);
return 0;
}
int SrsHttpParser::on_header_value(http_parser* parser, const char* at, size_t length)
{
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
srs_assert(obj);
if (length > 0) {
srs_assert(obj);
srs_assert(obj->msg);
std::string field_value;
field_value.append(at, (int)length);
obj->msg->set_request_header(obj->filed_name, field_value);
obj->filed_name = "";
obj->field_value.append(at, (int)length);
}
obj->expect_filed_name = false;
srs_info("Header value: %.*s", (int)length, at);
srs_trace("Header value: %.*s", (int)length, at);
return 0;
}
int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length)
{
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
srs_assert(obj);
if (length > 0) {
srs_assert(obj);
srs_assert(obj->msg);
obj->msg->append_body(at, (int)length);
obj->body.append(at, (int)length);
}
srs_info("Body: %.*s", (int)length, at);
... ...
... ... @@ -76,7 +76,8 @@ extern int srs_go_http_response_json(ISrsHttpResponseWriter* w, std::string data
enum SrsHttpParseState {
SrsHttpParseStateInit = 0,
SrsHttpParseStateStart,
SrsHttpParseStateComplete
SrsHttpParseStateHeaderComplete,
SrsHttpParseStateMessageComplete
};
// A Header represents the key-value pairs in an HTTP header.
... ... @@ -154,6 +155,8 @@ public:
public:
// when chunked mode,
// final the request to complete the chunked encoding.
// for no-chunked mode,
// final to send request, for example, content-length is 0.
virtual int final_request() = 0;
// Header returns the header map that will be sent by WriteHeader.
... ... @@ -178,6 +181,39 @@ public:
virtual void write_header(int code) = 0;
};
/**
* the reader interface for http response.
*/
class ISrsHttpResponseReader
{
public:
ISrsHttpResponseReader();
virtual ~ISrsHttpResponseReader();
public:
/**
* read from the response body.
* @param max the max size to read. 0 to ignore.
*/
virtual int read(int max, std::string& data) = 0;
};
/**
* for connection response only.
*/
class ISrsHttpResponseAppender
{
public:
ISrsHttpResponseAppender();
virtual ~ISrsHttpResponseAppender();
public:
/**
* append specified size of bytes data to reader.
* when we read http message from socket, we maybe read header+body,
* so the reader should provides stream cache feature.
*/
virtual int append(char* data, int size) = 0;
};
// Objects implementing the Handler interface can be
// registered to serve a particular path or subtree
// in the HTTP server.
... ... @@ -366,6 +402,27 @@ public:
virtual int send_header(char* data, int size);
};
/**
* response reader use st socket.
*/
class SrsHttpResponseReader : virtual public ISrsHttpResponseReader
, virtual public ISrsHttpResponseAppender
{
private:
SrsStSocket* skt;
SrsHttpMessage* owner;
SrsSimpleBuffer* cache;
public:
SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io);
virtual ~SrsHttpResponseReader();
public:
virtual int read(int max, std::string& data);
virtual int append(char* data, int size);
};
// for http header.
typedef std::pair<std::string, std::string> SrsHttpHeaderField;
// A Request represents an HTTP request received by a server
// or to be sent by a client.
//
... ... @@ -387,15 +444,10 @@ private:
*/
http_parser _header;
/**
* body object, in bytes.
* body object, reader object.
* @remark, user can get body in string by get_body().
*/
SrsSimpleBuffer* _body;
/**
* parser state
* @remark, user can use is_complete() to determine the state.
*/
SrsHttpParseState _state;
SrsHttpResponseReader* _body;
/**
* uri parser
*/
... ... @@ -405,20 +457,17 @@ private:
*/
char* _http_ts_send_buffer;
// http headers
typedef std::pair<std::string, std::string> SrsHttpHeaderField;
std::vector<SrsHttpHeaderField> headers;
std::vector<SrsHttpHeaderField> _headers;
// the query map
std::map<std::string, std::string> _query;
public:
SrsHttpMessage();
SrsHttpMessage(SrsStSocket* io);
virtual ~SrsHttpMessage();
public:
virtual int initialize();
public:
virtual char* http_ts_send_buffer();
virtual void reset();
public:
virtual bool is_complete();
virtual u_int8_t method();
virtual u_int16_t status_code();
virtual std::string method_str();
... ... @@ -432,14 +481,10 @@ public:
virtual std::string host();
virtual std::string path();
public:
virtual std::string body();
virtual char* body_raw();
virtual int64_t body_size();
virtual void set(std::string url, http_parser* header, std::string body, std::vector<SrsHttpHeaderField>& headers);
public:
virtual int body_read_all(std::string body);
virtual int64_t content_length();
virtual void set_url(std::string url);
virtual void set_state(SrsHttpParseState state);
virtual void set_header(http_parser* header);
virtual void append_body(const char* body, int length);
/**
* get the param in query string,
* for instance, query is "start=100&end=200",
... ... @@ -449,7 +494,6 @@ public:
virtual int request_header_count();
virtual std::string request_header_key_at(int index);
virtual std::string request_header_value_at(int index);
virtual void set_request_header(std::string key, std::string value);
virtual std::string get_request_header(std::string name);
};
... ... @@ -462,8 +506,16 @@ class SrsHttpParser
private:
http_parser_settings settings;
http_parser parser;
SrsHttpMessage* msg;
private:
// http parse data, reset before parse message.
bool expect_filed_name;
std::string filed_name;
std::string field_value;
SrsHttpParseState state;
http_parser header;
std::string url;
std::vector<SrsHttpHeaderField> headers;
std::string body;
public:
SrsHttpParser();
virtual ~SrsHttpParser();
... ...
... ... @@ -527,13 +527,20 @@ int SrsHttpApi::do_cycle()
return ret;
}
// if SUCCESS, always NOT-NULL and completed message.
// if SUCCESS, always NOT-NULL.
srs_assert(req);
srs_assert(req->is_complete());
// always free it in this scope.
SrsAutoFree(SrsHttpMessage, req);
// TODO: FIXME: use the post body.
std::string res;
// get response body.
if ((ret = req->body_read_all(res)) != ERROR_SUCCESS) {
return ret;
}
// ok, handle http request.
SrsHttpResponseWriter writer(&skt);
if ((ret = process_request(&writer, req)) != ERROR_SUCCESS) {
... ...
... ... @@ -35,6 +35,7 @@ using namespace std;
#include <srs_app_st_socket.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
// when error, http client sleep for a while and retry.
#define SRS_HTTP_CLIENT_SLEEP_US (int64_t)(3*1000*1000LL)
... ... @@ -103,18 +104,18 @@ int SrsHttpClient::post(SrsHttpUri* uri, string req, int& status_code, string& r
}
srs_assert(msg);
srs_assert(msg->is_complete());
// always free it in this scope.
SrsAutoFree(SrsHttpMessage, msg);
status_code = (int)msg->status_code();
// get response body.
if (msg->body_size() > 0) {
res = msg->body();
if ((ret = msg->body_read_all(res)) != ERROR_SUCCESS) {
return ret;
}
srs_info("parse http post response success.");
srs_freep(msg);
return ret;
}
... ...
... ... @@ -1194,13 +1194,20 @@ int SrsHttpConn::do_cycle()
return ret;
}
// if SUCCESS, always NOT-NULL and completed message.
// if SUCCESS, always NOT-NULL.
srs_assert(req);
srs_assert(req->is_complete());
// always free it in this scope.
SrsAutoFree(SrsHttpMessage, req);
// TODO: FIXME: use the post body.
std::string res;
// get response body.
if ((ret = req->body_read_all(res)) != ERROR_SUCCESS) {
return ret;
}
// ok, handle http request.
SrsHttpResponseWriter writer(&skt);
if ((ret = process_request(&writer, req)) != ERROR_SUCCESS) {
... ...