winlin

refine RTMP protocol stack, rename buffer to in_buffer, change field pp/stream t…

…o local variables. 0.9.154
... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR "0"
#define VERSION_MINOR "9"
#define VERSION_REVISION "153"
#define VERSION_REVISION "154"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
... ...
... ... @@ -297,8 +297,7 @@ SrsProtocol::AckWindowSize::AckWindowSize()
SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
{
buffer = new SrsBuffer();
decode_stream = new SrsStream();
in_buffer = new SrsBuffer();
skt = io;
in_chunk_size = out_chunk_size = RTMP_DEFAULT_CHUNK_SIZE;
... ... @@ -317,8 +316,7 @@ SrsProtocol::~SrsProtocol()
chunk_streams.clear();
}
srs_freep(decode_stream);
srs_freep(buffer);
srs_freep(in_buffer);
}
void SrsProtocol::set_recv_timeout(int64_t timeout_us)
... ... @@ -407,9 +405,11 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket)
srs_assert(msg->payload != NULL);
srs_assert(msg->size > 0);
SrsStream stream;
// initialize the decode stream for all message,
// it's ok for the initialize if fast and without memory copy.
if ((ret = decode_stream->initialize((char*)(msg->payload), msg->size)) != ERROR_SUCCESS) {
if ((ret = stream.initialize((char*)(msg->payload), msg->size)) != ERROR_SUCCESS) {
srs_error("initialize stream failed. ret=%d", ret);
return ret;
}
... ... @@ -417,7 +417,7 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket)
// decode the packet.
SrsPacket* packet = NULL;
if ((ret = do_decode_message(msg->header, decode_stream, &packet)) != ERROR_SUCCESS) {
if ((ret = do_decode_message(msg->header, &stream, &packet)) != ERROR_SUCCESS) {
srs_freep(packet);
return ret;
}
... ... @@ -445,6 +445,8 @@ int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet)
// p set to current write position,
// it's ok when payload is NULL and size is 0.
char* p = (char*)msg->payload;
// to directly set the field.
char* pp = NULL;
// always write the header event payload is empty.
do {
... ... @@ -871,14 +873,14 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
int ret = ERROR_SUCCESS;
int required_size = 1;
if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
}
return ret;
}
char* p = buffer->bytes();
char* p = in_buffer->bytes();
fmt = (*p >> 6) & 0x03;
cid = *p & 0x3f;
... ... @@ -891,7 +893,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
if (cid == 0) {
required_size = 2;
if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
}
... ... @@ -904,7 +906,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
} else if (cid == 1) {
required_size = 3;
if ((ret = buffer->grow(skt, 3)) != ERROR_SUCCESS) {
if ((ret = in_buffer->grow(skt, 3)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
}
... ... @@ -997,13 +999,13 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
int required_size = bh_size + mh_size;
if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
}
return ret;
}
char* p = buffer->bytes() + bh_size;
char* p = in_buffer->bytes() + bh_size;
/**
* parse the message header.
... ... @@ -1107,7 +1109,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
mh_size += 4;
required_size = bh_size + mh_size;
srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
}
... ... @@ -1210,7 +1212,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
// empty message
if (chunk->header.payload_length <= 0) {
// need erase the header in buffer.
buffer->erase(bh_size + mh_size);
in_buffer->erase(bh_size + mh_size);
srs_trace("get an empty RTMP "
"message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type,
... ... @@ -1238,14 +1240,14 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
// read payload to buffer
int required_size = bh_size + mh_size + payload_size;
if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret);
}
return ret;
}
memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size);
buffer->erase(bh_size + mh_size + payload_size);
memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->bytes() + bh_size + mh_size, payload_size);
in_buffer->erase(bh_size + mh_size + payload_size);
chunk->msg->size += payload_size;
srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);
... ...
... ... @@ -97,8 +97,10 @@ private:
};
// peer in/out
private:
/**
* underlayer socket object, send/recv bytes.
*/
ISrsProtocolReaderWriter* skt;
char* pp;
/**
* requests sent out, used to build the response.
* key: transactionId
... ... @@ -107,14 +109,33 @@ private:
std::map<double, std::string> requests;
// peer in
private:
/**
* chunk stream to decode RTMP messages.
*/
std::map<int, SrsChunkStream*> chunk_streams;
SrsStream* decode_stream;
SrsBuffer* buffer;
/**
* bytes buffer cache, recv from skt, provide services for stream.
*/
SrsBuffer* in_buffer;
/**
* input chunk size, default to 128, set by peer packet.
*/
int32_t in_chunk_size;
/**
* input ack size, when to send the acked packet.
*/
AckWindowSize in_ack_size;
// peer out
private:
/**
* output header cache.
* used for type0, 11bytes(or 15bytes with extended timestamp) header.
* or for type3, 1bytes(or 5bytes with extended timestamp) header.
*/
char out_header_cache[RTMP_MAX_FMT0_HEADER_SIZE];
/**
* output chunk size, default to 128, set by config.
*/
int32_t out_chunk_size;
public:
/**
... ...