winlin

decode the amf0 command message: connect.

@@ -22,3 +22,69 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -22,3 +22,69 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 */ 22 */
23 23
24 #include <srs_core_amf0.hpp> 24 #include <srs_core_amf0.hpp>
  25 +
  26 +#include <srs_core_log.hpp>
  27 +#include <srs_core_stream.hpp>
  28 +
  29 +// AMF0 marker
  30 +#define RTMP_AMF0_Number 0x00
  31 +#define RTMP_AMF0_Boolean 0x01
  32 +#define RTMP_AMF0_String 0x02
  33 +#define RTMP_AMF0_Object 0x03
  34 +#define RTMP_AMF0_MovieClip 0x04 // reserved, not supported
  35 +#define RTMP_AMF0_Null 0x05
  36 +#define RTMP_AMF0_Undefined 0x06
  37 +#define RTMP_AMF0_Reference 0x07
  38 +#define RTMP_AMF0_EcmaArray 0x08
  39 +#define RTMP_AMF0_ObjectEnd 0x09
  40 +#define RTMP_AMF0_StrictArray 0x0A
  41 +#define RTMP_AMF0_Date 0x0B
  42 +#define RTMP_AMF0_LongString 0x0C
  43 +#define RTMP_AMF0_UnSupported 0x0D
  44 +#define RTMP_AMF0_RecordSet 0x0E // reserved, not supported
  45 +#define RTMP_AMF0_XmlDocument 0x0F
  46 +#define RTMP_AMF0_TypedObject 0x10
  47 +// AVM+ object is the AMF3 object.
  48 +#define RTMP_AMF0_AVMplusObject 0x11
  49 +// origin array whos data takes the same form as LengthValueBytes
  50 +#define RTMP_AMF0_OriginStrictArray 0x20
  51 +
  52 +std::string srs_amf0_read_string(SrsStream* stream)
  53 +{
  54 + std::string str;
  55 +
  56 + // marker
  57 + if (!stream->require(1)) {
  58 + return str;
  59 + }
  60 +
  61 + char marker = stream->read_char();
  62 + if (marker != RTMP_AMF0_String) {
  63 + return str;
  64 + }
  65 +
  66 + // len
  67 + if (!stream->require(2)) {
  68 + return str;
  69 + }
  70 + int16_t len = stream->read_2bytes();
  71 +
  72 + // data
  73 + if (!stream->require(len)) {
  74 + return str;
  75 + }
  76 + str = stream->read_string(len);
  77 +
  78 + // support utf8-1 only
  79 + // 1.3.1 Strings and UTF-8
  80 + // UTF8-1 = %x00-7F
  81 + for (int i = 0; i < len; i++) {
  82 + char ch = *(str.data() + i);
  83 + if ((ch & 0x80) != 0) {
  84 + srs_warn("only support utf8-1, 0x00-0x7F, actual is %#x", (int)ch);
  85 + return "";
  86 + }
  87 + }
  88 +
  89 + return str;
  90 +}
@@ -30,4 +30,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,4 +30,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 30
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
  33 +#include <string>
  34 +
  35 +class SrsStream;
  36 +
  37 +/**
  38 +* read amf0 string from stream.
  39 +*/
  40 +extern std::string srs_amf0_read_string(SrsStream* stream);
  41 +
33 #endif 42 #endif
@@ -62,6 +62,8 @@ int SrsBuffer::ensure_buffer_bytes(SrsSocket* skt, int required_size) @@ -62,6 +62,8 @@ int SrsBuffer::ensure_buffer_bytes(SrsSocket* skt, int required_size)
62 { 62 {
63 int ret = ERROR_SUCCESS; 63 int ret = ERROR_SUCCESS;
64 64
  65 + srs_assert(required_size >= 0);
  66 +
65 while (size() < required_size) { 67 while (size() < required_size) {
66 char buffer[SOCKET_READ_SIZE]; 68 char buffer[SOCKET_READ_SIZE];
67 69
@@ -52,6 +52,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -52,6 +52,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
52 #define ERROR_RTMP_PLAIN_REQUIRED 300 52 #define ERROR_RTMP_PLAIN_REQUIRED 300
53 #define ERROR_RTMP_CHUNK_START 301 53 #define ERROR_RTMP_CHUNK_START 301
54 #define ERROR_RTMP_MSG_INVLIAD_SIZE 302 54 #define ERROR_RTMP_MSG_INVLIAD_SIZE 302
  55 +#define ERROR_RTMP_AMF0_DECODE 303
55 56
56 #define ERROR_SYSTEM_STREAM_INIT 400 57 #define ERROR_SYSTEM_STREAM_INIT 400
57 58
@@ -24,9 +24,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -24,9 +24,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 #include <srs_core_protocol.hpp> 24 #include <srs_core_protocol.hpp>
25 25
26 #include <srs_core_log.hpp> 26 #include <srs_core_log.hpp>
  27 +#include <srs_core_amf0.hpp>
27 #include <srs_core_error.hpp> 28 #include <srs_core_error.hpp>
28 #include <srs_core_socket.hpp> 29 #include <srs_core_socket.hpp>
29 #include <srs_core_buffer.hpp> 30 #include <srs_core_buffer.hpp>
  31 +#include <srs_core_stream.hpp>
30 32
31 /** 33 /**
32 5. Protocol Control Messages 34 5. Protocol Control Messages
@@ -187,6 +189,11 @@ messages. @@ -187,6 +189,11 @@ messages.
187 */ 189 */
188 #define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF 190 #define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF
189 191
  192 +/**
  193 +* amf0 command message, command name: "connect"
  194 +*/
  195 +#define RTMP_AMF0_COMMAND_CONNECT "connect"
  196 +
190 SrsMessageHeader::SrsMessageHeader() 197 SrsMessageHeader::SrsMessageHeader()
191 { 198 {
192 message_type = 0; 199 message_type = 0;
@@ -218,6 +225,7 @@ SrsChunkStream::~SrsChunkStream() @@ -218,6 +225,7 @@ SrsChunkStream::~SrsChunkStream()
218 SrsMessage::SrsMessage() 225 SrsMessage::SrsMessage()
219 { 226 {
220 size = 0; 227 size = 0;
  228 + stream = NULL;
221 payload = NULL; 229 payload = NULL;
222 decoded_payload = NULL; 230 decoded_payload = NULL;
223 } 231 }
@@ -233,6 +241,11 @@ SrsMessage::~SrsMessage() @@ -233,6 +241,11 @@ SrsMessage::~SrsMessage()
233 delete decoded_payload; 241 delete decoded_payload;
234 decoded_payload = NULL; 242 decoded_payload = NULL;
235 } 243 }
  244 +
  245 + if (stream) {
  246 + delete stream;
  247 + stream = NULL;
  248 + }
236 } 249 }
237 250
238 SrsPacket* SrsMessage::get_packet() 251 SrsPacket* SrsMessage::get_packet()
@@ -249,7 +262,44 @@ int SrsMessage::decode_packet() @@ -249,7 +262,44 @@ int SrsMessage::decode_packet()
249 { 262 {
250 int ret = ERROR_SUCCESS; 263 int ret = ERROR_SUCCESS;
251 264
252 - // TODO: decode packet. 265 + srs_assert(payload != NULL);
  266 + srs_assert(size > 0);
  267 +
  268 + if (!stream) {
  269 + srs_verbose("create decode stream for message.");
  270 + stream = new SrsStream();
  271 + }
  272 +
  273 + if (header.message_type == RTMP_MSG_AMF0CommandMessage) {
  274 + srs_verbose("start to decode AMF0 command message.");
  275 +
  276 + // amf0 command message.
  277 + // need to read the command name.
  278 + if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) {
  279 + srs_error("initialize stream failed. ret=%d", ret);
  280 + return ret;
  281 + }
  282 + srs_verbose("decode stream initialized success");
  283 +
  284 + std::string command = srs_amf0_read_string(stream);
  285 + srs_verbose("AMF0 command message, command_name=%s", command.c_str());
  286 +
  287 + stream->reset();
  288 + if (command == RTMP_AMF0_COMMAND_CONNECT) {
  289 + srs_info("decode the AMF0 command(connect vhost/app message).");
  290 + decoded_payload = new SrsConnectAppPacket();
  291 + return decoded_payload->decode(stream);
  292 + }
  293 +
  294 + // default packet to drop message.
  295 + srs_trace("drop the AMF0 command message, command_name=%s", command.c_str());
  296 + decoded_payload = new SrsPacket();
  297 + return ret;
  298 + }
  299 +
  300 + // default packet to drop message.
  301 + srs_trace("drop the unknown message, type=%d", header.message_type);
  302 + decoded_payload = new SrsPacket();
253 303
254 return ret; 304 return ret;
255 } 305 }
@@ -262,6 +312,12 @@ SrsPacket::~SrsPacket() @@ -262,6 +312,12 @@ SrsPacket::~SrsPacket()
262 { 312 {
263 } 313 }
264 314
  315 +int SrsPacket::decode(SrsStream* /*stream*/)
  316 +{
  317 + int ret = ERROR_SUCCESS;
  318 + return ret;
  319 +}
  320 +
265 SrsConnectAppPacket::SrsConnectAppPacket() 321 SrsConnectAppPacket::SrsConnectAppPacket()
266 { 322 {
267 } 323 }
@@ -270,6 +326,24 @@ SrsConnectAppPacket::~SrsConnectAppPacket() @@ -270,6 +326,24 @@ SrsConnectAppPacket::~SrsConnectAppPacket()
270 { 326 {
271 } 327 }
272 328
  329 +int SrsConnectAppPacket::decode(SrsStream* stream)
  330 +{
  331 + int ret = ERROR_SUCCESS;
  332 +
  333 + if ((ret = super::decode(stream)) != ERROR_SUCCESS) {
  334 + return ret;
  335 + }
  336 +
  337 + command_name = srs_amf0_read_string(stream);
  338 + if (command_name.empty()) {
  339 + ret = ERROR_RTMP_AMF0_DECODE;
  340 + srs_error("amf0 decode connect command_name failed. ret=%d", ret);
  341 + return ret;
  342 + }
  343 +
  344 + return ret;
  345 +}
  346 +
273 SrsProtocol::SrsProtocol(st_netfd_t client_stfd) 347 SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
274 { 348 {
275 stfd = client_stfd; 349 stfd = client_stfd;
@@ -317,12 +391,21 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -317,12 +391,21 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
317 srs_error("recv interlaced message failed. ret=%d", ret); 391 srs_error("recv interlaced message failed. ret=%d", ret);
318 return ret; 392 return ret;
319 } 393 }
  394 + srs_verbose("entire msg received");
320 395
321 if (!msg) { 396 if (!msg) {
322 continue; 397 continue;
323 } 398 }
324 399
325 - // return the msg with raw/undecoded payload 400 + if (msg->size <= 0 || msg->header.payload_length <= 0) {
  401 + srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).",
  402 + msg->header.message_type, msg->header.payload_length,
  403 + msg->header.timestamp, msg->header.stream_id);
  404 + delete msg;
  405 + continue;
  406 + }
  407 +
  408 + srs_verbose("get a msg with raw/undecoded payload");
326 *pmsg = msg; 409 *pmsg = msg;
327 break; 410 break;
328 } 411 }
@@ -598,10 +681,13 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -598,10 +681,13 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
598 // need erase the header in buffer. 681 // need erase the header in buffer.
599 buffer->erase(bh_size + mh_size); 682 buffer->erase(bh_size + mh_size);
600 683
601 - srs_warn("get an empty RTMP " 684 + srs_trace("get an empty RTMP "
602 "message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type, 685 "message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type,
603 chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); 686 chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
604 687
  688 + *pmsg = chunk->msg;
  689 + chunk->msg = NULL;
  690 +
605 return ret; 691 return ret;
606 } 692 }
607 srs_assert(chunk->header.payload_length > 0); 693 srs_assert(chunk->header.payload_length > 0);
@@ -31,6 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,6 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
33 #include <map> 33 #include <map>
  34 +#include <string>
34 35
35 #include <st.h> 36 #include <st.h>
36 37
@@ -40,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -40,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
40 class SrsSocket; 41 class SrsSocket;
41 class SrsBuffer; 42 class SrsBuffer;
42 class SrsPacket; 43 class SrsPacket;
  44 +class SrsStream;
43 class SrsMessage; 45 class SrsMessage;
44 class SrsChunkStream; 46 class SrsChunkStream;
45 47
@@ -127,6 +129,7 @@ public: @@ -127,6 +129,7 @@ public:
127 int8_t* payload; 129 int8_t* payload;
128 // decoded message payload. 130 // decoded message payload.
129 private: 131 private:
  132 + SrsStream* stream;
130 SrsPacket* decoded_payload; 133 SrsPacket* decoded_payload;
131 public: 134 public:
132 /** 135 /**
@@ -150,13 +153,21 @@ class SrsPacket @@ -150,13 +153,21 @@ class SrsPacket
150 public: 153 public:
151 SrsPacket(); 154 SrsPacket();
152 virtual ~SrsPacket(); 155 virtual ~SrsPacket();
  156 +public:
  157 + virtual int decode(SrsStream* stream);
153 }; 158 };
154 159
155 class SrsConnectAppPacket : public SrsPacket 160 class SrsConnectAppPacket : public SrsPacket
156 { 161 {
  162 +private:
  163 + typedef SrsPacket super;
  164 +private:
  165 + std::string command_name;
157 public: 166 public:
158 SrsConnectAppPacket(); 167 SrsConnectAppPacket();
159 virtual ~SrsConnectAppPacket(); 168 virtual ~SrsConnectAppPacket();
  169 +public:
  170 + virtual int decode(SrsStream* stream);
160 }; 171 };
161 172
162 /** 173 /**
@@ -28,7 +28,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -28,7 +28,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28 28
29 SrsStream::SrsStream() 29 SrsStream::SrsStream()
30 { 30 {
31 - bytes = NULL; 31 + p = bytes = NULL;
32 size = 0; 32 size = 0;
33 } 33 }
34 34
@@ -53,8 +53,54 @@ int SrsStream::initialize(char* _bytes, int _size) @@ -53,8 +53,54 @@ int SrsStream::initialize(char* _bytes, int _size)
53 } 53 }
54 54
55 size = _size; 55 size = _size;
56 - bytes = _bytes; 56 + p = bytes = _bytes;
57 57
58 return ret; 58 return ret;
59 } 59 }
60 60
  61 +void SrsStream::reset()
  62 +{
  63 + p = bytes;
  64 +}
  65 +
  66 +bool SrsStream::empty()
  67 +{
  68 + return !p || !bytes || (p >= bytes + size);
  69 +}
  70 +
  71 +bool SrsStream::require(int required_size)
  72 +{
  73 + return !empty() && (required_size < bytes + size - p);
  74 +}
  75 +
  76 +char SrsStream::read_char()
  77 +{
  78 + srs_assert(require(1));
  79 +
  80 + return *p++;
  81 +}
  82 +
  83 +int16_t SrsStream::read_2bytes()
  84 +{
  85 + srs_assert(require(2));
  86 +
  87 + int16_t value;
  88 + pp = (char*)&value;
  89 + pp[1] = *p++;
  90 + pp[0] = *p++;
  91 +
  92 + return value;
  93 +}
  94 +
  95 +std::string SrsStream::read_string(int len)
  96 +{
  97 + srs_assert(require(len));
  98 +
  99 + std::string value;
  100 + value.append(p, len);
  101 +
  102 + p += len;
  103 +
  104 + return value;
  105 +}
  106 +
@@ -30,9 +30,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,9 +30,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 30
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
  33 +#include <sys/types.h>
  34 +#include <string>
  35 +
33 class SrsStream 36 class SrsStream
34 { 37 {
35 protected: 38 protected:
  39 + char* p;
  40 + char* pp;
36 char* bytes; 41 char* bytes;
37 int size; 42 int size;
38 public: 43 public:
@@ -46,6 +51,24 @@ public: @@ -46,6 +51,24 @@ public:
46 * @remark, stream never free the _bytes, user must free it. 51 * @remark, stream never free the _bytes, user must free it.
47 */ 52 */
48 virtual int initialize(char* _bytes, int _size); 53 virtual int initialize(char* _bytes, int _size);
  54 + /**
  55 + * reset the position to beginning.
  56 + */
  57 + virtual void reset();
  58 + /**
  59 + * whether stream is empty.
  60 + * if empty, never read or write.
  61 + */
  62 + virtual bool empty();
  63 + /**
  64 + * whether required size is ok.
  65 + * @return true if stream can read/write specified required_size bytes.
  66 + */
  67 + virtual bool require(int required_size);
  68 +public:
  69 + virtual char read_char();
  70 + virtual int16_t read_2bytes();
  71 + virtual std::string read_string(int len);
49 }; 72 };
50 73
51 #endif 74 #endif