winlin

decode message header

@@ -50,5 +50,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -50,5 +50,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
50 #define ERROR_SOCKET_WRITE 209 50 #define ERROR_SOCKET_WRITE 209
51 51
52 #define ERROR_RTMP_PLAIN_REQUIRED 300 52 #define ERROR_RTMP_PLAIN_REQUIRED 300
  53 +#define ERROR_RTMP_CHUNK_START 301
53 54
54 #endif 55 #endif
@@ -28,6 +28,71 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -28,6 +28,71 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28 #include <srs_core_socket.hpp> 28 #include <srs_core_socket.hpp>
29 #include <srs_core_buffer.hpp> 29 #include <srs_core_buffer.hpp>
30 30
  31 +/**
  32 +* 6.1.2. Chunk Message Header
  33 +* There are four different formats for the chunk message header,
  34 +* selected by the "fmt" field in the chunk basic header.
  35 +*/
  36 +// 6.1.2.1. Type 0
  37 +// Chunks of Type 0 are 11 bytes long. This type MUST be used at the
  38 +// start of a chunk stream, and whenever the stream timestamp goes
  39 +// backward (e.g., because of a backward seek).
  40 +#define RTMP_FMT_TYPE0 0
  41 +// 6.1.2.2. Type 1
  42 +// Chunks of Type 1 are 7 bytes long. The message stream ID is not
  43 +// included; this chunk takes the same stream ID as the preceding chunk.
  44 +// Streams with variable-sized messages (for example, many video
  45 +// formats) SHOULD use this format for the first chunk of each new
  46 +// message after the first.
  47 +#define RTMP_FMT_TYPE1 1
  48 +// 6.1.2.3. Type 2
  49 +// Chunks of Type 2 are 3 bytes long. Neither the stream ID nor the
  50 +// message length is included; this chunk has the same stream ID and
  51 +// message length as the preceding chunk. Streams with constant-sized
  52 +// messages (for example, some audio and data formats) SHOULD use this
  53 +// format for the first chunk of each message after the first.
  54 +#define RTMP_FMT_TYPE2 2
  55 +// 6.1.2.4. Type 3
  56 +// Chunks of Type 3 have no header. Stream ID, message length and
  57 +// timestamp delta are not present; chunks of this type take values from
  58 +// the preceding chunk. When a single message is split into chunks, all
  59 +// chunks of a message except the first one, SHOULD use this type. Refer
  60 +// to example 2 in section 6.2.2. Stream consisting of messages of
  61 +// exactly the same size, stream ID and spacing in time SHOULD use this
  62 +// type for all chunks after chunk of Type 2. Refer to example 1 in
  63 +// section 6.2.1. If the delta between the first message and the second
  64 +// message is same as the time stamp of first message, then chunk of
  65 +// type 3 would immediately follow the chunk of type 0 as there is no
  66 +// need for a chunk of type 2 to register the delta. If Type 3 chunk
  67 +// follows a Type 0 chunk, then timestamp delta for this Type 3 chunk is
  68 +// the same as the timestamp of Type 0 chunk.
  69 +#define RTMP_FMT_TYPE3 3
  70 +
  71 +/**
  72 +* 6. Chunking
  73 +* The chunk size is configurable. It can be set using a control
  74 +* message(Set Chunk Size) as described in section 7.1. The maximum
  75 +* chunk size can be 65536 bytes and minimum 128 bytes. Larger values
  76 +* reduce CPU usage, but also commit to larger writes that can delay
  77 +* other content on lower bandwidth connections. Smaller chunks are not
  78 +* good for high-bit rate streaming. Chunk size is maintained
  79 +* independently for each direction.
  80 +*/
  81 +#define RTMP_DEFAULT_CHUNK_SIZE 128
  82 +
  83 +/**
  84 +* 6.1. Chunk Format
  85 +* Extended timestamp: 0 or 4 bytes
  86 +* This field MUST be sent when the normal timsestamp is set to
  87 +* 0xffffff, it MUST NOT be sent if the normal timestamp is set to
  88 +* anything else. So for values less than 0xffffff the normal
  89 +* timestamp field SHOULD be used in which case the extended timestamp
  90 +* MUST NOT be present. For values greater than or equal to 0xffffff
  91 +* the normal timestamp field MUST NOT be used and MUST be set to
  92 +* 0xffffff and the extended timestamp MUST be sent.
  93 +*/
  94 +#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF
  95 +
31 SrsProtocol::SrsProtocol(st_netfd_t client_stfd) 96 SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
32 { 97 {
33 stfd = client_stfd; 98 stfd = client_stfd;
@@ -68,12 +133,12 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -68,12 +133,12 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
68 // chunk stream basic header. 133 // chunk stream basic header.
69 char fmt = 0; 134 char fmt = 0;
70 int cid = 0; 135 int cid = 0;
71 - int size = 0;  
72 - if ((ret = read_basic_header(fmt, cid, size)) != ERROR_SUCCESS) { 136 + int bh_size = 0;
  137 + if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) {
73 srs_error("read basic header failed. ret=%d", ret); 138 srs_error("read basic header failed. ret=%d", ret);
74 return ret; 139 return ret;
75 } 140 }
76 - srs_info("read basic header success. fmt=%d, cid=%d, size=%d", fmt, cid, size); 141 + srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size);
77 142
78 // get the cached chunk stream. 143 // get the cached chunk stream.
79 SrsChunkStream* chunk = NULL; 144 SrsChunkStream* chunk = NULL;
@@ -89,11 +154,24 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -89,11 +154,24 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
89 } 154 }
90 155
91 // chunk stream message header 156 // chunk stream message header
92 - SrsMessage* msg = NULL;  
93 - if ((ret = read_message_header(chunk, fmt, &msg)) != ERROR_SUCCESS) { 157 + int mh_size = 0;
  158 + if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) {
94 srs_error("read message header failed. ret=%d", ret); 159 srs_error("read message header failed. ret=%d", ret);
95 return ret; 160 return ret;
96 } 161 }
  162 + srs_info("read message header success. "
  163 + "fmt=%d, mh_size=%d, ext_time=%d, message(type=%d, size=%d, time=%d, sid=%d)",
  164 + fmt, mh_size, chunk->extended_timestamp, chunk->header.message_type,
  165 + chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
  166 +
  167 + // read msg payload from chunk stream.
  168 + SrsMessage* msg = NULL;
  169 + /*int payload_size = 0;
  170 + if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) {
  171 + srs_error("read message payload failed. ret=%d", ret);
  172 + return ret;
  173 + }
  174 + srs_info("read message payload success. payload_size=%d", payload_size);*/
97 175
98 // not got an entire RTMP message, try next chunk. 176 // not got an entire RTMP message, try next chunk.
99 if (!msg) { 177 if (!msg) {
@@ -111,6 +189,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) @@ -111,6 +189,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
111 int ret = ERROR_SUCCESS; 189 int ret = ERROR_SUCCESS;
112 190
113 if ((ret = buffer->ensure_buffer_bytes(skt, 1)) != ERROR_SUCCESS) { 191 if ((ret = buffer->ensure_buffer_bytes(skt, 1)) != ERROR_SUCCESS) {
  192 + srs_error("read 1bytes basic header failed. ret=%d", ret);
114 return ret; 193 return ret;
115 } 194 }
116 195
@@ -121,19 +200,23 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) @@ -121,19 +200,23 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
121 size = 1; 200 size = 1;
122 201
123 if (cid > 1) { 202 if (cid > 1) {
  203 + srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid);
124 return ret; 204 return ret;
125 } 205 }
126 206
127 if (cid == 0) { 207 if (cid == 0) {
128 if ((ret = buffer->ensure_buffer_bytes(skt, 2)) != ERROR_SUCCESS) { 208 if ((ret = buffer->ensure_buffer_bytes(skt, 2)) != ERROR_SUCCESS) {
  209 + srs_error("read 2bytes basic header failed. ret=%d", ret);
129 return ret; 210 return ret;
130 } 211 }
131 212
132 cid = 64; 213 cid = 64;
133 cid += *(++p); 214 cid += *(++p);
134 size = 2; 215 size = 2;
  216 + srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid);
135 } else if (cid == 1) { 217 } else if (cid == 1) {
136 if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) { 218 if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {
  219 + srs_error("read 3bytes basic header failed. ret=%d", ret);
137 return ret; 220 return ret;
138 } 221 }
139 222
@@ -141,6 +224,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) @@ -141,6 +224,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
141 cid += *(++p); 224 cid += *(++p);
142 cid += *(++p) * 256; 225 cid += *(++p) * 256;
143 size = 3; 226 size = 3;
  227 + srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid);
144 } else { 228 } else {
145 srs_error("invalid path, impossible basic header."); 229 srs_error("invalid path, impossible basic header.");
146 srs_assert(false); 230 srs_assert(false);
@@ -149,9 +233,105 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) @@ -149,9 +233,105 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
149 return ret; 233 return ret;
150 } 234 }
151 235
152 -int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, SrsMessage** pmsg) 236 +int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size)
153 { 237 {
154 int ret = ERROR_SUCCESS; 238 int ret = ERROR_SUCCESS;
  239 +
  240 + // when not exists cached msg, means get an new message,
  241 + // the fmt must be type0 which means new message.
  242 + if (!chunk->msg && fmt != RTMP_FMT_TYPE0) {
  243 + ret = ERROR_RTMP_CHUNK_START;
  244 + srs_error("chunk stream start, "
  245 + "fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
  246 + return ret;
  247 + }
  248 +
  249 + // when exists cache msg, means got an partial message,
  250 + // the fmt must not be type0 which means new message.
  251 + if (chunk->msg && fmt == RTMP_FMT_TYPE0) {
  252 + ret = ERROR_RTMP_CHUNK_START;
  253 + srs_error("chunk stream exists, "
  254 + "fmt must not be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
  255 + return ret;
  256 + }
  257 +
  258 + // create msg when new chunk stream start
  259 + if (!chunk->msg) {
  260 + srs_assert(fmt == RTMP_FMT_TYPE0);
  261 + chunk->msg = new SrsMessage();
  262 + srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);
  263 + }
  264 +
  265 + // read message header from socket to buffer.
  266 + static char mh_sizes[] = {11, 7, 1, 0};
  267 + mh_size = mh_sizes[(int)fmt];
  268 + srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
  269 +
  270 + if ((ret = buffer->ensure_buffer_bytes(skt, bh_size + mh_size)) != ERROR_SUCCESS) {
  271 + srs_error("read %dbytes message header failed. ret=%d", mh_size, ret);
  272 + return ret;
  273 + }
  274 + char* p = buffer->bytes() + bh_size;
  275 +
  276 + // parse the message header.
  277 + // see also: ngx_rtmp_recv
  278 + if (fmt <= 2) {
  279 + char* pp = (char*)&chunk->header.timestamp;
  280 + pp[2] = *p++;
  281 + pp[1] = *p++;
  282 + pp[0] = *p++;
  283 + pp[3] = 0;
  284 +
  285 + chunk->extended_timestamp = (chunk->header.timestamp == RTMP_EXTENDED_TIMESTAMP);
  286 +
  287 + if (fmt <= 1) {
  288 + pp = (char*)&chunk->header.payload_length;
  289 + pp[2] = *p++;
  290 + pp[1] = *p++;
  291 + pp[0] = *p++;
  292 + pp[3] = 0;
  293 +
  294 + chunk->header.message_type = *p++;
  295 +
  296 + if (fmt == 0) {
  297 + pp = (char*)&chunk->header.stream_id;
  298 + pp[0] = *p++;
  299 + pp[1] = *p++;
  300 + pp[2] = *p++;
  301 + pp[3] = *p++;
  302 + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d, sid=%d",
  303 + fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
  304 + chunk->header.message_type, chunk->header.stream_id);
  305 + } else {
  306 + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d",
  307 + fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
  308 + chunk->header.message_type);
  309 + }
  310 + } else {
  311 + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d",
  312 + fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp);
  313 + }
  314 + } else {
  315 + srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d",
  316 + fmt, mh_size, chunk->extended_timestamp);
  317 + }
  318 +
  319 + if (chunk->extended_timestamp) {
  320 + mh_size += 4;
  321 + srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
  322 + if ((ret = buffer->ensure_buffer_bytes(skt, bh_size + mh_size)) != ERROR_SUCCESS) {
  323 + srs_error("read %dbytes message header failed. ret=%d", mh_size, ret);
  324 + return ret;
  325 + }
  326 +
  327 + char* pp = (char*)&chunk->header.timestamp;
  328 + pp[3] = *p++;
  329 + pp[2] = *p++;
  330 + pp[1] = *p++;
  331 + pp[0] = *p++;
  332 + srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp);
  333 + }
  334 +
155 return ret; 335 return ret;
156 } 336 }
157 337
@@ -171,6 +351,7 @@ SrsChunkStream::SrsChunkStream(int _cid) @@ -171,6 +351,7 @@ SrsChunkStream::SrsChunkStream(int _cid)
171 { 351 {
172 fmt = 0; 352 fmt = 0;
173 cid = _cid; 353 cid = _cid;
  354 + extended_timestamp = false;
174 msg = NULL; 355 msg = NULL;
175 } 356 }
176 357
@@ -58,7 +58,7 @@ public: @@ -58,7 +58,7 @@ public:
58 virtual int recv_message(SrsMessage** pmsg); 58 virtual int recv_message(SrsMessage** pmsg);
59 private: 59 private:
60 virtual int read_basic_header(char& fmt, int& cid, int& size); 60 virtual int read_basic_header(char& fmt, int& cid, int& size);
61 - virtual int read_message_header(SrsChunkStream* chunk, char fmt, SrsMessage** pmsg); 61 + virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);
62 }; 62 };
63 63
64 /** 64 /**
@@ -113,6 +113,10 @@ public: @@ -113,6 +113,10 @@ public:
113 */ 113 */
114 SrsMessageHeader header; 114 SrsMessageHeader header;
115 /** 115 /**
  116 + * whether the chunk message header has extended timestamp.
  117 + */
  118 + bool extended_timestamp;
  119 + /**
116 * partially read message. 120 * partially read message.
117 */ 121 */
118 SrsMessage* msg; 122 SrsMessage* msg;