winlin

refine srs buffer, min interfaces

@@ -552,7 +552,7 @@ char* SrsHttpMessage::http_ts_send_buffer() @@ -552,7 +552,7 @@ char* SrsHttpMessage::http_ts_send_buffer()
552 void SrsHttpMessage::reset() 552 void SrsHttpMessage::reset()
553 { 553 {
554 _state = SrsHttpParseStateInit; 554 _state = SrsHttpParseStateInit;
555 - _body->clear(); 555 + _body->erase(_body->length());
556 _url = ""; 556 _url = "";
557 } 557 }
558 558
@@ -666,8 +666,8 @@ string SrsHttpMessage::body() @@ -666,8 +666,8 @@ string SrsHttpMessage::body()
666 { 666 {
667 std::string b; 667 std::string b;
668 668
669 - if (_body && !_body->empty()) {  
670 - b.append(_body->bytes(), _body->size()); 669 + if (_body && _body->length() > 0) {
  670 + b.append(_body->bytes(), _body->length());
671 } 671 }
672 672
673 return b; 673 return b;
@@ -675,16 +675,12 @@ string SrsHttpMessage::body() @@ -675,16 +675,12 @@ string SrsHttpMessage::body()
675 675
676 char* SrsHttpMessage::body_raw() 676 char* SrsHttpMessage::body_raw()
677 { 677 {
678 - if (_body && !_body->empty()) {  
679 - return _body->bytes();  
680 - }  
681 -  
682 - return NULL; 678 + return _body? _body->bytes() : NULL;
683 } 679 }
684 680
685 int64_t SrsHttpMessage::body_size() 681 int64_t SrsHttpMessage::body_size()
686 { 682 {
687 - return (int64_t)_body->size(); 683 + return (int64_t)_body->length();
688 } 684 }
689 685
690 int64_t SrsHttpMessage::content_length() 686 int64_t SrsHttpMessage::content_length()
@@ -44,43 +44,38 @@ SrsBuffer::~SrsBuffer() @@ -44,43 +44,38 @@ SrsBuffer::~SrsBuffer()
44 { 44 {
45 } 45 }
46 46
47 -int SrsBuffer::size() 47 +int SrsBuffer::length()
48 { 48 {
49 - return (int)data.size();  
50 -}  
51 -  
52 -bool SrsBuffer::empty()  
53 -{  
54 - return size() <= 0; 49 + int len = (int)data.size();
  50 + srs_assert(len >= 0);
  51 + return len;
55 } 52 }
56 53
57 char* SrsBuffer::bytes() 54 char* SrsBuffer::bytes()
58 { 55 {
59 - return &data.at(0); 56 + return (length() == 0)? NULL : &data.at(0);
60 } 57 }
61 58
62 -void SrsBuffer::erase(int _size) 59 +void SrsBuffer::erase(int size)
63 { 60 {
64 - if (_size == size()) {  
65 - clear(); 61 + srs_assert(size > 0);
  62 +
  63 + if (size == length()) {
  64 + data.clear();
66 return; 65 return;
67 } 66 }
68 67
69 - data.erase(data.begin(), data.begin() + _size);  
70 -}  
71 -  
72 -void SrsBuffer::clear()  
73 -{  
74 - data.clear(); 68 + data.erase(data.begin(), data.begin() + size);
75 } 69 }
76 70
77 void SrsBuffer::append(const char* bytes, int size) 71 void SrsBuffer::append(const char* bytes, int size)
78 { 72 {
79 srs_assert(size > 0); 73 srs_assert(size > 0);
  74 +
80 data.insert(data.end(), bytes, bytes + size); 75 data.insert(data.end(), bytes, bytes + size);
81 } 76 }
82 77
83 -int SrsBuffer::ensure_buffer_bytes(ISrsBufferReader* skt, int required_size) 78 +int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
84 { 79 {
85 int ret = ERROR_SUCCESS; 80 int ret = ERROR_SUCCESS;
86 81
@@ -90,11 +85,11 @@ int SrsBuffer::ensure_buffer_bytes(ISrsBufferReader* skt, int required_size) @@ -90,11 +85,11 @@ int SrsBuffer::ensure_buffer_bytes(ISrsBufferReader* skt, int required_size)
90 return ret; 85 return ret;
91 } 86 }
92 87
93 - while (size() < required_size) { 88 + while (length() < required_size) {
94 char buffer[SOCKET_READ_SIZE]; 89 char buffer[SOCKET_READ_SIZE];
95 90
96 ssize_t nread; 91 ssize_t nread;
97 - if ((ret = skt->read(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) { 92 + if ((ret = reader->read(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) {
98 return ret; 93 return ret;
99 } 94 }
100 95
@@ -50,7 +50,7 @@ public: @@ -50,7 +50,7 @@ public:
50 * protocol recv data from socket, put into buffer, decode to RTMP message. 50 * protocol recv data from socket, put into buffer, decode to RTMP message.
51 * protocol encode RTMP message to bytes, put into buffer, send to socket. 51 * protocol encode RTMP message to bytes, put into buffer, send to socket.
52 */ 52 */
53 -class SrsBuffer 53 + class SrsBuffer
54 { 54 {
55 private: 55 private:
56 std::vector<char> data; 56 std::vector<char> data;
@@ -58,14 +58,34 @@ public: @@ -58,14 +58,34 @@ public:
58 SrsBuffer(); 58 SrsBuffer();
59 virtual ~SrsBuffer(); 59 virtual ~SrsBuffer();
60 public: 60 public:
61 - virtual int size();  
62 - virtual bool empty(); 61 + /**
  62 + * get the length of buffer.
  63 + * never negative, empty if zero.
  64 + */
  65 + virtual int length();
  66 + /**
  67 + * get the buffer bytes.
  68 + * @return the bytes, NULL if empty.
  69 + */
63 virtual char* bytes(); 70 virtual char* bytes();
  71 + /**
  72 + * erase size of bytes from begin.
  73 + * if size equals to length(), clear buffer.
  74 + * @param size
  75 + */
64 virtual void erase(int size); 76 virtual void erase(int size);
65 - virtual void clear(); 77 + /**
  78 + * append specified bytes to buffer.
  79 + * @param size the size of bytes, assert positive.
  80 + */
66 virtual void append(const char* bytes, int size); 81 virtual void append(const char* bytes, int size);
67 public: 82 public:
68 - virtual int ensure_buffer_bytes(ISrsBufferReader* skt, int required_size); 83 + /**
  84 + * grow buffer to the required size, loop to read from skt to fill.
  85 + * @param reader, read more bytes from reader to fill the buffer to required size.
  86 + * @param required_size, loop to fill to ensure buffer size to required.
  87 + */
  88 + virtual int grow(ISrsBufferReader* reader, int required_size);
69 }; 89 };
70 90
71 #endif 91 #endif
@@ -869,7 +869,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) @@ -869,7 +869,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
869 int ret = ERROR_SUCCESS; 869 int ret = ERROR_SUCCESS;
870 870
871 int required_size = 1; 871 int required_size = 1;
872 - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 872 + if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
873 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 873 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
874 srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 874 srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
875 } 875 }
@@ -889,7 +889,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) @@ -889,7 +889,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
889 889
890 if (cid == 0) { 890 if (cid == 0) {
891 required_size = 2; 891 required_size = 2;
892 - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 892 + if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
893 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 893 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
894 srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 894 srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
895 } 895 }
@@ -902,7 +902,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) @@ -902,7 +902,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
902 srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); 902 srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
903 } else if (cid == 1) { 903 } else if (cid == 1) {
904 required_size = 3; 904 required_size = 3;
905 - if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) { 905 + if ((ret = buffer->grow(skt, 3)) != ERROR_SUCCESS) {
906 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 906 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
907 srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 907 srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
908 } 908 }
@@ -982,7 +982,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -982,7 +982,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
982 srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); 982 srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
983 983
984 int required_size = bh_size + mh_size; 984 int required_size = bh_size + mh_size;
985 - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 985 + if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
986 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 986 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
987 srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); 987 srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
988 } 988 }
@@ -1092,7 +1092,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -1092,7 +1092,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
1092 mh_size += 4; 1092 mh_size += 4;
1093 required_size = bh_size + mh_size; 1093 required_size = bh_size + mh_size;
1094 srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); 1094 srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
1095 - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 1095 + if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
1096 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1096 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1097 srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); 1097 srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
1098 } 1098 }
@@ -1219,7 +1219,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -1219,7 +1219,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
1219 1219
1220 // read payload to buffer 1220 // read payload to buffer
1221 int required_size = bh_size + mh_size + payload_size; 1221 int required_size = bh_size + mh_size + payload_size;
1222 - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 1222 + if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
1223 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1223 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1224 srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); 1224 srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret);
1225 } 1225 }