winlin

decode chunk stream to RTMP message

@@ -46,6 +46,11 @@ char* SrsBuffer::bytes() @@ -46,6 +46,11 @@ char* SrsBuffer::bytes()
46 return &data.at(0); 46 return &data.at(0);
47 } 47 }
48 48
  49 +void SrsBuffer::erase(int size)
  50 +{
  51 + data.erase(data.begin(), data.begin() + size);
  52 +}
  53 +
49 void SrsBuffer::append(char* bytes, int size) 54 void SrsBuffer::append(char* bytes, int size)
50 { 55 {
51 std::vector<char> vec(bytes, bytes + size); 56 std::vector<char> vec(bytes, bytes + size);
@@ -47,9 +47,10 @@ public: @@ -47,9 +47,10 @@ public:
47 SrsBuffer(); 47 SrsBuffer();
48 virtual ~SrsBuffer(); 48 virtual ~SrsBuffer();
49 public: 49 public:
  50 + virtual int size();
50 virtual char* bytes(); 51 virtual char* bytes();
  52 + virtual void erase(int size);
51 private: 53 private:
52 - virtual int size();  
53 virtual void append(char* bytes, int size); 54 virtual void append(char* bytes, int size);
54 public: 55 public:
55 virtual int ensure_buffer_bytes(SrsSocket* skt, int required_size); 56 virtual int ensure_buffer_bytes(SrsSocket* skt, int required_size);
@@ -51,5 +51,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -51,5 +51,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
51 51
52 #define ERROR_RTMP_PLAIN_REQUIRED 300 52 #define ERROR_RTMP_PLAIN_REQUIRED 300
53 #define ERROR_RTMP_CHUNK_START 301 53 #define ERROR_RTMP_CHUNK_START 301
  54 +#define ERROR_RTMP_MSG_INVLIAD_SIZE 302
54 55
55 #endif 56 #endif
@@ -98,6 +98,8 @@ SrsProtocol::SrsProtocol(st_netfd_t client_stfd) @@ -98,6 +98,8 @@ SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
98 stfd = client_stfd; 98 stfd = client_stfd;
99 buffer = new SrsBuffer(); 99 buffer = new SrsBuffer();
100 skt = new SrsSocket(stfd); 100 skt = new SrsSocket(stfd);
  101 +
  102 + in_chunk_size = out_chunk_size = RTMP_DEFAULT_CHUNK_SIZE;
101 } 103 }
102 104
103 SrsProtocol::~SrsProtocol() 105 SrsProtocol::~SrsProtocol()
@@ -130,6 +132,27 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -130,6 +132,27 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
130 int ret = ERROR_SUCCESS; 132 int ret = ERROR_SUCCESS;
131 133
132 while (true) { 134 while (true) {
  135 + SrsMessage* msg = NULL;
  136 +
  137 + if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
  138 + srs_error("recv interlaced message failed. ret=%d", ret);
  139 + return ret;
  140 + }
  141 +
  142 + if (!msg) {
  143 + continue;
  144 + }
  145 +
  146 + // decode the msg
  147 + }
  148 +
  149 + return ret;
  150 +}
  151 +
  152 +int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
  153 +{
  154 + int ret = ERROR_SUCCESS;
  155 +
133 // chunk stream basic header. 156 // chunk stream basic header.
134 char fmt = 0; 157 char fmt = 0;
135 int cid = 0; 158 int cid = 0;
@@ -148,8 +171,8 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -148,8 +171,8 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
148 srs_info("cache new chunk stream: fmt=%d, cid=%d", fmt, cid); 171 srs_info("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
149 } else { 172 } else {
150 chunk = chunk_streams[cid]; 173 chunk = chunk_streams[cid];
151 - srs_info("cached chunk stream: fmt=%d, cid=%d, message(type=%d, size=%d, time=%d, sid=%d)",  
152 - chunk->fmt, chunk->cid, chunk->header.message_type, chunk->header.payload_length, 174 + srs_info("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
  175 + chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length,
153 chunk->header.timestamp, chunk->header.stream_id); 176 chunk->header.timestamp, chunk->header.stream_id);
154 } 177 }
155 178
@@ -160,26 +183,30 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -160,26 +183,30 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
160 return ret; 183 return ret;
161 } 184 }
162 srs_info("read message header success. " 185 srs_info("read message header success. "
163 - "fmt=%d, mh_size=%d, ext_time=%d, message(type=%d, size=%d, time=%d, sid=%d)",  
164 - fmt, mh_size, chunk->extended_timestamp, chunk->header.message_type, 186 + "fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
  187 + fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type,
165 chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); 188 chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
166 189
167 // read msg payload from chunk stream. 190 // read msg payload from chunk stream.
168 SrsMessage* msg = NULL; 191 SrsMessage* msg = NULL;
169 - /*int payload_size = 0; 192 + int payload_size = 0;
170 if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { 193 if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) {
171 srs_error("read message payload failed. ret=%d", ret); 194 srs_error("read message payload failed. ret=%d", ret);
172 return ret; 195 return ret;
173 } 196 }
174 - srs_info("read message payload success. payload_size=%d", payload_size);*/  
175 197
176 // not got an entire RTMP message, try next chunk. 198 // not got an entire RTMP message, try next chunk.
177 if (!msg) { 199 if (!msg) {
178 - continue; 200 + srs_info("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
  201 + payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
  202 + chunk->header.timestamp, chunk->header.stream_id);
  203 + return ret;
179 } 204 }
180 205
181 - // decode the msg  
182 - } 206 + *pmsg = msg;
  207 + srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
  208 + payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
  209 + chunk->header.timestamp, chunk->header.stream_id);
183 210
184 return ret; 211 return ret;
185 } 212 }
@@ -188,8 +215,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) @@ -188,8 +215,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
188 { 215 {
189 int ret = ERROR_SUCCESS; 216 int ret = ERROR_SUCCESS;
190 217
191 - if ((ret = buffer->ensure_buffer_bytes(skt, 1)) != ERROR_SUCCESS) {  
192 - srs_error("read 1bytes basic header failed. ret=%d", ret); 218 + int required_size = 1;
  219 + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
  220 + srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
193 return ret; 221 return ret;
194 } 222 }
195 223
@@ -205,8 +233,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) @@ -205,8 +233,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
205 } 233 }
206 234
207 if (cid == 0) { 235 if (cid == 0) {
208 - if ((ret = buffer->ensure_buffer_bytes(skt, 2)) != ERROR_SUCCESS) {  
209 - srs_error("read 2bytes basic header failed. ret=%d", ret); 236 + required_size = 2;
  237 + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
  238 + srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
210 return ret; 239 return ret;
211 } 240 }
212 241
@@ -215,8 +244,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) @@ -215,8 +244,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
215 size = 2; 244 size = 2;
216 srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid); 245 srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid);
217 } else if (cid == 1) { 246 } else if (cid == 1) {
  247 + required_size = 3;
218 if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) { 248 if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {
219 - srs_error("read 3bytes basic header failed. ret=%d", ret); 249 + srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
220 return ret; 250 return ret;
221 } 251 }
222 252
@@ -267,24 +297,55 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -267,24 +297,55 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
267 mh_size = mh_sizes[(int)fmt]; 297 mh_size = mh_sizes[(int)fmt];
268 srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); 298 srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
269 299
270 - if ((ret = buffer->ensure_buffer_bytes(skt, bh_size + mh_size)) != ERROR_SUCCESS) {  
271 - srs_error("read %dbytes message header failed. ret=%d", mh_size, ret); 300 + int required_size = bh_size + mh_size;
  301 + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
  302 + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
272 return ret; 303 return ret;
273 } 304 }
274 char* p = buffer->bytes() + bh_size; 305 char* p = buffer->bytes() + bh_size;
275 306
276 // parse the message header. 307 // parse the message header.
277 // see also: ngx_rtmp_recv 308 // see also: ngx_rtmp_recv
278 - if (fmt <= 2) {  
279 - char* pp = (char*)&chunk->header.timestamp; 309 + if (fmt <= RTMP_FMT_TYPE2) {
  310 + int32_t timestamp_delta;
  311 + char* pp = (char*)&timestamp_delta;
280 pp[2] = *p++; 312 pp[2] = *p++;
281 pp[1] = *p++; 313 pp[1] = *p++;
282 pp[0] = *p++; 314 pp[0] = *p++;
283 pp[3] = 0; 315 pp[3] = 0;
284 316
285 - chunk->extended_timestamp = (chunk->header.timestamp == RTMP_EXTENDED_TIMESTAMP); 317 + if (fmt == RTMP_FMT_TYPE0) {
  318 + // 6.1.2.1. Type 0
  319 + // For a type-0 chunk, the absolute timestamp of the message is sent
  320 + // here.
  321 + chunk->header.timestamp = timestamp_delta;
  322 + } else {
  323 + // 6.1.2.2. Type 1
  324 + // 6.1.2.3. Type 2
  325 + // For a type-1 or type-2 chunk, the difference between the previous
  326 + // chunk's timestamp and the current chunk's timestamp is sent here.
  327 + chunk->header.timestamp += timestamp_delta;
  328 + }
  329 +
  330 + // fmt: 0
  331 + // timestamp: 3 bytes
  332 + // If the timestamp is greater than or equal to 16777215
  333 + // (hexadecimal 0x00ffffff), this value MUST be 16777215, and the
  334 + // ‘extended timestamp header’ MUST be present. Otherwise, this value
  335 + // SHOULD be the entire timestamp.
  336 + //
  337 + // fmt: 1 or 2
  338 + // timestamp delta: 3 bytes
  339 + // If the delta is greater than or equal to 16777215 (hexadecimal
  340 + // 0x00ffffff), this value MUST be 16777215, and the ‘extended
  341 + // timestamp header’ MUST be present. Otherwise, this value SHOULD be
  342 + // the entire delta.
  343 + chunk->extended_timestamp = (timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);
  344 + if (chunk->extended_timestamp) {
  345 + chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP;
  346 + }
286 347
287 - if (fmt <= 1) { 348 + if (fmt <= RTMP_FMT_TYPE1) {
288 pp = (char*)&chunk->header.payload_length; 349 pp = (char*)&chunk->header.payload_length;
289 pp[2] = *p++; 350 pp[2] = *p++;
290 pp[1] = *p++; 351 pp[1] = *p++;
@@ -318,9 +379,10 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -318,9 +379,10 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
318 379
319 if (chunk->extended_timestamp) { 380 if (chunk->extended_timestamp) {
320 mh_size += 4; 381 mh_size += 4;
  382 + required_size = bh_size + mh_size;
321 srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); 383 srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
322 - if ((ret = buffer->ensure_buffer_bytes(skt, bh_size + mh_size)) != ERROR_SUCCESS) {  
323 - srs_error("read %dbytes message header failed. ret=%d", mh_size, ret); 384 + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
  385 + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
324 return ret; 386 return ret;
325 } 387 }
326 388
@@ -332,6 +394,90 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -332,6 +394,90 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
332 srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp); 394 srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp);
333 } 395 }
334 396
  397 + // valid message
  398 + if (chunk->header.payload_length < 0) {
  399 + ret = ERROR_RTMP_MSG_INVLIAD_SIZE;
  400 + srs_error("RTMP message size must not be negative. size=%d, ret=%d",
  401 + chunk->header.payload_length, ret);
  402 + return ret;
  403 + }
  404 +
  405 + // copy header to msg
  406 + chunk->msg->header = chunk->header;
  407 +
  408 + return ret;
  409 +}
  410 +
  411 +int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg)
  412 +{
  413 + int ret = ERROR_SUCCESS;
  414 +
  415 + // empty message
  416 + if (chunk->header.payload_length == 0) {
  417 + // need erase the header in buffer.
  418 + buffer->erase(bh_size + mh_size);
  419 +
  420 + srs_warn("get an empty RTMP "
  421 + "message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type,
  422 + chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
  423 +
  424 + return ret;
  425 + }
  426 + srs_assert(chunk->header.payload_length > 0);
  427 +
  428 + // the chunk payload size.
  429 + payload_size = chunk->header.payload_length - chunk->msg->size;
  430 + if (payload_size > in_chunk_size) {
  431 + payload_size = in_chunk_size;
  432 + }
  433 + srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d",
  434 + payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size);
  435 +
  436 + // create msg payload if not initialized
  437 + if (!chunk->msg->payload) {
  438 + chunk->msg->payload = new int8_t[chunk->header.payload_length];
  439 + memset(chunk->msg->payload, 0, chunk->header.payload_length);
  440 + srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length);
  441 + }
  442 +
  443 + // copy payload from buffer.
  444 + int copy_size = buffer->size() - bh_size - mh_size;
  445 + if (copy_size > payload_size) {
  446 + copy_size = payload_size;
  447 + }
  448 + memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, copy_size);
  449 + buffer->erase(bh_size + mh_size + copy_size);
  450 + chunk->msg->size += copy_size;
  451 +
  452 + // when empty, read the left bytes from socket.
  453 + int left_size = payload_size - copy_size;
  454 + if (left_size > 0) {
  455 + ssize_t nread;
  456 + if ((ret = skt->read_fully(chunk->msg->payload + chunk->msg->size, left_size, &nread)) != ERROR_SUCCESS) {
  457 + srs_error("read chunk payload from socket error. "
  458 + "payload_size=%d, copy_size=%d, left_size=%d, size=%d, msg_size=%d, ret=%d",
  459 + payload_size, copy_size, left_size, chunk->msg->size, chunk->header.payload_length, ret);
  460 + return ret;
  461 + }
  462 + }
  463 +
  464 + srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);
  465 +
  466 + // got entire RTMP message?
  467 + if (chunk->header.payload_length == chunk->msg->size) {
  468 + *pmsg = chunk->msg;
  469 + chunk->msg = NULL;
  470 + srs_verbose("get entire RTMP message(type=%d, size=%d, time=%d, sid=%d)",
  471 + chunk->header.message_type, chunk->header.payload_length,
  472 + chunk->header.timestamp, chunk->header.stream_id);
  473 + return ret;
  474 + }
  475 +
  476 + srs_verbose("get partial RTMP message(type=%d, size=%d, time=%d, sid=%d), partial size=%d",
  477 + chunk->header.message_type, chunk->header.payload_length,
  478 + chunk->header.timestamp, chunk->header.stream_id,
  479 + chunk->msg->size);
  480 +
335 return ret; 481 return ret;
336 } 482 }
337 483
@@ -365,6 +511,7 @@ SrsChunkStream::~SrsChunkStream() @@ -365,6 +511,7 @@ SrsChunkStream::~SrsChunkStream()
365 511
366 SrsMessage::SrsMessage() 512 SrsMessage::SrsMessage()
367 { 513 {
  514 + size = 0;
368 payload = NULL; 515 payload = NULL;
369 } 516 }
370 517
@@ -51,14 +51,18 @@ private: @@ -51,14 +51,18 @@ private:
51 st_netfd_t stfd; 51 st_netfd_t stfd;
52 SrsBuffer* buffer; 52 SrsBuffer* buffer;
53 SrsSocket* skt; 53 SrsSocket* skt;
  54 + int32_t in_chunk_size;
  55 + int32_t out_chunk_size;
54 public: 56 public:
55 SrsProtocol(st_netfd_t client_stfd); 57 SrsProtocol(st_netfd_t client_stfd);
56 virtual ~SrsProtocol(); 58 virtual ~SrsProtocol();
57 public: 59 public:
58 virtual int recv_message(SrsMessage** pmsg); 60 virtual int recv_message(SrsMessage** pmsg);
59 private: 61 private:
  62 + virtual int recv_interlaced_message(SrsMessage** pmsg);
60 virtual int read_basic_header(char& fmt, int& cid, int& size); 63 virtual int read_basic_header(char& fmt, int& cid, int& size);
61 virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); 64 virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);
  65 + virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);
62 }; 66 };
63 67
64 /** 68 /**
@@ -142,6 +146,7 @@ public: @@ -142,6 +146,7 @@ public:
142 * or compressed video data. The payload format and interpretation are 146 * or compressed video data. The payload format and interpretation are
143 * beyond the scope of this document. 147 * beyond the scope of this document.
144 */ 148 */
  149 + int32_t size;
145 int8_t* payload; 150 int8_t* payload;
146 public: 151 public:
147 SrsMessage(); 152 SrsMessage();