winlin

refine http request parse. 2.0.132.

... ... @@ -550,7 +550,8 @@ Supported operating systems and hardware:
## History
### SRS 2.0 history
.
* v2.0, 2015-03-06, refine http request parse. 2.0.132.
* v2.0, 2015-03-01, for [#179](https://github.com/winlinvip/simple-rtmp-server/issues/179), revert dvr http api. 2.0.128.
* v2.0, 2015-02-24, for [#304](https://github.com/winlinvip/simple-rtmp-server/issues/304), fix hls bug, write pts/dts error. 2.0.124
* v2.0, 2015-02-19, refine dvr, append file when dvr file exists. 2.0.122.
... ...
... ... @@ -39,6 +39,7 @@ using namespace std;
#include <srs_rtmp_buffer.hpp>
#include <srs_kernel_file.hpp>
#include <srs_core_autofree.hpp>
#include <srs_rtmp_buffer.hpp>
#define SRS_DEFAULT_HTTP_PORT 80
... ... @@ -846,57 +847,152 @@ SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* i
{
skt = io;
owner = msg;
cache = new SrsSimpleBuffer();
is_eof = false;
nb_read = 0;
buffer = NULL;
}
SrsHttpResponseReader::~SrsHttpResponseReader()
{
srs_freep(cache);
}
bool SrsHttpResponseReader::empty()
int SrsHttpResponseReader::initialize(SrsFastBuffer* body)
{
int ret = ERROR_SUCCESS;
buffer = body;
return ret;
}
bool SrsHttpResponseReader::eof()
{
return cache->length() == 0;
return is_eof;
}
int SrsHttpResponseReader::append(char* data, int size)
int SrsHttpResponseReader::read(std::string& data)
{
int ret = ERROR_SUCCESS;
cache->append(data, size);
if (is_eof) {
ret = ERROR_HTTP_RESPONSE_EOF;
srs_error("http: response EOF. ret=%d", ret);
return ret;
}
return ret;
// chunked encoding.
if (owner->is_chunked()) {
return read_chunked(data);
}
// read by specified content-length
int max = (int)owner->content_length() - nb_read;
if (max <= 0) {
is_eof = true;
return ret;
}
return read_specified(max, data);
}
int SrsHttpResponseReader::read(int max, std::string& data)
int SrsHttpResponseReader::read_chunked(std::string& data)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: decode the chunked bytes.
// read from cache first.
if (cache->length() > 0) {
int nb_bytes = srs_min(cache->length(), max);
data.append(cache->bytes(), nb_bytes);
cache->erase(nb_bytes);
// parse the chunk length first.
char* at = NULL;
int length = 0;
while (!at) {
// find the CRLF of chunk header end.
char* start = buffer->bytes();
char* end = start + buffer->size();
for (char* p = start; p < end - 1; p++) {
if (p[0] == __SRS_HTTP_CR && p[1] == __SRS_HTTP_LF) {
// invalid chunk, ignore.
if (p == start) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header start with CRLF. ret=%d", ret);
return ret;
}
length = p - start + 2;
at = buffer->read_slice(length);
break;
}
}
return ret;
// got at, ok.
if (at) {
break;
}
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
return ret;
}
}
srs_assert(length >= 3);
// read some from underlayer.
int left = srs_max(SRS_HTTP_BODY_BUFFER, max);
// it's ok to set the pos and pos+1 to NULL.
at[length - 1] = NULL;
at[length - 2] = NULL;
// read from io.
char* buf = new char[left];
SrsAutoFree(char, buf);
// size is the bytes size, excludes the chunk header and end CRLF.
int ilength = ::strtol(at, NULL, 16);
if (ilength < 0) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header negative, length=%d. ret=%d", ilength, ret);
return ret;
}
ssize_t nread = 0;
if ((ret = skt->read(buf, left, &nread)) != ERROR_SUCCESS) {
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, ilength + 2)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
return ret;
}
srs_trace("http: read %d chunk", ilength);
if (nread) {
data.append(buf, nread);
// read payload when length specifies some payload.
if (ilength <= 0) {
is_eof = true;
} else {
srs_assert(ilength);
data.append(buffer->read_slice(ilength), ilength);
nb_read += ilength;
}
// the CRLF of chunk payload end.
buffer->read_slice(2);
return ret;
}
int SrsHttpResponseReader::read_specified(int max, std::string& data)
{
int ret = ERROR_SUCCESS;
if (buffer->size() <= 0) {
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, 1)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
return ret;
}
}
int nb_bytes = srs_min(max, buffer->size());
srs_assert(nb_bytes);
data.append(buffer->read_slice(nb_bytes), nb_bytes);
nb_read += nb_bytes;
// when read completed, eof.
if (nb_read >= (int)owner->content_length()) {
is_eof = true;
}
return ret;
... ... @@ -917,7 +1013,7 @@ SrsHttpMessage::~SrsHttpMessage()
srs_freep(_http_ts_send_buffer);
}
int SrsHttpMessage::initialize(string url, http_parser* header, string body, vector<SrsHttpHeaderField>& headers)
int SrsHttpMessage::update(string url, http_parser* header, SrsFastBuffer* body, vector<SrsHttpHeaderField>& headers)
{
int ret = ERROR_SUCCESS;
... ... @@ -928,10 +1024,10 @@ int SrsHttpMessage::initialize(string url, http_parser* header, string body, vec
// whether chunked.
std::string transfer_encoding = get_request_header("Transfer-Encoding");
chunked = (transfer_encoding == "chunked");
// TODO: FIXME: remove it, use fast buffer instead.
if (!body.empty()) {
_body->append((char*)body.data(), (int)body.length());
// set the buffer.
if ((ret = _body->initialize(body)) != ERROR_SUCCESS) {
return ret;
}
// parse uri from url.
... ... @@ -946,6 +1042,18 @@ int SrsHttpMessage::initialize(string url, http_parser* header, string body, vec
// parse uri to schema/server:port/path?query
std::string uri = "http://" + host + _url;
return update(uri);
}
int SrsHttpMessage::update(string uri)
{
int ret = ERROR_SUCCESS;
if (uri.empty()) {
return ret;
}
if ((ret = _uri->initialize(uri)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -1069,47 +1177,29 @@ string SrsHttpMessage::path()
return _uri->get_path();
}
int SrsHttpMessage::body_read_all(string body)
int SrsHttpMessage::body_read_all(string& body)
{
int ret = ERROR_SUCCESS;
int64_t content_length = (int64_t)_header.content_length;
// chunked, always read with
if (chunked) {
return _body->read(body);
}
int content_length = (int)(int64_t)_header.content_length;
// ignore if not set, should be zero length body.
if (content_length < 0) {
if (!_body->empty()) {
srs_warn("unspecified content-length with body cached.");
} else {
srs_info("unspecified content-length with body empty.");
}
if (content_length <= 0) {
srs_info("unspecified content-length with body empty.");
return ret;
}
// when content length specified, read specified length.
if (content_length > 0) {
int left = (int)content_length;
while (left > 0) {
int nb_read = (int)body.length();
if ((ret = _body->read(left, body)) != ERROR_SUCCESS) {
return ret;
}
left -= (int)body.length() - nb_read;
}
return ret;
}
// chunked encoding, read util got size=0 chunk.
for (;;) {
int nb_read = (int)body.length();
if ((ret = _body->read(0, body)) != ERROR_SUCCESS) {
int expect = content_length + (int)body.length();
while ((int)body.length() < expect) {
if ((ret = _body->read(body)) != ERROR_SUCCESS) {
return ret;
}
// eof.
if (nb_read == (int)body.length()) {
break;
}
}
return ret;
... ... @@ -1173,10 +1263,12 @@ string SrsHttpMessage::get_request_header(string name)
SrsHttpParser::SrsHttpParser()
{
buffer = new SrsFastBuffer();
}
SrsHttpParser::~SrsHttpParser()
{
srs_freep(buffer);
}
int SrsHttpParser::initialize(enum http_parser_type type)
... ... @@ -1213,7 +1305,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg)
header = http_parser();
url = "";
headers.clear();
body = "";
body_parsed = 0;
// do parse
if ((ret = parse_message_imp(skt)) != ERROR_SUCCESS) {
... ... @@ -1227,7 +1319,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg)
SrsHttpMessage* msg = new SrsHttpMessage(skt);
// initalize http msg, parse url.
if ((ret = msg->initialize(url, &header, body, headers)) != ERROR_SUCCESS) {
if ((ret = msg->update(url, &header, buffer, headers)) != ERROR_SUCCESS) {
srs_error("initialize http msg failed. ret=%d", ret);
srs_freep(msg);
return ret;
... ... @@ -1243,48 +1335,35 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt)
{
int ret = ERROR_SUCCESS;
ssize_t nread = 0;
ssize_t nparsed = 0;
char* buf = new char[SRS_HTTP_HEADER_BUFFER];
SrsAutoFree(char, buf);
// parser header.
for (;;) {
if ((ret = skt->read(buf, (size_t)sizeof(buf), &nread)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
while (true) {
if (buffer->size() <= 0) {
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, 1)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
return ret;
}
return ret;
}
nparsed = http_parser_execute(&parser, &settings, buf, nread);
srs_info("read_size=%d, nparsed=%d", (int)nread, (int)nparsed);
int nb_header = srs_min(SRS_HTTP_HEADER_BUFFER, buffer->size());
ssize_t nparsed = http_parser_execute(&parser, &settings, buffer->bytes(), nb_header);
srs_info("buffer=%d, nparsed=%d, body=%d", buffer->size(), (int)nparsed, body_parsed);
if (nparsed - body_parsed > 0) {
buffer->read_slice(nparsed - body_parsed);
}
// ok atleast header completed,
// never wait for body completed, for maybe chunked.
if (state == SrsHttpParseStateHeaderComplete || state == SrsHttpParseStateMessageComplete) {
break;
}
// when not complete, the parser should consume all bytes.
if (nparsed != nread) {
ret = ERROR_HTTP_PARSE_HEADER;
srs_error("parse response error, parsed(%d)!=read(%d), ret=%d", (int)nparsed, (int)nread, ret);
return ret;
}
}
// parse last header.
if (!filed_name.empty() && !field_value.empty()) {
headers.push_back(std::make_pair(filed_name, field_value));
}
// when parse completed, cache the left body.
if (nread && nparsed < nread) {
body.append(buf + nparsed, nread - nparsed);
srs_info("cache %d bytes read body.", nread - nparsed);
}
return ret;
}
... ... @@ -1385,9 +1464,7 @@ int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length)
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
srs_assert(obj);
if (length > 0) {
obj->body.append(at, (int)length);
}
obj->body_parsed += length;
srs_info("Body: %.*s", (int)length, at);
... ...
... ... @@ -192,10 +192,14 @@ public:
virtual ~ISrsHttpResponseReader();
public:
/**
* whether response read EOF.
*/
virtual bool eof() = 0;
/**
* read from the response body.
* @param max the max size to read. 0 to ignore.
* @remark when eof(), return error.
*/
virtual int read(int max, std::string& data) = 0;
virtual int read(std::string& data) = 0;
};
// Objects implementing the Handler interface can be
... ... @@ -394,24 +398,24 @@ class SrsHttpResponseReader : virtual public ISrsHttpResponseReader
private:
SrsStSocket* skt;
SrsHttpMessage* owner;
SrsSimpleBuffer* cache;
SrsFastBuffer* buffer;
bool is_eof;
int64_t nb_read;
public:
SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io);
virtual ~SrsHttpResponseReader();
public:
/**
* whether the cache is empty.
*/
virtual bool empty();
/**
* append specified size of bytes data to reader.
* when we read http message from socket, we maybe read header+body,
* so the reader should provides stream cache feature.
* initialize the response reader with buffer.
*/
virtual int append(char* data, int size);
virtual int initialize(SrsFastBuffer* buffer);
// interface ISrsHttpResponseReader
public:
virtual int read(int max, std::string& data);
virtual bool eof();
virtual int read(std::string& data);
private:
virtual int read_chunked(std::string& data);
virtual int read_specified(int max, std::string& data);
};
// for http header.
... ... @@ -453,6 +457,7 @@ private:
/**
* use a buffer to read and send ts file.
*/
// TODO: FIXME: remove it.
char* _http_ts_send_buffer;
// http headers
std::vector<SrsHttpHeaderField> _headers;
... ... @@ -463,11 +468,16 @@ public:
virtual ~SrsHttpMessage();
public:
/**
* set the original messages, then initialize the message.
* set the original messages, then update the message.
*/
virtual int initialize(std::string url, http_parser* header,
std::string body, std::vector<SrsHttpHeaderField>& headers
virtual int update(std::string url, http_parser* header,
SrsFastBuffer* body, std::vector<SrsHttpHeaderField>& headers
);
/**
* update the request with uri.
* @remark user can invoke this multiple times.
*/
virtual int update(std::string uri);
public:
virtual char* http_ts_send_buffer();
public:
... ... @@ -485,7 +495,7 @@ public:
virtual std::string host();
virtual std::string path();
public:
virtual int body_read_all(std::string body);
virtual int body_read_all(std::string& body);
virtual ISrsHttpResponseReader* body_reader();
virtual int64_t content_length();
/**
... ... @@ -510,7 +520,7 @@ private:
http_parser_settings settings;
http_parser parser;
// the global parse buffer.
SrsFastBuffer* fbuffer;
SrsFastBuffer* buffer;
private:
// http parse data, reset before parse message.
bool expect_filed_name;
... ... @@ -520,7 +530,7 @@ private:
http_parser header;
std::string url;
std::vector<SrsHttpHeaderField> headers;
std::string body;
int body_parsed;
public:
SrsHttpParser();
virtual ~SrsHttpParser();
... ...
... ... @@ -165,8 +165,14 @@ int SrsHttpClient::get(SrsHttpUri* uri, std::string req, SrsHttpMessage** ppmsg)
srs_error("parse http post response failed. ret=%d", ret);
return ret;
}
srs_assert(msg);
// for GET, server response no uri, we update with request uri.
if ((ret = msg->update(uri->get_url())) != ERROR_SUCCESS) {
srs_freep(msg);
return ret;
}
*ppmsg = msg;
srs_info("parse http get response success.");
... ...
... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 131
#define VERSION_REVISION 132
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
... ...
... ... @@ -243,6 +243,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_STREAM_CASTER_AVC_SPS 4022
#define ERROR_STREAM_CASTER_AVC_PPS 4023
#define ERROR_STREAM_CASTER_FLV_TAG 4024
#define ERROR_HTTP_RESPONSE_EOF 4025
#define ERROR_HTTP_INVALID_CHUNK_HEADER 4026
/**
* whether the error code is an system control error.
... ...
... ... @@ -312,8 +312,6 @@ int run()
return run_master();
}
#include <srs_app_http.hpp>
#include <srs_app_http_client.hpp>
int run_master()
{
int ret = ERROR_SUCCESS;
... ... @@ -329,22 +327,6 @@ int run_master()
if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
return ret;
}
/*SrsHttpClient client;
SrsHttpUri uri;
if ((ret = uri.initialize("http://ossrs.net:8081/live/livestream.flv")) != ERROR_SUCCESS) {
return ret;
}
SrsHttpMessage* msg = NULL;
if ((ret = client.get(&uri, "", &msg)) != ERROR_SUCCESS) {
return ret;
}
for (;;) {
ISrsHttpResponseReader* br = msg->body_reader();
std::string data;
if ((ret = br->read(0, data)) != ERROR_SUCCESS) {
return ret;
}
}*/
if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
return ret;
... ...
... ... @@ -62,6 +62,16 @@ SrsFastBuffer::SrsFastBuffer()
p = end = buffer;
}
int SrsFastBuffer::size()
{
return end - p;
}
char* SrsFastBuffer::bytes()
{
return p;
}
void SrsFastBuffer::set_buffer(int buffer_size)
{
// the user-space buffer size limit to a max value.
... ...
... ... @@ -86,6 +86,16 @@ public:
virtual ~SrsFastBuffer();
public:
/**
* get the size of current bytes in buffer.
*/
virtual int size();
/**
* get the current bytes in buffer.
* @remark user should use read_slice() if possible,
* the bytes() is used to test bytes, for example, to detect the bytes schema.
*/
virtual char* bytes();
/**
* create buffer with specifeid size.
* @param buffer the size of buffer.
* @remark when MR(SRS_PERF_MERGED_READ) disabled, always set to 8K.
... ... @@ -110,6 +120,8 @@ public:
* skip some bytes in buffer.
* @param size the bytes to skip. positive to next; negative to previous.
* @remark assert buffer already grow(size).
* @remark always use read_slice to consume bytes, which will reset for EOF.
* while skip never consume bytes.
*/
virtual void skip(int size);
public:
... ...