winlin

expect rtmp packet which decoded from message payload

@@ -29,6 +29,100 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -29,6 +29,100 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 #include <srs_core_buffer.hpp> 29 #include <srs_core_buffer.hpp>
30 30
31 /** 31 /**
  32 +5. Protocol Control Messages
  33 +RTMP reserves message type IDs 1-7 for protocol control messages.
  34 +These messages contain information needed by the RTM Chunk Stream
  35 +protocol or RTMP itself. Protocol messages with IDs 1 & 2 are
  36 +reserved for usage with RTM Chunk Stream protocol. Protocol messages
  37 +with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID
  38 +7 is used between edge server and origin server.
  39 +*/
  40 +#define RTMP_MSG_SetChunkSize 0x01
  41 +#define RTMP_MSG_AbortMessage 0x02
  42 +#define RTMP_MSG_Acknowledgement 0x03
  43 +#define RTMP_MSG_UserControlMessage 0x04
  44 +#define RTMP_MSG_WindowAcknowledgementSize 0x05
  45 +#define RTMP_MSG_SetPeerBandwidth 0x06
  46 +#define RTMP_MSG_EdgeAndOriginServerCommand 0x07
  47 +/**
  48 +* The server sends this event to test whether the client is reachable.
  49 +*
  50 +* Event data is a 4-byte timestamp, representing the local server time when the server dispatched the command.
  51 +* The client responds with PingResponse on receiving PingRequest.
  52 +*/
  53 +#define RTMP_MSG_PCUC_PingRequest 0x06
  54 +
  55 +/**
  56 +* The client sends this event to the server in response to the ping request.
  57 +*
  58 +* The event data is a 4-byte timestamp, which was received with the PingRequest request.
  59 +*/
  60 +#define RTMP_MSG_PCUC_PingResponse 0x07
  61 +/**
  62 +3. Types of messages
  63 +The server and the client send messages over the network to
  64 +communicate with each other. The messages can be of any type which
  65 +includes audio messages, video messages, command messages, shared
  66 +object messages, data messages, and user control messages.
  67 +3.1. Command message
  68 +Command messages carry the AMF-encoded commands between the client
  69 +and the server. These messages have been assigned message type value
  70 +of 20 for AMF0 encoding and message type value of 17 for AMF3
  71 +encoding. These messages are sent to perform some operations like
  72 +connect, createStream, publish, play, pause on the peer. Command
  73 +messages like onstatus, result etc. are used to inform the sender
  74 +about the status of the requested commands. A command message
  75 +consists of command name, transaction ID, and command object that
  76 +contains related parameters. A client or a server can request Remote
  77 +Procedure Calls (RPC) over streams that are communicated using the
  78 +command messages to the peer.
  79 +*/
  80 +#define RTMP_MSG_AMF3CommandMessage 17 // 0x11
  81 +#define RTMP_MSG_AMF0CommandMessage 20 // 0x14
  82 +/**
  83 +3.2. Data message
  84 +The client or the server sends this message to send Metadata or any
  85 +user data to the peer. Metadata includes details about the
  86 +data(audio, video etc.) like creation time, duration, theme and so
  87 +on. These messages have been assigned message type value of 18 for
  88 +AMF0 and message type value of 15 for AMF3.
  89 +*/
  90 +#define RTMP_MSG_AMF0DataMessage 18 // 0x12
  91 +#define RTMP_MSG_AMF3DataMessage 15 // 0x0F
  92 +/**
  93 +3.3. Shared object message
  94 +A shared object is a Flash object (a collection of name value pairs)
  95 +that are in synchronization across multiple clients, instances, and
  96 +so on. The message types kMsgContainer=19 for AMF0 and
  97 +kMsgContainerEx=16 for AMF3 are reserved for shared object events.
  98 +Each message can contain multiple events.
  99 +*/
  100 +#define RTMP_MSG_AMF3SharedObject 16 // 0x10
  101 +#define RTMP_MSG_AMF0SharedObject 19 // 0x13
  102 +/**
  103 +3.4. Audio message
  104 +The client or the server sends this message to send audio data to the
  105 +peer. The message type value of 8 is reserved for audio messages.
  106 +*/
  107 +#define RTMP_MSG_AudioMessage 8 // 0x08
  108 +/* *
  109 +3.5. Video message
  110 +The client or the server sends this message to send video data to the
  111 +peer. The message type value of 9 is reserved for video messages.
  112 +These messages are large and can delay the sending of other type of
  113 +messages. To avoid such a situation, the video message is assigned
  114 +the lowest priority.
  115 +*/
  116 +#define RTMP_MSG_VideoMessage 9 // 0x09
  117 +/**
  118 +3.6. Aggregate message
  119 +An aggregate message is a single message that contains a list of submessages.
  120 +The message type value of 22 is reserved for aggregate
  121 +messages.
  122 +*/
  123 +#define RTMP_MSG_AggregateMessage 22 // 0x16
  124 +
  125 +/**
32 * 6.1.2. Chunk Message Header 126 * 6.1.2. Chunk Message Header
33 * There are four different formats for the chunk message header, 127 * There are four different formats for the chunk message header,
34 * selected by the "fmt" field in the chunk basic header. 128 * selected by the "fmt" field in the chunk basic header.
@@ -93,6 +187,89 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -93,6 +187,89 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
93 */ 187 */
94 #define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF 188 #define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF
95 189
  190 +SrsMessageHeader::SrsMessageHeader()
  191 +{
  192 + message_type = 0;
  193 + payload_length = 0;
  194 + timestamp = 0;
  195 + stream_id = 0;
  196 +}
  197 +
  198 +SrsMessageHeader::~SrsMessageHeader()
  199 +{
  200 +}
  201 +
  202 +SrsChunkStream::SrsChunkStream(int _cid)
  203 +{
  204 + fmt = 0;
  205 + cid = _cid;
  206 + extended_timestamp = false;
  207 + msg = NULL;
  208 +}
  209 +
  210 +SrsChunkStream::~SrsChunkStream()
  211 +{
  212 + if (msg) {
  213 + delete msg;
  214 + msg = NULL;
  215 + }
  216 +}
  217 +
  218 +SrsMessage::SrsMessage()
  219 +{
  220 + size = 0;
  221 + payload = NULL;
  222 + decoded_payload = NULL;
  223 +}
  224 +
  225 +SrsMessage::~SrsMessage()
  226 +{
  227 + if (payload) {
  228 + delete[] payload;
  229 + payload = NULL;
  230 + }
  231 +
  232 + if (decoded_payload) {
  233 + delete decoded_payload;
  234 + decoded_payload = NULL;
  235 + }
  236 +}
  237 +
  238 +SrsPacket* SrsMessage::get_packet()
  239 +{
  240 + if (!decoded_payload) {
  241 + srs_error("the payload is raw/undecoded, invoke decode_packet to decode it.");
  242 + }
  243 + srs_assert(decoded_payload != NULL);
  244 +
  245 + return decoded_payload;
  246 +}
  247 +
  248 +int SrsMessage::decode_packet()
  249 +{
  250 + int ret = ERROR_SUCCESS;
  251 +
  252 + // TODO: decode packet.
  253 +
  254 + return ret;
  255 +}
  256 +
  257 +SrsPacket::SrsPacket()
  258 +{
  259 +}
  260 +
  261 +SrsPacket::~SrsPacket()
  262 +{
  263 +}
  264 +
  265 +SrsConnectAppPacket::SrsConnectAppPacket()
  266 +{
  267 +}
  268 +
  269 +SrsConnectAppPacket::~SrsConnectAppPacket()
  270 +{
  271 +}
  272 +
96 SrsProtocol::SrsProtocol(st_netfd_t client_stfd) 273 SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
97 { 274 {
98 stfd = client_stfd; 275 stfd = client_stfd;
@@ -129,6 +306,8 @@ SrsProtocol::~SrsProtocol() @@ -129,6 +306,8 @@ SrsProtocol::~SrsProtocol()
129 306
130 int SrsProtocol::recv_message(SrsMessage** pmsg) 307 int SrsProtocol::recv_message(SrsMessage** pmsg)
131 { 308 {
  309 + *pmsg = NULL;
  310 +
132 int ret = ERROR_SUCCESS; 311 int ret = ERROR_SUCCESS;
133 312
134 while (true) { 313 while (true) {
@@ -143,7 +322,9 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -143,7 +322,9 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
143 continue; 322 continue;
144 } 323 }
145 324
146 - // decode the msg 325 + // return the msg with raw/undecoded payload
  326 + *pmsg = msg;
  327 + break;
147 } 328 }
148 329
149 return ret; 330 return ret;
@@ -440,26 +621,15 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -440,26 +621,15 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
440 srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length); 621 srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length);
441 } 622 }
442 623
443 - // copy payload from buffer.  
444 - int copy_size = buffer->size() - bh_size - mh_size;  
445 - if (copy_size > payload_size) {  
446 - copy_size = payload_size;  
447 - }  
448 - memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, copy_size);  
449 - buffer->erase(bh_size + mh_size + copy_size);  
450 - chunk->msg->size += copy_size;  
451 -  
452 - // when empty, read the left bytes from socket.  
453 - int left_size = payload_size - copy_size;  
454 - if (left_size > 0) {  
455 - ssize_t nread;  
456 - if ((ret = skt->read_fully(chunk->msg->payload + chunk->msg->size, left_size, &nread)) != ERROR_SUCCESS) {  
457 - srs_error("read chunk payload from socket error. "  
458 - "payload_size=%d, copy_size=%d, left_size=%d, size=%d, msg_size=%d, ret=%d",  
459 - payload_size, copy_size, left_size, chunk->msg->size, chunk->header.payload_length, ret);  
460 - return ret;  
461 - } 624 + // read payload to buffer
  625 + int required_size = bh_size + mh_size + payload_size;
  626 + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
  627 + srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret);
  628 + return ret;
462 } 629 }
  630 + memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size);
  631 + buffer->erase(bh_size + mh_size + payload_size);
  632 + chunk->msg->size += payload_size;
463 633
464 srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); 634 srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);
465 635
@@ -481,45 +651,3 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -481,45 +651,3 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
481 return ret; 651 return ret;
482 } 652 }
483 653
484 -SrsMessageHeader::SrsMessageHeader()  
485 -{  
486 - message_type = 0;  
487 - payload_length = 0;  
488 - timestamp = 0;  
489 - stream_id = 0;  
490 -}  
491 -  
492 -SrsMessageHeader::~SrsMessageHeader()  
493 -{  
494 -}  
495 -  
496 -SrsChunkStream::SrsChunkStream(int _cid)  
497 -{  
498 - fmt = 0;  
499 - cid = _cid;  
500 - extended_timestamp = false;  
501 - msg = NULL;  
502 -}  
503 -  
504 -SrsChunkStream::~SrsChunkStream()  
505 -{  
506 - if (msg) {  
507 - delete msg;  
508 - msg = NULL;  
509 - }  
510 -}  
511 -  
512 -SrsMessage::SrsMessage()  
513 -{  
514 - size = 0;  
515 - payload = NULL;  
516 -}  
517 -  
518 -SrsMessage::~SrsMessage()  
519 -{  
520 - if (payload) {  
521 - delete[] payload;  
522 - payload = NULL;  
523 - }  
524 -}  
525 -  
@@ -34,38 +34,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,38 +34,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 34
35 #include <st.h> 35 #include <st.h>
36 36
  37 +#include <srs_core_log.hpp>
  38 +#include <srs_core_error.hpp>
  39 +
37 class SrsSocket; 40 class SrsSocket;
38 class SrsBuffer; 41 class SrsBuffer;
  42 +class SrsPacket;
39 class SrsMessage; 43 class SrsMessage;
40 class SrsChunkStream; 44 class SrsChunkStream;
41 45
42 /** 46 /**
43 -* the protocol provides the rtmp-message-protocol services,  
44 -* to recv RTMP message from RTMP chunk stream,  
45 -* and to send out RTMP message over RTMP chunk stream.  
46 -*/  
47 -class SrsProtocol  
48 -{  
49 -private:  
50 - std::map<int, SrsChunkStream*> chunk_streams;  
51 - st_netfd_t stfd;  
52 - SrsBuffer* buffer;  
53 - SrsSocket* skt;  
54 - int32_t in_chunk_size;  
55 - int32_t out_chunk_size;  
56 -public:  
57 - SrsProtocol(st_netfd_t client_stfd);  
58 - virtual ~SrsProtocol();  
59 -public:  
60 - virtual int recv_message(SrsMessage** pmsg);  
61 -private:  
62 - virtual int recv_interlaced_message(SrsMessage** pmsg);  
63 - virtual int read_basic_header(char& fmt, int& cid, int& size);  
64 - virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);  
65 - virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);  
66 -};  
67 -  
68 -/**  
69 * 4.1. Message Header 47 * 4.1. Message Header
70 */ 48 */
71 struct SrsMessageHeader 49 struct SrsMessageHeader
@@ -127,7 +105,6 @@ public: @@ -127,7 +105,6 @@ public:
127 public: 105 public:
128 SrsChunkStream(int _cid); 106 SrsChunkStream(int _cid);
129 virtual ~SrsChunkStream(); 107 virtual ~SrsChunkStream();
130 -public:  
131 }; 108 };
132 109
133 /** 110 /**
@@ -148,10 +125,114 @@ public: @@ -148,10 +125,114 @@ public:
148 */ 125 */
149 int32_t size; 126 int32_t size;
150 int8_t* payload; 127 int8_t* payload;
  128 +// decoded message payload.
  129 +private:
  130 + SrsPacket* decoded_payload;
  131 +public:
  132 + /**
  133 + * get the decoded packet,
  134 + * not all packets need to decode, for video/audio packet,
  135 + * passthrough to peer are ok.
  136 + * @remark, user must invoke decode_packet first.
  137 + */
  138 + virtual SrsPacket* get_packet();
  139 + virtual int decode_packet();
151 public: 140 public:
152 SrsMessage(); 141 SrsMessage();
153 virtual ~SrsMessage(); 142 virtual ~SrsMessage();
  143 +};
  144 +
  145 +/**
  146 +* the decoded message payload.
  147 +*/
  148 +class SrsPacket
  149 +{
  150 +public:
  151 + SrsPacket();
  152 + virtual ~SrsPacket();
  153 +};
  154 +
  155 +class SrsConnectAppPacket : public SrsPacket
  156 +{
  157 +public:
  158 + SrsConnectAppPacket();
  159 + virtual ~SrsConnectAppPacket();
  160 +};
  161 +
  162 +/**
  163 +* the protocol provides the rtmp-message-protocol services,
  164 +* to recv RTMP message from RTMP chunk stream,
  165 +* and to send out RTMP message over RTMP chunk stream.
  166 +*/
  167 +class SrsProtocol
  168 +{
  169 +private:
  170 + std::map<int, SrsChunkStream*> chunk_streams;
  171 + st_netfd_t stfd;
  172 + SrsBuffer* buffer;
  173 + SrsSocket* skt;
  174 + int32_t in_chunk_size;
  175 + int32_t out_chunk_size;
  176 +public:
  177 + SrsProtocol(st_netfd_t client_stfd);
  178 + virtual ~SrsProtocol();
  179 +public:
  180 + /**
  181 + * recv a message with raw/undecoded payload from peer.
  182 + * the payload is not decoded, use expect_message<T> if requires specifies message.
  183 + * @pmsg, user must free it. NULL if not success.
  184 + * @remark, only when success, user can use and must free the pmsg.
  185 + */
  186 + virtual int recv_message(SrsMessage** pmsg);
154 public: 187 public:
  188 + /**
  189 + * expect a specified message, drop others util got specified one.
  190 + * @pmsg, user must free it. NULL if not success.
  191 + * @ppacket, store in the pmsg, user must never free it. NULL if not success.
  192 + * @remark, only when success, user can use and must free the pmsg/ppacket.
  193 + */
  194 + template<class T>
  195 + int expect_message(SrsMessage** pmsg, T** ppacket)
  196 + {
  197 + *pmsg = NULL;
  198 + *ppacket = NULL;
  199 +
  200 + int ret = ERROR_SUCCESS;
  201 +
  202 + while (true) {
  203 + SrsMessage* msg = NULL;
  204 + if ((ret = recv_message(&msg)) != ERROR_SUCCESS) {
  205 + srs_error("recv message failed. ret=%d", ret);
  206 + return ret;
  207 + }
  208 + srs_verbose("recv message success.");
  209 +
  210 + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
  211 + delete msg;
  212 + srs_error("decode message failed. ret=%d", ret);
  213 + return ret;
  214 + }
  215 +
  216 + T* pkt = dynamic_cast<T*>(msg->get_packet());
  217 + if (!pkt) {
  218 + delete msg;
  219 + srs_trace("drop message(type=%d, size=%d, time=%d, sid=%d).",
  220 + msg->header.message_type, msg->header.payload_length,
  221 + msg->header.timestamp, msg->header.stream_id);
  222 + continue;
  223 + }
  224 +
  225 + *pmsg = msg;
  226 + *ppacket = pkt;
  227 + }
  228 +
  229 + return ret;
  230 + }
  231 +private:
  232 + virtual int recv_interlaced_message(SrsMessage** pmsg);
  233 + virtual int read_basic_header(char& fmt, int& cid, int& size);
  234 + virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);
  235 + virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);
155 }; 236 };
156 237
157 #endif 238 #endif
@@ -94,7 +94,12 @@ int SrsRtmp::connect_app(SrsApp** papp) @@ -94,7 +94,12 @@ int SrsRtmp::connect_app(SrsApp** papp)
94 int ret = ERROR_SUCCESS; 94 int ret = ERROR_SUCCESS;
95 95
96 SrsMessage* msg = NULL; 96 SrsMessage* msg = NULL;
97 - protocol->recv_message(&msg); 97 + SrsConnectAppPacket* pkt = NULL;
  98 + if ((ret = protocol->expect_message<SrsConnectAppPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
  99 + srs_error("expect connect app message failed. ret=%d", ret);
  100 + return ret;
  101 + }
  102 + SrsAutoFree(SrsMessage, msg, false);
98 103
99 return ret; 104 return ret;
100 } 105 }