winlin

fix #248, improve about 15% performance for fast buffer. 2.0.49

@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 48 34 +#define VERSION_REVISION 49
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -134,6 +134,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -134,6 +134,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
134 #define ERROR_OpenSslSha256DigestSize 2037 134 #define ERROR_OpenSslSha256DigestSize 2037
135 #define ERROR_OpenSslGetPeerPublicKey 2038 135 #define ERROR_OpenSslGetPeerPublicKey 2038
136 #define ERROR_OpenSslComputeSharedKey 2039 136 #define ERROR_OpenSslComputeSharedKey 2039
  137 +#define ERROR_RTMP_BUFFER_OVERFLOW 2040
137 // 138 //
138 // system control message, 139 // system control message,
139 // not an error, but special control logic. 140 // not an error, but special control logic.
@@ -27,6 +27,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -27,6 +27,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 #include <srs_kernel_log.hpp> 27 #include <srs_kernel_log.hpp>
28 #include <srs_kernel_utility.hpp> 28 #include <srs_kernel_utility.hpp>
29 29
  30 +// the max header size,
  31 +// @see SrsProtocol::read_message_header().
  32 +#define SRS_RTMP_MAX_MESSAGE_HEADER 11
  33 +
30 SrsSimpleBuffer::SrsSimpleBuffer() 34 SrsSimpleBuffer::SrsSimpleBuffer()
31 { 35 {
32 } 36 }
@@ -81,8 +85,10 @@ SrsFastBuffer::SrsFastBuffer() @@ -81,8 +85,10 @@ SrsFastBuffer::SrsFastBuffer()
81 merged_read = false; 85 merged_read = false;
82 _handler = NULL; 86 _handler = NULL;
83 87
84 - nb_buffer = SOCKET_READ_SIZE;  
85 - buffer = new char[nb_buffer]; 88 + p = end = buffer = NULL;
  89 + nb_buffer = 0;
  90 +
  91 + reset_buffer(SOCKET_READ_SIZE);
86 } 92 }
87 93
88 SrsFastBuffer::~SrsFastBuffer() 94 SrsFastBuffer::~SrsFastBuffer()
@@ -90,37 +96,34 @@ SrsFastBuffer::~SrsFastBuffer() @@ -90,37 +96,34 @@ SrsFastBuffer::~SrsFastBuffer()
90 srs_freep(buffer); 96 srs_freep(buffer);
91 } 97 }
92 98
93 -int SrsFastBuffer::length() 99 +char SrsFastBuffer::read_1byte()
94 { 100 {
95 - int len = (int)data.size();  
96 - srs_assert(len >= 0);  
97 - return len;  
98 -}  
99 -  
100 -char* SrsFastBuffer::bytes()  
101 -{  
102 - return (length() == 0)? NULL : &data.at(0); 101 + srs_assert(end - p >= 1);
  102 + return *p++;
103 } 103 }
104 104
105 -void SrsFastBuffer::erase(int size) 105 +char* SrsFastBuffer::read_slice(int size)
106 { 106 {
107 - if (size <= 0) {  
108 - return;  
109 - } 107 + srs_assert(end - p >= size);
  108 + srs_assert(p + size > buffer);
110 109
111 - if (size >= length()) {  
112 - data.clear();  
113 - return;  
114 - } 110 + char* ptr = p;
  111 + p += size;
115 112
116 - data.erase(data.begin(), data.begin() + size); 113 + // reset when consumed all.
  114 + if (p == end) {
  115 + p = end = buffer;
  116 + srs_verbose("all consumed, reset fast buffer");
  117 + }
  118 +
  119 + return ptr;
117 } 120 }
118 121
119 -void SrsFastBuffer::append(const char* bytes, int size) 122 +void SrsFastBuffer::skip(int size)
120 { 123 {
121 - srs_assert(size > 0);  
122 -  
123 - data.insert(data.end(), bytes, bytes + size); 124 + srs_assert(end - p >= size);
  125 + srs_assert(p + size > buffer);
  126 + p += size;
124 } 127 }
125 128
126 int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) 129 int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
@@ -133,9 +136,27 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -133,9 +136,27 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
133 return ret; 136 return ret;
134 } 137 }
135 138
136 - while (length() < required_size) { 139 + // when read payload and need to grow, reset buffer.
  140 + if (end - p < required_size && required_size > SRS_RTMP_MAX_MESSAGE_HEADER) {
  141 + int nb_cap = end - p;
  142 + srs_verbose("move fast buffer %d bytes", nb_cap);
  143 + buffer = (char*)memmove(buffer, p, nb_cap);
  144 + p = buffer;
  145 + end = p + nb_cap;
  146 + }
  147 +
  148 + while (end - p < required_size) {
  149 + // the max to read is the left bytes.
  150 + size_t max_to_read = buffer + nb_buffer - end;
  151 +
  152 + if (max_to_read <= 0) {
  153 + ret = ERROR_RTMP_BUFFER_OVERFLOW;
  154 + srs_error("buffer overflow, required=%d, max=%d, ret=%d", required_size, nb_buffer, ret);
  155 + return ret;
  156 + }
  157 +
137 ssize_t nread; 158 ssize_t nread;
138 - if ((ret = reader->read(buffer, nb_buffer, &nread)) != ERROR_SUCCESS) { 159 + if ((ret = reader->read(end, max_to_read, &nread)) != ERROR_SUCCESS) {
139 return ret; 160 return ret;
140 } 161 }
141 162
@@ -149,8 +170,9 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -149,8 +170,9 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
149 _handler->on_read(nread); 170 _handler->on_read(nread);
150 } 171 }
151 172
  173 + // we just move the ptr to next.
152 srs_assert((int)nread > 0); 174 srs_assert((int)nread > 0);
153 - append(buffer, (int)nread); 175 + end += nread;
154 } 176 }
155 177
156 return ret; 178 return ret;
@@ -198,8 +220,19 @@ int SrsFastBuffer::buffer_size() @@ -198,8 +220,19 @@ int SrsFastBuffer::buffer_size()
198 220
199 void SrsFastBuffer::reset_buffer(int size) 221 void SrsFastBuffer::reset_buffer(int size)
200 { 222 {
  223 + // remember the cap.
  224 + int nb_cap = end - p;
  225 +
  226 + // atleast to put the old data.
  227 + nb_buffer = srs_max(nb_cap, size);
  228 +
  229 + // copy old data to buf.
  230 + char* buf = new char[nb_buffer];
  231 + if (nb_cap > 0) {
  232 + memcpy(buf, p, nb_cap);
  233 + }
  234 +
201 srs_freep(buffer); 235 srs_freep(buffer);
202 -  
203 - nb_buffer = size;  
204 - buffer = new char[nb_buffer]; 236 + p = buffer = buf;
  237 + end = p + nb_cap;
205 } 238 }
@@ -116,39 +116,40 @@ private: @@ -116,39 +116,40 @@ private:
116 // the merged handler 116 // the merged handler
117 bool merged_read; 117 bool merged_read;
118 IMergeReadHandler* _handler; 118 IMergeReadHandler* _handler;
119 - // data and socket buffer  
120 - std::vector<char> data; 119 + // the user-space buffer to fill by reader,
  120 + // which use fast index and reset when chunk body read ok.
  121 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/248
  122 + // ptr to the current read position.
  123 + char* p;
  124 + // ptr to the content end.
  125 + char* end;
  126 + // ptr to the buffer.
  127 + // buffer <= p <= end <= buffer+nb_buffer
121 char* buffer; 128 char* buffer;
  129 + // the max size of buffer.
122 int nb_buffer; 130 int nb_buffer;
123 public: 131 public:
124 SrsFastBuffer(); 132 SrsFastBuffer();
125 virtual ~SrsFastBuffer(); 133 virtual ~SrsFastBuffer();
126 public: 134 public:
127 /** 135 /**
128 - * get the length of buffer. empty if zero.  
129 - * @remark assert length() is not negative.  
130 - */  
131 - virtual int length();  
132 - /**  
133 - * get the buffer bytes.  
134 - * @return the bytes, NULL if empty. 136 + * read 1byte from buffer, move to next bytes.
  137 + * @remark assert buffer already grow(1).
135 */ 138 */
136 - virtual char* bytes();  
137 -public: 139 + virtual char read_1byte();
138 /** 140 /**
139 - * erase size of bytes from begin.  
140 - * @param size to erase size of bytes.  
141 - * clear if size greater than or equals to length()  
142 - * @remark ignore size is not positive. 141 + * read a slice in size bytes, move to next bytes.
  142 + * user can use this char* ptr directly, and should never free it.
  143 + * @remark assert buffer already grow(size).
  144 + * @remark the ptr returned maybe invalid after grow(x).
143 */ 145 */
144 - virtual void erase(int size);  
145 -private: 146 + virtual char* read_slice(int size);
146 /** 147 /**
147 - * append specified bytes to buffer.  
148 - * @param size the size of bytes  
149 - * @remark assert size is positive. 148 + * skip some bytes in buffer.
  149 + * @param size the bytes to skip. positive to next; negative to previous.
  150 + * @remark assert buffer already grow(size).
150 */ 151 */
151 - virtual void append(const char* bytes, int size); 152 + virtual void skip(int size);
152 public: 153 public:
153 /** 154 /**
154 * grow buffer to the required size, loop to read from skt to fill. 155 * grow buffer to the required size, loop to read from skt to fill.
@@ -1075,14 +1075,13 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) @@ -1075,14 +1075,13 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
1075 // chunk stream basic header. 1075 // chunk stream basic header.
1076 char fmt = 0; 1076 char fmt = 0;
1077 int cid = 0; 1077 int cid = 0;
1078 - int bh_size = 0;  
1079 - if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) { 1078 + if ((ret = read_basic_header(fmt, cid)) != ERROR_SUCCESS) {
1080 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1079 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1081 srs_error("read basic header failed. ret=%d", ret); 1080 srs_error("read basic header failed. ret=%d", ret);
1082 } 1081 }
1083 return ret; 1082 return ret;
1084 } 1083 }
1085 - srs_verbose("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size); 1084 + srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);
1086 1085
1087 // once we got the chunk message header, 1086 // once we got the chunk message header,
1088 // that is there is a real message in cache, 1087 // that is there is a real message in cache,
@@ -1115,8 +1114,7 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) @@ -1115,8 +1114,7 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
1115 } 1114 }
1116 1115
1117 // chunk stream message header 1116 // chunk stream message header
1118 - int mh_size = 0;  
1119 - if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) { 1117 + if ((ret = read_message_header(chunk, fmt)) != ERROR_SUCCESS) {
1120 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1118 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1121 srs_error("read message header failed. ret=%d", ret); 1119 srs_error("read message header failed. ret=%d", ret);
1122 } 1120 }
@@ -1129,8 +1127,7 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) @@ -1129,8 +1127,7 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
1129 1127
1130 // read msg payload from chunk stream. 1128 // read msg payload from chunk stream.
1131 SrsMessage* msg = NULL; 1129 SrsMessage* msg = NULL;
1132 - int payload_size = 0;  
1133 - if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { 1130 + if ((ret = read_message_payload(chunk, &msg)) != ERROR_SUCCESS) {
1134 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1131 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1135 srs_error("read message payload failed. ret=%d", ret); 1132 srs_error("read message payload failed. ret=%d", ret);
1136 } 1133 }
@@ -1203,59 +1200,52 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) @@ -1203,59 +1200,52 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
1203 * Chunk stream IDs with values 64-319 could be represented by both 2- 1200 * Chunk stream IDs with values 64-319 could be represented by both 2-
1204 * byte version and 3-byte version of this field. 1201 * byte version and 3-byte version of this field.
1205 */ 1202 */
1206 -int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) 1203 +int SrsProtocol::read_basic_header(char& fmt, int& cid)
1207 { 1204 {
1208 int ret = ERROR_SUCCESS; 1205 int ret = ERROR_SUCCESS;
1209 1206
1210 - int required_size = 1;  
1211 - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { 1207 + if ((ret = in_buffer->grow(skt, 1)) != ERROR_SUCCESS) {
1212 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1208 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1213 - srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 1209 + srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", 1, ret);
1214 } 1210 }
1215 return ret; 1211 return ret;
1216 } 1212 }
1217 1213
1218 - char* p = in_buffer->bytes();  
1219 -  
1220 - fmt = (*p >> 6) & 0x03;  
1221 - cid = *p & 0x3f;  
1222 - bh_size = 1; 1214 + fmt = in_buffer->read_1byte();
  1215 + cid = fmt & 0x3f;
  1216 + fmt = (fmt >> 6) & 0x03;
1223 1217
1224 // 2-63, 1B chunk header 1218 // 2-63, 1B chunk header
1225 if (cid > 1) { 1219 if (cid > 1) {
1226 - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); 1220 + srs_verbose("basic header parsed. fmt=%d, cid=%d", fmt, cid);
1227 return ret; 1221 return ret;
1228 } 1222 }
1229 1223
1230 // 64-319, 2B chunk header 1224 // 64-319, 2B chunk header
1231 if (cid == 0) { 1225 if (cid == 0) {
1232 - required_size = 2;  
1233 - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { 1226 + if ((ret = in_buffer->grow(skt, 1)) != ERROR_SUCCESS) {
1234 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1227 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1235 - srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 1228 + srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", 1, ret);
1236 } 1229 }
1237 return ret; 1230 return ret;
1238 } 1231 }
1239 1232
1240 cid = 64; 1233 cid = 64;
1241 - cid += (u_int8_t)*(++p);  
1242 - bh_size = 2;  
1243 - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); 1234 + cid += (u_int8_t)in_buffer->read_1byte();
  1235 + srs_verbose("2bytes basic header parsed. fmt=%d, cid=%d", fmt, cid);
1244 // 64-65599, 3B chunk header 1236 // 64-65599, 3B chunk header
1245 } else if (cid == 1) { 1237 } else if (cid == 1) {
1246 - required_size = 3;  
1247 - if ((ret = in_buffer->grow(skt, 3)) != ERROR_SUCCESS) { 1238 + if ((ret = in_buffer->grow(skt, 2)) != ERROR_SUCCESS) {
1248 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1239 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1249 - srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 1240 + srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", 2, ret);
1250 } 1241 }
1251 return ret; 1242 return ret;
1252 } 1243 }
1253 1244
1254 cid = 64; 1245 cid = 64;
1255 - cid += (u_int8_t)*(++p);  
1256 - cid += ((u_int8_t)*(++p)) * 256;  
1257 - bh_size = 3;  
1258 - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); 1246 + cid += (u_int8_t)in_buffer->read_1byte();
  1247 + cid += ((u_int8_t)in_buffer->read_1byte()) * 256;
  1248 + srs_verbose("3bytes basic header parsed. fmt=%d, cid=%d", fmt, cid);
1259 } else { 1249 } else {
1260 srs_error("invalid path, impossible basic header."); 1250 srs_error("invalid path, impossible basic header.");
1261 srs_assert(false); 1251 srs_assert(false);
@@ -1276,7 +1266,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) @@ -1276,7 +1266,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
1276 * fmt=2, 0x8X 1266 * fmt=2, 0x8X
1277 * fmt=3, 0xCX 1267 * fmt=3, 0xCX
1278 */ 1268 */
1279 -int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size) 1269 +int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt)
1280 { 1270 {
1281 int ret = ERROR_SUCCESS; 1271 int ret = ERROR_SUCCESS;
1282 1272
@@ -1344,17 +1334,15 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -1344,17 +1334,15 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
1344 1334
1345 // read message header from socket to buffer. 1335 // read message header from socket to buffer.
1346 static char mh_sizes[] = {11, 7, 3, 0}; 1336 static char mh_sizes[] = {11, 7, 3, 0};
1347 - mh_size = mh_sizes[(int)fmt]; 1337 + int mh_size = mh_sizes[(int)fmt];
1348 srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); 1338 srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
1349 1339
1350 - int required_size = bh_size + mh_size;  
1351 - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { 1340 + if (mh_size > 0 && (ret = in_buffer->grow(skt, mh_size)) != ERROR_SUCCESS) {
1352 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1341 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1353 - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); 1342 + srs_error("read %dbytes message header failed. ret=%d", mh_size, ret);
1354 } 1343 }
1355 return ret; 1344 return ret;
1356 } 1345 }
1357 - char* p = in_buffer->bytes() + bh_size;  
1358 1346
1359 /** 1347 /**
1360 * parse the message header. 1348 * parse the message header.
@@ -1370,6 +1358,8 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -1370,6 +1358,8 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
1370 */ 1358 */
1371 // see also: ngx_rtmp_recv 1359 // see also: ngx_rtmp_recv
1372 if (fmt <= RTMP_FMT_TYPE2) { 1360 if (fmt <= RTMP_FMT_TYPE2) {
  1361 + char* p = in_buffer->read_slice(mh_size);
  1362 +
1373 char* pp = (char*)&chunk->header.timestamp_delta; 1363 char* pp = (char*)&chunk->header.timestamp_delta;
1374 pp[2] = *p++; 1364 pp[2] = *p++;
1375 pp[1] = *p++; 1365 pp[1] = *p++;
@@ -1466,14 +1456,16 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -1466,14 +1456,16 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
1466 // read extended-timestamp 1456 // read extended-timestamp
1467 if (chunk->extended_timestamp) { 1457 if (chunk->extended_timestamp) {
1468 mh_size += 4; 1458 mh_size += 4;
1469 - required_size = bh_size + mh_size;  
1470 srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); 1459 srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
1471 - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { 1460 + if ((ret = in_buffer->grow(skt, 4)) != ERROR_SUCCESS) {
1472 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1461 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1473 - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); 1462 + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, 4, ret);
1474 } 1463 }
1475 return ret; 1464 return ret;
1476 } 1465 }
  1466 + // the ptr to the slice maybe invalid when grow()
  1467 + // reset the p to get 4bytes slice.
  1468 + char* p = in_buffer->read_slice(4);
1477 1469
1478 u_int32_t timestamp = 0x00; 1470 u_int32_t timestamp = 0x00;
1479 char* pp = (char*)&timestamp; 1471 char* pp = (char*)&timestamp;
@@ -1515,6 +1507,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -1515,6 +1507,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
1515 */ 1507 */
1516 if (!is_first_chunk_of_msg && chunk_timestamp > 0 && chunk_timestamp != timestamp) { 1508 if (!is_first_chunk_of_msg && chunk_timestamp > 0 && chunk_timestamp != timestamp) {
1517 mh_size -= 4; 1509 mh_size -= 4;
  1510 + in_buffer->skip(-4);
1518 srs_info("no 4bytes extended timestamp in the continued chunk"); 1511 srs_info("no 4bytes extended timestamp in the continued chunk");
1519 } else { 1512 } else {
1520 chunk->header.timestamp = timestamp; 1513 chunk->header.timestamp = timestamp;
@@ -1557,15 +1550,12 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -1557,15 +1550,12 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
1557 return ret; 1550 return ret;
1558 } 1551 }
1559 1552
1560 -int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg) 1553 +int SrsProtocol::read_message_payload(SrsChunkStream* chunk, SrsMessage** pmsg)
1561 { 1554 {
1562 int ret = ERROR_SUCCESS; 1555 int ret = ERROR_SUCCESS;
1563 1556
1564 // empty message 1557 // empty message
1565 if (chunk->header.payload_length <= 0) { 1558 if (chunk->header.payload_length <= 0) {
1566 - // need erase the header in buffer.  
1567 - in_buffer->erase(bh_size + mh_size);  
1568 -  
1569 srs_trace("get an empty RTMP " 1559 srs_trace("get an empty RTMP "
1570 "message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type, 1560 "message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type,
1571 chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); 1561 chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
@@ -1578,7 +1568,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -1578,7 +1568,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
1578 srs_assert(chunk->header.payload_length > 0); 1568 srs_assert(chunk->header.payload_length > 0);
1579 1569
1580 // the chunk payload size. 1570 // the chunk payload size.
1581 - payload_size = chunk->header.payload_length - chunk->msg->size; 1571 + int payload_size = chunk->header.payload_length - chunk->msg->size;
1582 payload_size = srs_min(payload_size, in_chunk_size); 1572 payload_size = srs_min(payload_size, in_chunk_size);
1583 srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d", 1573 srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d",
1584 payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size); 1574 payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size);
@@ -1586,23 +1576,20 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -1586,23 +1576,20 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
1586 // create msg payload if not initialized 1576 // create msg payload if not initialized
1587 if (!chunk->msg->payload) { 1577 if (!chunk->msg->payload) {
1588 chunk->msg->payload = new char[chunk->header.payload_length]; 1578 chunk->msg->payload = new char[chunk->header.payload_length];
1589 - memset(chunk->msg->payload, 0, chunk->header.payload_length);  
1590 - srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length); 1579 + srs_verbose("create payload for RTMP message. size=%d", chunk->header.payload_length);
1591 } 1580 }
1592 1581
1593 // read payload to buffer 1582 // read payload to buffer
1594 - int required_size = bh_size + mh_size + payload_size;  
1595 - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { 1583 + if ((ret = in_buffer->grow(skt, payload_size)) != ERROR_SUCCESS) {
1596 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { 1584 if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
1597 - srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); 1585 + srs_error("read payload failed. required_size=%d, ret=%d", payload_size, ret);
1598 } 1586 }
1599 return ret; 1587 return ret;
1600 } 1588 }
1601 - memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->bytes() + bh_size + mh_size, payload_size);  
1602 - in_buffer->erase(bh_size + mh_size + payload_size); 1589 + memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size);
1603 chunk->msg->size += payload_size; 1590 chunk->msg->size += payload_size;
1604 1591
1605 - srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); 1592 + srs_verbose("chunk payload read completed. payload_size=%d", payload_size);
1606 1593
1607 // got entire RTMP message? 1594 // got entire RTMP message?
1608 if (chunk->header.payload_length == chunk->msg->size) { 1595 if (chunk->header.payload_length == chunk->msg->size) {
@@ -434,21 +434,18 @@ private: @@ -434,21 +434,18 @@ private:
434 /** 434 /**
435 * read the chunk basic header(fmt, cid) from chunk stream. 435 * read the chunk basic header(fmt, cid) from chunk stream.
436 * user can discovery a SrsChunkStream by cid. 436 * user can discovery a SrsChunkStream by cid.
437 - * @bh_size return the chunk basic header size, to remove the used bytes when finished.  
438 */ 437 */
439 - virtual int read_basic_header(char& fmt, int& cid, int& bh_size); 438 + virtual int read_basic_header(char& fmt, int& cid);
440 /** 439 /**
441 * read the chunk message header(timestamp, payload_length, message_type, stream_id) 440 * read the chunk message header(timestamp, payload_length, message_type, stream_id)
442 * from chunk stream and save to SrsChunkStream. 441 * from chunk stream and save to SrsChunkStream.
443 - * @mh_size return the chunk message header size, to remove the used bytes when finished.  
444 */ 442 */
445 - virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); 443 + virtual int read_message_header(SrsChunkStream* chunk, char fmt);
446 /** 444 /**
447 * read the chunk payload, remove the used bytes in buffer, 445 * read the chunk payload, remove the used bytes in buffer,
448 * if got entire message, set the pmsg. 446 * if got entire message, set the pmsg.
449 - * @payload_size read size in this roundtrip, generally a chunk size or left message size.  
450 */ 447 */
451 - virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg); 448 + virtual int read_message_payload(SrsChunkStream* chunk, SrsMessage** pmsg);
452 /** 449 /**
453 * when recv message, update the context. 450 * when recv message, update the context.
454 */ 451 */