winlin

fix the http read chunked encoding bug.

@@ -37,6 +37,8 @@ using namespace std; @@ -37,6 +37,8 @@ using namespace std;
37 #include <srs_app_http_conn.hpp> 37 #include <srs_app_http_conn.hpp>
38 #include <srs_core_autofree.hpp> 38 #include <srs_core_autofree.hpp>
39 39
  40 +#define SRS_HTTP_FLV_STREAM_BUFFER 4096
  41 +
40 SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c) 42 SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c)
41 { 43 {
42 http_mux = new SrsHttpServeMux(); 44 http_mux = new SrsHttpServeMux();
@@ -62,7 +64,7 @@ int SrsAppCasterFlv::on_tcp_client(st_netfd_t stfd) @@ -62,7 +64,7 @@ int SrsAppCasterFlv::on_tcp_client(st_netfd_t stfd)
62 { 64 {
63 int ret = ERROR_SUCCESS; 65 int ret = ERROR_SUCCESS;
64 66
65 - SrsHttpConn* conn = new SrsHttpConn(this, stfd, http_mux); 67 + SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux);
66 conns.push_back(conn); 68 conns.push_back(conn);
67 69
68 if ((ret = conn->start()) != ERROR_SUCCESS) { 70 if ((ret = conn->start()) != ERROR_SUCCESS) {
@@ -79,7 +81,7 @@ void SrsAppCasterFlv::remove(SrsConnection* c) @@ -79,7 +81,7 @@ void SrsAppCasterFlv::remove(SrsConnection* c)
79 conns.erase(it); 81 conns.erase(it);
80 } 82 }
81 } 83 }
82 -#define SRS_HTTP_FLV_STREAM_BUFFER 4096 84 +
83 int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) 85 int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
84 { 86 {
85 int ret = ERROR_SUCCESS; 87 int ret = ERROR_SUCCESS;
@@ -95,10 +97,25 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) @@ -95,10 +97,25 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
95 if ((ret = rr->read(buffer, SRS_HTTP_FLV_STREAM_BUFFER, &nb_read)) != ERROR_SUCCESS) { 97 if ((ret = rr->read(buffer, SRS_HTTP_FLV_STREAM_BUFFER, &nb_read)) != ERROR_SUCCESS) {
96 return ret; 98 return ret;
97 } 99 }
98 - srs_trace("flv: read %dB from %s", nb_read, r->path().c_str()); 100 + //srs_trace("flv: read %dB from %s", nb_read, r->path().c_str());
99 } 101 }
100 102
101 return ret; 103 return ret;
102 } 104 }
103 105
  106 +SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
  107 + : SrsHttpConn(cm, fd, m)
  108 +{
  109 +}
  110 +
  111 +SrsDynamicHttpConn::~SrsDynamicHttpConn()
  112 +{
  113 +}
  114 +
  115 +int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg)
  116 +{
  117 + int ret = ERROR_SUCCESS;
  118 + return ret;
  119 +}
  120 +
104 #endif 121 #endif
@@ -43,6 +43,7 @@ class SrsHttpConn; @@ -43,6 +43,7 @@ class SrsHttpConn;
43 #include <srs_app_listener.hpp> 43 #include <srs_app_listener.hpp>
44 #include <srs_app_conn.hpp> 44 #include <srs_app_conn.hpp>
45 #include <srs_app_http.hpp> 45 #include <srs_app_http.hpp>
  46 +#include <srs_app_http_conn.hpp>
46 47
47 class SrsAppCasterFlv : virtual public ISrsTcpHandler 48 class SrsAppCasterFlv : virtual public ISrsTcpHandler
48 , virtual public IConnectionManager, virtual public ISrsHttpHandler 49 , virtual public IConnectionManager, virtual public ISrsHttpHandler
@@ -67,6 +68,15 @@ public: @@ -67,6 +68,15 @@ public:
67 virtual int serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r); 68 virtual int serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r);
68 }; 69 };
69 70
  71 +class SrsDynamicHttpConn : public SrsHttpConn
  72 +{
  73 +public:
  74 + SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m);
  75 + virtual ~SrsDynamicHttpConn();
  76 +public:
  77 + virtual int on_got_http_message(SrsHttpMessage* msg);
  78 +};
  79 +
70 #endif 80 #endif
71 81
72 #endif 82 #endif
@@ -905,6 +905,7 @@ int SrsHttpResponseReader::initialize(SrsFastBuffer* body) @@ -905,6 +905,7 @@ int SrsHttpResponseReader::initialize(SrsFastBuffer* body)
905 { 905 {
906 int ret = ERROR_SUCCESS; 906 int ret = ERROR_SUCCESS;
907 907
  908 + nb_chunk = 0;
908 nb_left_chunk = 0; 909 nb_left_chunk = 0;
909 nb_total_read = 0; 910 nb_total_read = 0;
910 buffer = body; 911 buffer = body;
@@ -999,33 +1000,35 @@ int SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb_read) @@ -999,33 +1000,35 @@ int SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb_read)
999 } 1000 }
1000 1001
1001 // all bytes in chunk is left now. 1002 // all bytes in chunk is left now.
1002 - nb_left_chunk = ilength; 1003 + nb_chunk = nb_left_chunk = ilength;
1003 } 1004 }
1004 1005
1005 - // left bytes in chunk, read some.  
1006 - srs_assert(nb_left_chunk);  
1007 -  
1008 - int nb_bytes = srs_min(nb_left_chunk, nb_data);  
1009 - ret = read_specified(data, nb_bytes, &nb_bytes);  
1010 -  
1011 - // the nb_bytes used for output already read size of bytes.  
1012 - if (nb_read) {  
1013 - *nb_read = nb_bytes;  
1014 - }  
1015 - nb_left_chunk -= nb_bytes;  
1016 -  
1017 - // error or still left bytes in chunk, ignore and read in future.  
1018 - if (nb_left_chunk > 0 || (ret != ERROR_SUCCESS)) {  
1019 - return ret;  
1020 - }  
1021 - srs_info("http: read %d bytes of chunk", nb_bytes);  
1022 -  
1023 - // read payload when length specifies some payload.  
1024 - if (nb_left_chunk <= 0) { 1006 + if (nb_chunk <= 0) {
  1007 + // for the last chunk, eof.
1025 is_eof = true; 1008 is_eof = true;
  1009 + } else {
  1010 + // for not the last chunk, there must always exists bytes.
  1011 + // left bytes in chunk, read some.
  1012 + srs_assert(nb_left_chunk);
  1013 +
  1014 + int nb_bytes = srs_min(nb_left_chunk, nb_data);
  1015 + ret = read_specified(data, nb_bytes, &nb_bytes);
  1016 +
  1017 + // the nb_bytes used for output already read size of bytes.
  1018 + if (nb_read) {
  1019 + *nb_read = nb_bytes;
  1020 + }
  1021 + nb_left_chunk -= nb_bytes;
  1022 + srs_info("http: read %d bytes of chunk", nb_bytes);
  1023 +
  1024 + // error or still left bytes in chunk, ignore and read in future.
  1025 + if (nb_left_chunk > 0 || (ret != ERROR_SUCCESS)) {
  1026 + return ret;
  1027 + }
  1028 + srs_info("http: read total chunk %dB", nb_chunk);
1026 } 1029 }
1027 1030
1028 - // the CRLF of chunk payload end. 1031 + // for both the last or not, the CRLF of chunk payload end.
1029 if ((ret = buffer->grow(skt, 2)) != ERROR_SUCCESS) { 1032 if ((ret = buffer->grow(skt, 2)) != ERROR_SUCCESS) {
1030 if (!srs_is_client_gracefully_close(ret)) { 1033 if (!srs_is_client_gracefully_close(ret)) {
1031 srs_error("read EOF of chunk from server failed. ret=%d", ret); 1034 srs_error("read EOF of chunk from server failed. ret=%d", ret);
@@ -1064,9 +1067,12 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) @@ -1064,9 +1067,12 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read)
1064 // increase the total read to determine whether EOF. 1067 // increase the total read to determine whether EOF.
1065 nb_total_read += nb_bytes; 1068 nb_total_read += nb_bytes;
1066 1069
1067 - // when read completed, eof.  
1068 - if (nb_total_read >= (int)owner->content_length()) {  
1069 - is_eof = true; 1070 + // for not chunked
  1071 + if (!owner->is_chunked()) {
  1072 + // when read completed, eof.
  1073 + if (nb_total_read >= (int)owner->content_length()) {
  1074 + is_eof = true;
  1075 + }
1070 } 1076 }
1071 1077
1072 return ret; 1078 return ret;
@@ -436,6 +436,8 @@ private: @@ -436,6 +436,8 @@ private:
436 bool is_eof; 436 bool is_eof;
437 // the left bytes in chunk. 437 // the left bytes in chunk.
438 int nb_left_chunk; 438 int nb_left_chunk;
  439 + // the number of bytes of current chunk.
  440 + int nb_chunk;
439 // already read total bytes. 441 // already read total bytes.
440 int64_t nb_total_read; 442 int64_t nb_total_read;
441 public: 443 public:
@@ -1398,11 +1398,8 @@ int SrsHttpConn::do_cycle() @@ -1398,11 +1398,8 @@ int SrsHttpConn::do_cycle()
1398 // always free it in this scope. 1398 // always free it in this scope.
1399 SrsAutoFree(SrsHttpMessage, req); 1399 SrsAutoFree(SrsHttpMessage, req);
1400 1400
1401 - // TODO: FIXME: use the post body.  
1402 - std::string res;  
1403 -  
1404 - // get response body.  
1405 - if ((ret = req->body_read_all(res)) != ERROR_SUCCESS) { 1401 + // may should discard the body.
  1402 + if ((ret = on_got_http_message(req)) != ERROR_SUCCESS) {
1406 return ret; 1403 return ret;
1407 } 1404 }
1408 1405
@@ -1434,5 +1431,29 @@ int SrsHttpConn::process_request(ISrsHttpResponseWriter* w, SrsHttpMessage* r) @@ -1434,5 +1431,29 @@ int SrsHttpConn::process_request(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
1434 return ret; 1431 return ret;
1435 } 1432 }
1436 1433
  1434 +SrsStaticHttpConn::SrsStaticHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
  1435 + : SrsHttpConn(cm, fd, m)
  1436 +{
  1437 +}
  1438 +
  1439 +SrsStaticHttpConn::~SrsStaticHttpConn()
  1440 +{
  1441 +}
  1442 +
  1443 +int SrsStaticHttpConn::on_got_http_message(SrsHttpMessage* msg)
  1444 +{
  1445 + int ret = ERROR_SUCCESS;
  1446 +
  1447 + // TODO: FIXME: use the post body.
  1448 + std::string res;
  1449 +
  1450 + // get response body.
  1451 + if ((ret = msg->body_read_all(res)) != ERROR_SUCCESS) {
  1452 + return ret;
  1453 + }
  1454 +
  1455 + return ret;
  1456 +}
  1457 +
1437 #endif 1458 #endif
1438 1459
@@ -388,10 +388,24 @@ public: @@ -388,10 +388,24 @@ public:
388 virtual void cleanup(); 388 virtual void cleanup();
389 protected: 389 protected:
390 virtual int do_cycle(); 390 virtual int do_cycle();
  391 +protected:
  392 + // when got http message,
  393 + // for the static service or api, discard any body.
  394 + // for the stream caster, for instance, http flv streaming, may discard the flv header or not.
  395 + virtual int on_got_http_message(SrsHttpMessage* msg) = 0;
391 private: 396 private:
392 virtual int process_request(ISrsHttpResponseWriter* w, SrsHttpMessage* r); 397 virtual int process_request(ISrsHttpResponseWriter* w, SrsHttpMessage* r);
393 }; 398 };
394 399
  400 +class SrsStaticHttpConn : public SrsHttpConn
  401 +{
  402 +public:
  403 + SrsStaticHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m);
  404 + virtual ~SrsStaticHttpConn();
  405 +public:
  406 + virtual int on_got_http_message(SrsHttpMessage* msg);
  407 +};
  408 +
395 #endif 409 #endif
396 410
397 #endif 411 #endif
@@ -1163,7 +1163,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) @@ -1163,7 +1163,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
1163 #endif 1163 #endif
1164 } else if (type == SrsListenerHttpStream) { 1164 } else if (type == SrsListenerHttpStream) {
1165 #ifdef SRS_AUTO_HTTP_SERVER 1165 #ifdef SRS_AUTO_HTTP_SERVER
1166 - conn = new SrsHttpConn(this, client_stfd, &http_stream_mux->mux); 1166 + conn = new SrsStaticHttpConn(this, client_stfd, &http_stream_mux->mux);
1167 #else 1167 #else
1168 srs_warn("close http client for server not support http-server"); 1168 srs_warn("close http client for server not support http-server");
1169 srs_close_stfd(client_stfd); 1169 srs_close_stfd(client_stfd);