winlin

Merge branch '2.0release' into develop

@@ -36,6 +36,13 @@ using namespace std; @@ -36,6 +36,13 @@ using namespace std;
36 #include <srs_app_http.hpp> 36 #include <srs_app_http.hpp>
37 #include <srs_app_http_conn.hpp> 37 #include <srs_app_http_conn.hpp>
38 #include <srs_core_autofree.hpp> 38 #include <srs_core_autofree.hpp>
  39 +#include <srs_kernel_flv.hpp>
  40 +#include <srs_rtmp_sdk.hpp>
  41 +#include <srs_rtmp_utility.hpp>
  42 +#include <srs_app_st_socket.hpp>
  43 +#include <srs_app_utility.hpp>
  44 +#include <srs_rtmp_amf0.hpp>
  45 +#include <srs_kernel_utility.hpp>
39 46
40 #define SRS_HTTP_FLV_STREAM_BUFFER 4096 47 #define SRS_HTTP_FLV_STREAM_BUFFER 4096
41 48
@@ -84,37 +91,348 @@ void SrsAppCasterFlv::remove(SrsConnection* c) @@ -84,37 +91,348 @@ void SrsAppCasterFlv::remove(SrsConnection* c)
84 91
85 int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) 92 int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
86 { 93 {
  94 + SrsDynamicHttpConn* conn = dynamic_cast<SrsDynamicHttpConn*>(r->connection());
  95 + srs_assert(conn);
  96 +
  97 + std::string app = srs_path_dirname(r->path());
  98 + app = srs_string_trim_start(app, "/");
  99 +
  100 + std::string stream = srs_path_basename(r->path());
  101 + stream = srs_string_trim_start(stream, "/");
  102 +
  103 + std::string o = output;
  104 + if (!app.empty() && app != "/") {
  105 + o = srs_string_replace(o, "[app]", app);
  106 + }
  107 + o = srs_string_replace(o, "[stream]", stream);
  108 +
  109 + // remove the extension.
  110 + if (srs_string_ends_with(o, ".flv")) {
  111 + o = o.substr(0, o.length() - 4);
  112 + }
  113 +
  114 + return conn->proxy(w, r, o);
  115 +}
  116 +
  117 +SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
  118 + : SrsHttpConn(cm, fd, m)
  119 +{
  120 +
  121 + req = NULL;
  122 + io = NULL;
  123 + client = NULL;
  124 + stfd = NULL;
  125 + stream_id = 0;
  126 +
  127 + pprint = SrsPithyPrint::create_caster();
  128 +}
  129 +
  130 +SrsDynamicHttpConn::~SrsDynamicHttpConn()
  131 +{
  132 + close();
  133 +
  134 + srs_freep(pprint);
  135 +}
  136 +
  137 +int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg)
  138 +{
  139 + int ret = ERROR_SUCCESS;
  140 + return ret;
  141 +}
  142 +
  143 +int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, SrsHttpMessage* r, std::string o)
  144 +{
87 int ret = ERROR_SUCCESS; 145 int ret = ERROR_SUCCESS;
88 146
89 - srs_info("flv: handle request at %s", r->path().c_str()); 147 + output = o;
  148 + srs_trace("flv: proxy %s to %s", r->uri().c_str(), output.c_str());
90 149
91 char* buffer = new char[SRS_HTTP_FLV_STREAM_BUFFER]; 150 char* buffer = new char[SRS_HTTP_FLV_STREAM_BUFFER];
92 SrsAutoFree(char, buffer); 151 SrsAutoFree(char, buffer);
93 152
94 ISrsHttpResponseReader* rr = r->body_reader(); 153 ISrsHttpResponseReader* rr = r->body_reader();
  154 + SrsHttpFileReader reader(rr);
  155 + SrsFlvDecoder dec;
  156 +
  157 + if ((ret = dec.initialize(&reader)) != ERROR_SUCCESS) {
  158 + return ret;
  159 + }
  160 +
  161 + char header[9];
  162 + if ((ret = dec.read_header(header)) != ERROR_SUCCESS) {
  163 + if (!srs_is_client_gracefully_close(ret)) {
  164 + srs_error("flv: proxy flv header failed. ret=%d", ret);
  165 + }
  166 + return ret;
  167 + }
  168 + srs_trace("flv: proxy drop flv header.");
  169 +
  170 + char pps[4];
  171 + if ((ret = dec.read_previous_tag_size(pps)) != ERROR_SUCCESS) {
  172 + if (!srs_is_client_gracefully_close(ret)) {
  173 + srs_error("flv: proxy flv header pps failed. ret=%d", ret);
  174 + }
  175 + return ret;
  176 + }
  177 +
95 while (!rr->eof()) { 178 while (!rr->eof()) {
96 - int nb_read = 0;  
97 - if ((ret = rr->read(buffer, SRS_HTTP_FLV_STREAM_BUFFER, &nb_read)) != ERROR_SUCCESS) { 179 + pprint->elapse();
  180 +
  181 + if ((ret = connect()) != ERROR_SUCCESS) {
  182 + return ret;
  183 + }
  184 +
  185 + char type;
  186 + int32_t size;
  187 + u_int32_t time;
  188 + if ((ret = dec.read_tag_header(&type, &size, &time)) != ERROR_SUCCESS) {
  189 + if (!srs_is_client_gracefully_close(ret)) {
  190 + srs_error("flv: proxy tag header failed. ret=%d", ret);
  191 + }
  192 + return ret;
  193 + }
  194 +
  195 + char* data = new char[size];
  196 + if ((ret = dec.read_tag_data(data, size)) != ERROR_SUCCESS) {
  197 + srs_freep(data);
  198 + if (!srs_is_client_gracefully_close(ret)) {
  199 + srs_error("flv: proxy tag data failed. ret=%d", ret);
  200 + }
  201 + return ret;
  202 + }
  203 +
  204 + if ((ret = rtmp_write_packet(type, time, data, size)) != ERROR_SUCCESS) {
  205 + if (!srs_is_client_gracefully_close(ret)) {
  206 + srs_error("flv: proxy rtmp packet failed. ret=%d", ret);
  207 + }
  208 + return ret;
  209 + }
  210 +
  211 + if ((ret = dec.read_previous_tag_size(pps)) != ERROR_SUCCESS) {
  212 + if (!srs_is_client_gracefully_close(ret)) {
  213 + srs_error("flv: proxy tag header pps failed. ret=%d", ret);
  214 + }
98 return ret; 215 return ret;
99 } 216 }
100 - //srs_trace("flv: read %dB from %s", nb_read, r->path().c_str());  
101 } 217 }
102 218
103 return ret; 219 return ret;
104 } 220 }
105 221
106 -SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)  
107 - : SrsHttpConn(cm, fd, m) 222 +int SrsDynamicHttpConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size)
108 { 223 {
  224 + int ret = ERROR_SUCCESS;
  225 +
  226 + SrsSharedPtrMessage* msg = NULL;
  227 +
  228 + if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) {
  229 + srs_error("flv: create shared ptr msg failed. ret=%d", ret);
  230 + return ret;
  231 + }
  232 + srs_assert(msg);
  233 +
  234 + if (pprint->can_print()) {
  235 + srs_trace("flv: send msg %s age=%d, dts=%"PRId64", size=%d",
  236 + msg->is_audio()? "A":msg->is_video()? "V":"N", pprint->age(), msg->timestamp, msg->size);
  237 + }
  238 +
  239 + // send out encoded msg.
  240 + if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
  241 + return ret;
  242 + }
  243 +
  244 + return ret;
109 } 245 }
110 246
111 -SrsDynamicHttpConn::~SrsDynamicHttpConn() 247 +int SrsDynamicHttpConn::connect()
  248 +{
  249 + int ret = ERROR_SUCCESS;
  250 +
  251 + // when ok, ignore.
  252 + // TODO: FIXME: should reconnect when disconnected.
  253 + if (io || client) {
  254 + return ret;
  255 + }
  256 +
  257 + // parse uri
  258 + if (!req) {
  259 + req = new SrsRequest();
  260 +
  261 + size_t pos = string::npos;
  262 + string uri = req->tcUrl = output;
  263 +
  264 + // tcUrl, stream
  265 + if ((pos = uri.rfind("/")) != string::npos) {
  266 + req->stream = uri.substr(pos + 1);
  267 + req->tcUrl = uri = uri.substr(0, pos);
  268 + }
  269 +
  270 + srs_discovery_tc_url(req->tcUrl,
  271 + req->schema, req->host, req->vhost, req->app, req->port,
  272 + req->param);
  273 + }
  274 +
  275 + // connect host.
  276 + if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) {
  277 + srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret);
  278 + return ret;
  279 + }
  280 + io = new SrsStSocket(stfd);
  281 + client = new SrsRtmpClient(io);
  282 +
  283 + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
  284 + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
  285 +
  286 + // connect to vhost/app
  287 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  288 + srs_error("mpegts: handshake with server failed. ret=%d", ret);
  289 + return ret;
  290 + }
  291 + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) {
  292 + srs_error("mpegts: connect with server failed. ret=%d", ret);
  293 + return ret;
  294 + }
  295 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  296 + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  297 + return ret;
  298 + }
  299 +
  300 + // publish.
  301 + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
  302 + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d",
  303 + req->stream.c_str(), stream_id, ret);
  304 + return ret;
  305 + }
  306 +
  307 + return ret;
  308 +}
  309 +
  310 +// TODO: FIXME: refine the connect_app.
  311 +int SrsDynamicHttpConn::connect_app(string ep_server, string ep_port)
112 { 312 {
  313 + int ret = ERROR_SUCCESS;
  314 +
  315 + // args of request takes the srs info.
  316 + if (req->args == NULL) {
  317 + req->args = SrsAmf0Any::object();
  318 + }
  319 +
  320 + // notify server the edge identity,
  321 + // @see https://github.com/simple-rtmp-server/srs/issues/147
  322 + SrsAmf0Object* data = req->args;
  323 + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
  324 + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
  325 + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
  326 + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
  327 + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
  328 + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
  329 + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
  330 + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
  331 + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
  332 + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
  333 + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
  334 + // for edge to directly get the id of client.
  335 + data->set("srs_pid", SrsAmf0Any::number(getpid()));
  336 + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
  337 +
  338 + // local ip of edge
  339 + std::vector<std::string> ips = srs_get_local_ipv4_ips();
  340 + assert(_srs_config->get_stats_network() < (int)ips.size());
  341 + std::string local_ip = ips[_srs_config->get_stats_network()];
  342 + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
  343 +
  344 + // generate the tcUrl
  345 + std::string param = "";
  346 + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
  347 +
  348 + // upnode server identity will show in the connect_app of client.
  349 + // @see https://github.com/simple-rtmp-server/srs/issues/160
  350 + // the debug_srs_upnode is config in vhost and default to true.
  351 + bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
  352 + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
  353 + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
  354 + tc_url.c_str(), debug_srs_upnode, ret);
  355 + return ret;
  356 + }
  357 +
  358 + return ret;
113 } 359 }
114 360
115 -int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg) 361 +void SrsDynamicHttpConn::close()
  362 +{
  363 + srs_freep(client);
  364 + srs_freep(io);
  365 + srs_freep(req);
  366 + srs_close_stfd(stfd);
  367 +}
  368 +
  369 +SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h)
  370 +{
  371 + http = h;
  372 +}
  373 +
  374 +SrsHttpFileReader::~SrsHttpFileReader()
  375 +{
  376 +}
  377 +
  378 +int SrsHttpFileReader::open(std::string /*file*/)
  379 +{
  380 + return ERROR_SUCCESS;
  381 +}
  382 +
  383 +void SrsHttpFileReader::close()
  384 +{
  385 +}
  386 +
  387 +bool SrsHttpFileReader::is_open()
  388 +{
  389 + return true;
  390 +}
  391 +
  392 +int64_t SrsHttpFileReader::tellg()
  393 +{
  394 + return 0;
  395 +}
  396 +
  397 +void SrsHttpFileReader::skip(int64_t /*size*/)
  398 +{
  399 +}
  400 +
  401 +int64_t SrsHttpFileReader::lseek(int64_t offset)
  402 +{
  403 + return offset;
  404 +}
  405 +
  406 +int64_t SrsHttpFileReader::filesize()
  407 +{
  408 + return 0;
  409 +}
  410 +
  411 +int SrsHttpFileReader::read(void* buf, size_t count, ssize_t* pnread)
116 { 412 {
117 int ret = ERROR_SUCCESS; 413 int ret = ERROR_SUCCESS;
  414 +
  415 + if (http->eof()) {
  416 + ret = ERROR_HTTP_REQUEST_EOF;
  417 + srs_error("flv: encoder EOF. ret=%d", ret);
  418 + return ret;
  419 + }
  420 +
  421 + int total_read = 0;
  422 + while (total_read < count) {
  423 + int nread = 0;
  424 + if ((ret = http->read((char*)buf + total_read, (int)(count - total_read), &nread)) != ERROR_SUCCESS) {
  425 + return ret;
  426 + }
  427 +
  428 + srs_assert(nread);
  429 + total_read += nread;
  430 + }
  431 +
  432 + if (pnread) {
  433 + *pnread = total_read;
  434 + }
  435 +
118 return ret; 436 return ret;
119 } 437 }
120 438
@@ -38,13 +38,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -38,13 +38,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
38 class SrsConfDirective; 38 class SrsConfDirective;
39 class SrsHttpServeMux; 39 class SrsHttpServeMux;
40 class SrsHttpConn; 40 class SrsHttpConn;
  41 +class SrsRtmpClient;
  42 +class SrsStSocket;
  43 +class SrsRequest;
  44 +class SrsPithyPrint;
41 45
42 #include <srs_app_st.hpp> 46 #include <srs_app_st.hpp>
43 #include <srs_app_listener.hpp> 47 #include <srs_app_listener.hpp>
44 #include <srs_app_conn.hpp> 48 #include <srs_app_conn.hpp>
45 #include <srs_app_http.hpp> 49 #include <srs_app_http.hpp>
46 #include <srs_app_http_conn.hpp> 50 #include <srs_app_http_conn.hpp>
  51 +#include <srs_kernel_file.hpp>
47 52
  53 +/**
  54 + * the stream caster for flv stream over HTTP POST.
  55 + */
48 class SrsAppCasterFlv : virtual public ISrsTcpHandler 56 class SrsAppCasterFlv : virtual public ISrsTcpHandler
49 , virtual public IConnectionManager, virtual public ISrsHttpHandler 57 , virtual public IConnectionManager, virtual public ISrsHttpHandler
50 { 58 {
@@ -68,13 +76,68 @@ public: @@ -68,13 +76,68 @@ public:
68 virtual int serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r); 76 virtual int serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r);
69 }; 77 };
70 78
  79 +/**
  80 + * the dynamic http connection, never drop the body.
  81 + */
71 class SrsDynamicHttpConn : public SrsHttpConn 82 class SrsDynamicHttpConn : public SrsHttpConn
72 { 83 {
  84 +private:
  85 + std::string output;
  86 + SrsPithyPrint* pprint;
  87 +private:
  88 + SrsRequest* req;
  89 + st_netfd_t stfd;
  90 + SrsStSocket* io;
  91 + SrsRtmpClient* client;
  92 + int stream_id;
73 public: 93 public:
74 SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); 94 SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m);
75 virtual ~SrsDynamicHttpConn(); 95 virtual ~SrsDynamicHttpConn();
76 public: 96 public:
77 virtual int on_got_http_message(SrsHttpMessage* msg); 97 virtual int on_got_http_message(SrsHttpMessage* msg);
  98 +public:
  99 + virtual int proxy(ISrsHttpResponseWriter* w, SrsHttpMessage* r, std::string o);
  100 +private:
  101 + virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
  102 +private:
  103 + // connect to rtmp output url.
  104 + // @remark ignore when not connected, reconnect when disconnected.
  105 + virtual int connect();
  106 + virtual int connect_app(std::string ep_server, std::string ep_port);
  107 + // close the connected io and rtmp to ready to be re-connect.
  108 + virtual void close();
  109 +};
  110 +
  111 +/**
  112 + * the http wrapper for file reader,
  113 + * to read http post stream like a file.
  114 + */
  115 +class SrsHttpFileReader : public SrsFileReader
  116 +{
  117 +private:
  118 + ISrsHttpResponseReader* http;
  119 +public:
  120 + SrsHttpFileReader(ISrsHttpResponseReader* h);
  121 + virtual ~SrsHttpFileReader();
  122 +public:
  123 + /**
  124 + * open file reader, can open then close then open...
  125 + */
  126 + virtual int open(std::string file);
  127 + virtual void close();
  128 +public:
  129 + // TODO: FIXME: extract interface.
  130 + virtual bool is_open();
  131 + virtual int64_t tellg();
  132 + virtual void skip(int64_t size);
  133 + virtual int64_t lseek(int64_t offset);
  134 + virtual int64_t filesize();
  135 +public:
  136 + /**
  137 + * read from file.
  138 + * @param pnread the output nb_read, NULL to ignore.
  139 + */
  140 + virtual int read(void* buf, size_t count, ssize_t* pnread);
78 }; 141 };
79 142
80 #endif 143 #endif
@@ -1078,8 +1078,9 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) @@ -1078,8 +1078,9 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read)
1078 return ret; 1078 return ret;
1079 } 1079 }
1080 1080
1081 -SrsHttpMessage::SrsHttpMessage(SrsStSocket* io) 1081 +SrsHttpMessage::SrsHttpMessage(SrsStSocket* io, SrsConnection* c)
1082 { 1082 {
  1083 + conn = c;
1083 chunked = false; 1084 chunked = false;
1084 _uri = new SrsHttpUri(); 1085 _uri = new SrsHttpUri();
1085 _body = new SrsHttpResponseReader(this, io); 1086 _body = new SrsHttpResponseReader(this, io);
@@ -1165,6 +1166,11 @@ char* SrsHttpMessage::http_ts_send_buffer() @@ -1165,6 +1166,11 @@ char* SrsHttpMessage::http_ts_send_buffer()
1165 return _http_ts_send_buffer; 1166 return _http_ts_send_buffer;
1166 } 1167 }
1167 1168
  1169 +SrsConnection* SrsHttpMessage::connection()
  1170 +{
  1171 + return conn;
  1172 +}
  1173 +
1168 u_int8_t SrsHttpMessage::method() 1174 u_int8_t SrsHttpMessage::method()
1169 { 1175 {
1170 return (u_int8_t)_header.method; 1176 return (u_int8_t)_header.method;
@@ -1230,8 +1236,9 @@ string SrsHttpMessage::uri() @@ -1230,8 +1236,9 @@ string SrsHttpMessage::uri()
1230 { 1236 {
1231 std::string uri = _uri->get_schema(); 1237 std::string uri = _uri->get_schema();
1232 if (uri.empty()) { 1238 if (uri.empty()) {
1233 - uri += "http://"; 1239 + uri += "http";
1234 } 1240 }
  1241 + uri += "://";
1235 1242
1236 uri += host(); 1243 uri += host();
1237 uri += path(); 1244 uri += path();
@@ -1393,7 +1400,7 @@ int SrsHttpParser::initialize(enum http_parser_type type) @@ -1393,7 +1400,7 @@ int SrsHttpParser::initialize(enum http_parser_type type)
1393 return ret; 1400 return ret;
1394 } 1401 }
1395 1402
1396 -int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) 1403 +int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, SrsHttpMessage** ppmsg)
1397 { 1404 {
1398 *ppmsg = NULL; 1405 *ppmsg = NULL;
1399 1406
@@ -1418,7 +1425,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) @@ -1418,7 +1425,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg)
1418 } 1425 }
1419 1426
1420 // create msg 1427 // create msg
1421 - SrsHttpMessage* msg = new SrsHttpMessage(skt); 1428 + SrsHttpMessage* msg = new SrsHttpMessage(skt, conn);
1422 1429
1423 // initalize http msg, parse url. 1430 // initalize http msg, parse url.
1424 if ((ret = msg->update(url, &header, buffer, headers)) != ERROR_SUCCESS) { 1431 if ((ret = msg->update(url, &header, buffer, headers)) != ERROR_SUCCESS) {
@@ -50,6 +50,7 @@ class SrsSimpleBuffer; @@ -50,6 +50,7 @@ class SrsSimpleBuffer;
50 class SrsHttpMuxEntry; 50 class SrsHttpMuxEntry;
51 class ISrsHttpResponseWriter; 51 class ISrsHttpResponseWriter;
52 class SrsFastBuffer; 52 class SrsFastBuffer;
  53 +class SrsConnection;
53 54
54 // http specification 55 // http specification
55 // CR = <US-ASCII CR, carriage return (13)> 56 // CR = <US-ASCII CR, carriage return (13)>
@@ -506,8 +507,10 @@ private: @@ -506,8 +507,10 @@ private:
506 std::vector<SrsHttpHeaderField> _headers; 507 std::vector<SrsHttpHeaderField> _headers;
507 // the query map 508 // the query map
508 std::map<std::string, std::string> _query; 509 std::map<std::string, std::string> _query;
  510 + // the transport connection, can be NULL.
  511 + SrsConnection* conn;
509 public: 512 public:
510 - SrsHttpMessage(SrsStSocket* io); 513 + SrsHttpMessage(SrsStSocket* io, SrsConnection* c);
511 virtual ~SrsHttpMessage(); 514 virtual ~SrsHttpMessage();
512 public: 515 public:
513 /** 516 /**
@@ -518,6 +521,7 @@ public: @@ -518,6 +521,7 @@ public:
518 ); 521 );
519 public: 522 public:
520 virtual char* http_ts_send_buffer(); 523 virtual char* http_ts_send_buffer();
  524 + virtual SrsConnection* connection();
521 public: 525 public:
522 virtual u_int8_t method(); 526 virtual u_int8_t method();
523 virtual u_int16_t status_code(); 527 virtual u_int16_t status_code();
@@ -617,7 +621,7 @@ public: @@ -617,7 +621,7 @@ public:
617 * or error and *ppmsg must be NULL. 621 * or error and *ppmsg must be NULL.
618 * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete(). 622 * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete().
619 */ 623 */
620 - virtual int parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg); 624 + virtual int parse_message(SrsStSocket* skt, SrsConnection* conn, SrsHttpMessage** ppmsg);
621 private: 625 private:
622 /** 626 /**
623 * parse the HTTP message to member field: msg. 627 * parse the HTTP message to member field: msg.
@@ -528,7 +528,7 @@ int SrsHttpApi::do_cycle() @@ -528,7 +528,7 @@ int SrsHttpApi::do_cycle()
528 SrsHttpMessage* req = NULL; 528 SrsHttpMessage* req = NULL;
529 529
530 // get a http message 530 // get a http message
531 - if ((ret = parser->parse_message(&skt, &req)) != ERROR_SUCCESS) { 531 + if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) {
532 return ret; 532 return ret;
533 } 533 }
534 534
@@ -105,7 +105,7 @@ int SrsHttpClient::post(string path, string req, SrsHttpMessage** ppmsg) @@ -105,7 +105,7 @@ int SrsHttpClient::post(string path, string req, SrsHttpMessage** ppmsg)
105 } 105 }
106 106
107 SrsHttpMessage* msg = NULL; 107 SrsHttpMessage* msg = NULL;
108 - if ((ret = parser->parse_message(skt, &msg)) != ERROR_SUCCESS) { 108 + if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) {
109 srs_error("parse http post response failed. ret=%d", ret); 109 srs_error("parse http post response failed. ret=%d", ret);
110 return ret; 110 return ret;
111 } 111 }
@@ -151,7 +151,7 @@ int SrsHttpClient::get(string path, std::string req, SrsHttpMessage** ppmsg) @@ -151,7 +151,7 @@ int SrsHttpClient::get(string path, std::string req, SrsHttpMessage** ppmsg)
151 } 151 }
152 152
153 SrsHttpMessage* msg = NULL; 153 SrsHttpMessage* msg = NULL;
154 - if ((ret = parser->parse_message(skt, &msg)) != ERROR_SUCCESS) { 154 + if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) {
155 srs_error("parse http post response failed. ret=%d", ret); 155 srs_error("parse http post response failed. ret=%d", ret);
156 return ret; 156 return ret;
157 } 157 }
@@ -1388,7 +1388,7 @@ int SrsHttpConn::do_cycle() @@ -1388,7 +1388,7 @@ int SrsHttpConn::do_cycle()
1388 SrsHttpMessage* req = NULL; 1388 SrsHttpMessage* req = NULL;
1389 1389
1390 // get a http message 1390 // get a http message
1391 - if ((ret = parser->parse_message(&skt, &req)) != ERROR_SUCCESS) { 1391 + if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) {
1392 return ret; 1392 return ret;
1393 } 1393 }
1394 1394
@@ -255,6 +255,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -255,6 +255,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
255 #define ERROR_HTTP_INVALID_CHUNK_HEADER 4026 255 #define ERROR_HTTP_INVALID_CHUNK_HEADER 4026
256 #define ERROR_AVC_NALU_UEV 4027 256 #define ERROR_AVC_NALU_UEV 4027
257 #define ERROR_AAC_BYTES_INVALID 4028 257 #define ERROR_AAC_BYTES_INVALID 4028
  258 +#define ERROR_HTTP_REQUEST_EOF 4029
258 259
259 /////////////////////////////////////////////////////// 260 ///////////////////////////////////////////////////////
260 // user-define error. 261 // user-define error.