winlin

decode basic header

... ... @@ -23,6 +23,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_buffer.hpp>
#include <srs_core_error.hpp>
#include <srs_core_socket.hpp>
#define SOCKET_READ_SIZE 4096
SrsBuffer::SrsBuffer()
{
}
... ... @@ -31,3 +36,39 @@ SrsBuffer::~SrsBuffer()
{
}
int SrsBuffer::size()
{
return (int)data.size();
}
char* SrsBuffer::bytes()
{
return &data.at(0);
}
void SrsBuffer::append(char* bytes, int size)
{
std::vector<char> vec(bytes, bytes + size);
data.insert(data.begin(), vec.begin(), vec.end());
}
int SrsBuffer::ensure_buffer_bytes(SrsSocket* skt, int required_size)
{
int ret = ERROR_SUCCESS;
ssize_t nread;
while (size() < required_size) {
char buffer[SOCKET_READ_SIZE];
if ((ret = skt->read(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) {
return ret;
}
srs_assert((int)nread > 0);
append(buffer, (int)nread);
}
return ret;
}
... ...
... ... @@ -30,6 +30,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <vector>
class SrsSocket;
/**
* the buffer provices bytes cache for protocol. generally,
* protocol recv data from socket, put into buffer, decode to RTMP message.
... ... @@ -37,9 +41,18 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
class SrsBuffer
{
private:
std::vector<char> data;
public:
SrsBuffer();
virtual ~SrsBuffer();
public:
virtual char* bytes();
private:
virtual int size();
virtual void append(char* bytes, int size);
public:
virtual int ensure_buffer_bytes(SrsSocket* skt, int required_size);
};
#endif
\ No newline at end of file
... ...
... ... @@ -31,23 +31,167 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
{
stfd = client_stfd;
buffer = new SrsBuffer();
skt = new SrsSocket(stfd);
}
SrsProtocol::~SrsProtocol()
{
std::map<int, SrsChunkStream*>::iterator it;
for (it = chunk_streams.begin(); it != chunk_streams.end(); ++it) {
SrsChunkStream* stream = it->second;
if (stream) {
delete stream;
}
}
chunk_streams.clear();
if (buffer) {
delete buffer;
buffer = NULL;
}
if (skt) {
delete skt;
skt = NULL;
}
}
int SrsProtocol::recv_message(SrsMessage** pmsg)
{
int ret = ERROR_SUCCESS;
while (true) {
// chunk stream basic header.
char fmt = 0;
int cid = 0;
int size = 0;
if ((ret = read_basic_header(fmt, cid, size)) != ERROR_SUCCESS) {
srs_error("read basic header failed. ret=%d", ret);
return ret;
}
srs_info("read basic header success. fmt=%d, cid=%d, size=%d", fmt, cid, size);
// get the cached chunk stream.
SrsChunkStream* chunk = NULL;
if (chunk_streams.find(cid) == chunk_streams.end()) {
chunk = chunk_streams[cid] = new SrsChunkStream(cid);
srs_info("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
} else {
chunk = chunk_streams[cid];
srs_info("cached chunk stream: fmt=%d, cid=%d, message(type=%d, size=%d, time=%d, sid=%d)",
chunk->fmt, chunk->cid, chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
}
// chunk stream message header
SrsMessage* msg = NULL;
if ((ret = read_message_header(chunk, fmt, &msg)) != ERROR_SUCCESS) {
srs_error("read message header failed. ret=%d", ret);
return ret;
}
// not got an entire RTMP message, try next chunk.
if (!msg) {
continue;
}
// decode the msg
}
return ret;
}
int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
{
int ret = ERROR_SUCCESS;
if ((ret = buffer->ensure_buffer_bytes(skt, 1)) != ERROR_SUCCESS) {
return ret;
}
char* p = buffer->bytes();
fmt = (*p >> 6) & 0x03;
cid = *p & 0x3f;
size = 1;
if (cid > 1) {
return ret;
}
if (cid == 0) {
if ((ret = buffer->ensure_buffer_bytes(skt, 2)) != ERROR_SUCCESS) {
return ret;
}
cid = 64;
cid += *(++p);
size = 2;
} else if (cid == 1) {
if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {
return ret;
}
cid = 64;
cid += *(++p);
cid += *(++p) * 256;
size = 3;
} else {
srs_error("invalid path, impossible basic header.");
srs_assert(false);
}
return ret;
}
int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, SrsMessage** pmsg)
{
int ret = ERROR_SUCCESS;
return ret;
}
SrsMessageHeader::SrsMessageHeader()
{
message_type = 0;
payload_length = 0;
timestamp = 0;
stream_id = 0;
}
SrsMessageHeader::~SrsMessageHeader()
{
}
SrsChunkStream::SrsChunkStream(int _cid)
{
fmt = 0;
cid = _cid;
msg = NULL;
}
SrsChunkStream::~SrsChunkStream()
{
if (msg) {
delete msg;
msg = NULL;
}
}
SrsMessage::SrsMessage()
{
payload = NULL;
}
SrsMessage::~SrsMessage()
{
if (payload) {
delete[] payload;
payload = NULL;
}
}
... ...
... ... @@ -30,9 +30,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <map>
#include <st.h>
class SrsSocket;
class SrsBuffer;
class SrsMessage;
class SrsChunkStream;
/**
* the protocol provides the rtmp-message-protocol services,
... ... @@ -42,19 +47,98 @@ class SrsMessage;
class SrsProtocol
{
private:
std::map<int, SrsChunkStream*> chunk_streams;
st_netfd_t stfd;
SrsBuffer* buffer;
SrsSocket* skt;
public:
SrsProtocol(st_netfd_t client_stfd);
virtual ~SrsProtocol();
public:
virtual int recv_message(SrsMessage** pmsg);
private:
virtual int read_basic_header(char& fmt, int& cid, int& size);
virtual int read_message_header(SrsChunkStream* chunk, char fmt, SrsMessage** pmsg);
};
/**
* 4.1. Message Header
*/
struct SrsMessageHeader
{
/**
* One byte field to represent the message type. A range of type IDs
* (1-7) are reserved for protocol control messages.
*/
int8_t message_type;
/**
* Three-byte field that represents the size of the payload in bytes.
* It is set in big-endian format.
*/
int32_t payload_length;
/**
* Four-byte field that contains a timestamp of the message.
* The 4 bytes are packed in the big-endian order.
*/
int32_t timestamp;
/**
* Three-byte field that identifies the stream of the message. These
* bytes are set in big-endian format.
*/
int32_t stream_id;
SrsMessageHeader();
virtual ~SrsMessageHeader();
};
/**
* incoming chunk stream maybe interlaced,
* use the chunk stream to cache the input RTMP chunk streams.
*/
class SrsChunkStream
{
public:
/**
* represents the basic header fmt,
* which used to identify the variant message header type.
*/
char fmt;
/**
* represents the basic header cid,
* which is the chunk stream id.
*/
int cid;
/**
* cached message header
*/
SrsMessageHeader header;
/**
* partially read message.
*/
SrsMessage* msg;
public:
SrsChunkStream(int _cid);
virtual ~SrsChunkStream();
public:
};
/**
* common RTMP message defines in rtmp.part2.Message-Formats.pdf.
*/
class SrsMessage
{
// 4.1. Message Header
public:
int8_t message_type;
int24_t payload_length;
SrsMessageHeader header;
// 4.2. Message Payload
public:
/**
* The other part which is the payload is the actual data that is
* contained in the message. For example, it could be some audio samples
* or compressed video data. The payload format and interpretation are
* beyond the scope of this document.
*/
int8_t* payload;
public:
SrsMessage();
virtual ~SrsMessage();
... ...
... ... @@ -48,7 +48,7 @@ int SrsRtmp::handshake()
int ret = ERROR_SUCCESS;
ssize_t nsize;
Socket skt(stfd);
SrsSocket skt(stfd);
char* c0c1 = new char[1537];
SrsAutoFree(char, c0c1, true);
... ... @@ -92,6 +92,10 @@ int SrsRtmp::handshake()
int SrsRtmp::connect_app(SrsApp** papp)
{
int ret = ERROR_SUCCESS;
SrsMessage* msg = NULL;
protocol->recv_message(&msg);
return ret;
}
... ...
... ... @@ -25,16 +25,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_error.hpp>
Socket::Socket(st_netfd_t client_stfd)
SrsSocket::SrsSocket(st_netfd_t client_stfd)
{
stfd = client_stfd;
}
Socket::~Socket()
SrsSocket::~SrsSocket()
{
}
int Socket::read(const void* buf, size_t size, ssize_t* nread)
int SrsSocket::read(const void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
... ... @@ -53,7 +53,7 @@ int Socket::read(const void* buf, size_t size, ssize_t* nread)
return ret;
}
int Socket::read_fully(const void* buf, size_t size, ssize_t* nread)
int SrsSocket::read_fully(const void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
... ... @@ -72,7 +72,7 @@ int Socket::read_fully(const void* buf, size_t size, ssize_t* nread)
return ret;
}
int Socket::write(const void* buf, size_t size, ssize_t* nwrite)
int SrsSocket::write(const void* buf, size_t size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -36,13 +36,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* the socket provides TCP socket over st,
* that is, the sync socket mechanism.
*/
class Socket
class SrsSocket
{
private:
st_netfd_t stfd;
public:
Socket(st_netfd_t client_stfd);
virtual ~Socket();
SrsSocket(st_netfd_t client_stfd);
virtual ~SrsSocket();
public:
virtual int read(const void* buf, size_t size, ssize_t* nread);
virtual int read_fully(const void* buf, size_t size, ssize_t* nread);
... ...