winlin

refine rtmp client/server, add comments.

... ... @@ -708,11 +708,6 @@ SrsRtmpServer::~SrsRtmpServer()
srs_freep(hs_bytes);
}
SrsProtocol* SrsRtmpServer::get_protocol()
{
return protocol;
}
void SrsRtmpServer::set_recv_timeout(int64_t timeout_us)
{
protocol->set_recv_timeout(timeout_us);
... ...
... ... @@ -172,33 +172,92 @@ protected:
public:
SrsRtmpClient(ISrsProtocolReaderWriter* skt);
virtual ~SrsRtmpClient();
// protocol methods proxy
public:
/**
* set the recv timeout in us.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
virtual void set_recv_timeout(int64_t timeout_us);
/**
* set the send timeout in us.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
virtual void set_send_timeout(int64_t timeout_us);
/**
* get recv/send bytes.
*/
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
/**
* recv a RTMP message, which is bytes oriented.
* user can use decode_message to get the decoded RTMP packet.
* @param pmsg, set the received message,
* always NULL if error,
* NULL for unknown packet but return success.
* never NULL if decode success.
* @remark, drop message when msg is empty or payload length is empty.
*/
virtual int recv_message(SrsMessage** pmsg);
/**
* decode bytes oriented RTMP message to RTMP packet,
* @param ppacket, output decoded packet,
* always NULL if error, never NULL if success.
* @return error when unknown packet, error when decode failed.
*/
virtual int decode_message(SrsMessage* msg, SrsPacket** ppacket);
/**
* send the RTMP message and always free it.
* user must never free or use the msg after this method,
* for it will always free the msg.
* @param msg, the msg to send out, never be NULL.
* @param stream_id, the stream id of packet to send over, 0 for control message.
*/
virtual int send_and_free_message(SrsMessage* msg, int stream_id);
/**
* send the RTMP packet and always free it.
* user must never free or use the packet after this method,
* for it will always free the packet.
* @param packet, the packet to send out, never be NULL.
* @param stream_id, the stream id of packet to send over, 0 for control message.
*/
virtual int send_and_free_packet(SrsPacket* packet, int stream_id);
public:
// try complex, then simple handshake.
/**
* handshake with server, try complex, then simple handshake.
*/
virtual int handshake();
// only use simple handshake
/**
* only use simple handshake
*/
virtual int simple_handshake();
// only use complex handshake
/**
* only use complex handshake
*/
virtual int complex_handshake();
// set req to use the original request of client:
// pageUrl and swfUrl for refer antisuck.
// args for edge to origin traverse auth, @see SrsRequest.args
/**
* set req to use the original request of client:
* pageUrl and swfUrl for refer antisuck.
* args for edge to origin traverse auth, @see SrsRequest.args
*/
virtual int connect_app(std::string app, std::string tc_url, SrsRequest* req=NULL);
/**
* create a stream, then play/publish data over this stream.
*/
virtual int create_stream(int& stream_id);
/**
* start play stream.
*/
virtual int play(std::string stream, int stream_id);
// flash publish schema:
// connect-app => create-stream => flash-publish
/**
* start publish stream. use flash publish workflow:
* connect-app => create-stream => flash-publish
*/
virtual int publish(std::string stream, int stream_id);
// FMLE publish schema:
// connect-app => FMLE publish
/**
* start publish stream. use FMLE publish workflow:
* connect-app => FMLE publish
*/
virtual int fmle_publish(std::string stream, int& stream_id);
public:
/**
... ... @@ -237,21 +296,70 @@ private:
public:
SrsRtmpServer(ISrsProtocolReaderWriter* skt);
virtual ~SrsRtmpServer();
// protocol methods proxy
public:
virtual SrsProtocol* get_protocol();
/**
* set/get the recv timeout in us.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
virtual void set_recv_timeout(int64_t timeout_us);
virtual int64_t get_recv_timeout();
/**
* set/get the send timeout in us.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_send_timeout();
/**
* get recv/send bytes.
*/
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
/**
* recv a RTMP message, which is bytes oriented.
* user can use decode_message to get the decoded RTMP packet.
* @param pmsg, set the received message,
* always NULL if error,
* NULL for unknown packet but return success.
* never NULL if decode success.
* @remark, drop message when msg is empty or payload length is empty.
*/
virtual int recv_message(SrsMessage** pmsg);
/**
* decode bytes oriented RTMP message to RTMP packet,
* @param ppacket, output decoded packet,
* always NULL if error, never NULL if success.
* @return error when unknown packet, error when decode failed.
*/
virtual int decode_message(SrsMessage* msg, SrsPacket** ppacket);
/**
* send the RTMP message and always free it.
* user must never free or use the msg after this method,
* for it will always free the msg.
* @param msg, the msg to send out, never be NULL.
* @param stream_id, the stream id of packet to send over, 0 for control message.
*/
virtual int send_and_free_message(SrsMessage* msg, int stream_id);
/**
* send the RTMP packet and always free it.
* user must never free or use the packet after this method,
* for it will always free the packet.
* @param packet, the packet to send out, never be NULL.
* @param stream_id, the stream id of packet to send over, 0 for control message.
*/
virtual int send_and_free_packet(SrsPacket* packet, int stream_id);
public:
/**
* handshake with client, try complex then simple.
*/
virtual int handshake();
/**
* do connect app with client, to discovery tcUrl.
*/
virtual int connect_app(SrsRequest* req);
/**
* set ack size to client, client will send ack-size for each ack window
*/
virtual int set_window_ack_size(int ack_size);
/**
* @type: The sender can mark this message hard (0), soft (1), or dynamic (2)
... ... @@ -262,7 +370,13 @@ public:
* @param server_ip the ip of server.
*/
virtual int response_connect_app(SrsRequest* req, const char* server_ip = NULL);
/**
* reject the connect app request.
*/
virtual void response_connect_reject(SrsRequest* req, const char* desc);
/**
* response client the onBWDone message.
*/
virtual int on_bw_done();
/**
* recv some message to identify the client.
... ...
... ... @@ -5318,6 +5318,60 @@ VOID TEST(ProtocolStackTest, ProtocolPingFlow)
}
}
/**
* expect specified message
*/
VOID TEST(ProtocolStackTest, ProtocolExcpectMessage)
{
MockBufferIO bio;
SrsProtocol proto(&bio);
// packet is SrsConnectAppPacket
char data[] = {
// 12bytes header, 1byts chunk header, 11bytes msg heder
(char)0x03, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x01, (char)0xa1, (char)0x14, (char)0x00, (char)0x00, (char)0x00, (char)0x00,
// msg payload start
(char)0x02, (char)0x00, (char)0x07, (char)0x63,
(char)0x6f, (char)0x6e, (char)0x6e, (char)0x65, (char)0x63, (char)0x74, (char)0x00, (char)0x3f, (char)0xf0, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x03,
(char)0x00, (char)0x03, (char)0x61, (char)0x70, (char)0x70, (char)0x02, (char)0x00, (char)0x04, (char)0x6c, (char)0x69, (char)0x76, (char)0x65, (char)0x00, (char)0x08, (char)0x66, (char)0x6c,
(char)0x61, (char)0x73, (char)0x68, (char)0x56, (char)0x65, (char)0x72, (char)0x02, (char)0x00, (char)0x0d, (char)0x57, (char)0x49, (char)0x4e, (char)0x20, (char)0x31, (char)0x32, (char)0x2c,
(char)0x30, (char)0x2c, (char)0x30, (char)0x2c, (char)0x34, (char)0x31, (char)0x00, (char)0x06, (char)0x73, (char)0x77, (char)0x66, (char)0x55, (char)0x72, (char)0x6c, (char)0x02, (char)0x00,
(char)0x51, (char)0x68, (char)0x74, (char)0x74, (char)0x70, (char)0x3a, (char)0x2f, (char)0x2f, (char)0x77, (char)0x77, (char)0x77, (char)0x2e, (char)0x6f, (char)0x73, (char)0x73, (char)0x72,
(char)0x73, (char)0x2e, (char)0x6e, (char)0x65, (char)0x74, (char)0x3a, (char)0x38, (char)0x30, (char)0x38, (char)0x35, (char)0x2f, (char)0x70, (char)0x6c, (char)0x61, (char)0x79, (char)0x65,
(char)0x72, (char)0x73, (char)0x2f, (char)0x73, (char)0x72, (char)0x73, (char)0x5f, (char)0x70, (char)0x6c, (char)0x61, (char)0x79, (char)0x65, (char)0x72, (char)0x2f, (char)0x72, (char)0x65,
(char)0x6c, (char)0x65, (char)0x61, (char)0x73, (char)0x65, (char)0x2f, (char)0x73, (char)0x72, (char)0x73, (char)0x5f, (char)0x70, (char)0x6c,
(char)0xC3, /*next chunk.*/ (char)0x61, (char)0x79, (char)0x65, (char)0x72,
(char)0x2e, (char)0x73, (char)0x77, (char)0x66, (char)0x3f, (char)0x5f, (char)0x76, (char)0x65, (char)0x72, (char)0x73, (char)0x69, (char)0x6f, (char)0x6e, (char)0x3d, (char)0x31, (char)0x2e,
(char)0x32, (char)0x33, (char)0x00, (char)0x05, (char)0x74, (char)0x63, (char)0x55, (char)0x72, (char)0x6c, (char)0x02, (char)0x00, (char)0x14, (char)0x72, (char)0x74, (char)0x6d, (char)0x70,
(char)0x3a, (char)0x2f, (char)0x2f, (char)0x64, (char)0x65, (char)0x76, (char)0x3a, (char)0x31, (char)0x39, (char)0x33, (char)0x35, (char)0x2f, (char)0x6c, (char)0x69, (char)0x76, (char)0x65,
(char)0x00, (char)0x04, (char)0x66, (char)0x70, (char)0x61, (char)0x64, (char)0x01, (char)0x00, (char)0x00, (char)0x0c, (char)0x63, (char)0x61, (char)0x70, (char)0x61, (char)0x62, (char)0x69,
(char)0x6c, (char)0x69, (char)0x74, (char)0x69, (char)0x65, (char)0x73, (char)0x00, (char)0x40, (char)0x6d, (char)0xe0, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00,
(char)0x0b, (char)0x61, (char)0x75, (char)0x64, (char)0x69, (char)0x6f, (char)0x43, (char)0x6f, (char)0x64, (char)0x65, (char)0x63, (char)0x73, (char)0x00, (char)0x40, (char)0xab, (char)0xee,
(char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x0b, (char)0x76, (char)0x69, (char)0x64, (char)0x65, (char)0x6f, (char)0x43, (char)0x6f, (char)0x64, (char)0x65,
(char)0x63, (char)0x73, (char)0x00, (char)0x40, (char)0x6f, (char)0x80, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00,
(char)0xC3, /*next chunk.*/ (char)0x0d, (char)0x76, (char)0x69, (char)0x64,
(char)0x65, (char)0x6f, (char)0x46, (char)0x75, (char)0x6e, (char)0x63, (char)0x74, (char)0x69, (char)0x6f, (char)0x6e, (char)0x00, (char)0x3f, (char)0xf0, (char)0x00, (char)0x00, (char)0x00,
(char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x07, (char)0x70, (char)0x61, (char)0x67, (char)0x65, (char)0x55, (char)0x72, (char)0x6c, (char)0x02, (char)0x00, (char)0x62, (char)0x68,
(char)0x74, (char)0x74, (char)0x70, (char)0x3a, (char)0x2f, (char)0x2f, (char)0x77, (char)0x77, (char)0x77, (char)0x2e, (char)0x6f, (char)0x73, (char)0x73, (char)0x72, (char)0x73, (char)0x2e,
(char)0x6e, (char)0x65, (char)0x74, (char)0x3a, (char)0x38, (char)0x30, (char)0x38, (char)0x35, (char)0x2f, (char)0x70, (char)0x6c, (char)0x61, (char)0x79, (char)0x65, (char)0x72, (char)0x73,
(char)0x2f, (char)0x73, (char)0x72, (char)0x73, (char)0x5f, (char)0x70, (char)0x6c, (char)0x61, (char)0x79, (char)0x65, (char)0x72, (char)0x2e, (char)0x68, (char)0x74, (char)0x6d, (char)0x6c,
(char)0x3f, (char)0x76, (char)0x68, (char)0x6f, (char)0x73, (char)0x74, (char)0x3d, (char)0x64, (char)0x65, (char)0x76, (char)0x26, (char)0x73, (char)0x74, (char)0x72, (char)0x65, (char)0x61,
(char)0x6d, (char)0x3d, (char)0x6c, (char)0x69, (char)0x76, (char)0x65, (char)0x73, (char)0x74, (char)0x72, (char)0x65, (char)0x61, (char)0x6d, (char)0x26, (char)0x73, (char)0x65, (char)0x72,
(char)0x76, (char)0x65, (char)0x72, (char)0x3d, (char)0x64, (char)0x65, (char)0x76, (char)0x26, (char)0x70, (char)0x6f, (char)0x72, (char)0x74,
(char)0xC3, /*next chunk.*/ (char)0x3d, (char)0x31, (char)0x39, (char)0x33,
(char)0x35, (char)0x00, (char)0x0e, (char)0x6f, (char)0x62, (char)0x6a, (char)0x65, (char)0x63, (char)0x74, (char)0x45, (char)0x6e, (char)0x63, (char)0x6f, (char)0x64, (char)0x69, (char)0x6e,
(char)0x67, (char)0x00, (char)0x40, (char)0x08, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x00, (char)0x09
};
bio.in_buffer.append(data, sizeof(data));
SrsMessage* msg = NULL;
SrsConnectAppPacket* pkt = NULL;
ASSERT_TRUE(ERROR_SUCCESS == proto.expect_message<SrsConnectAppPacket>(&msg, &pkt));
SrsAutoFree(SrsMessage, msg);
SrsAutoFree(SrsConnectAppPacket, pkt);
ASSERT_TRUE(NULL != pkt);
}
VOID TEST(ProtocolRTMPTest, RTMPRequest)
{
SrsRequest req;
... ...