winlin

encode packet and send out

... ... @@ -60,29 +60,32 @@ int SrsClient::do_cycle()
int ret = ERROR_SUCCESS;
if ((ret = get_peer_ip()) != ERROR_SUCCESS) {
srs_warn("get peer ip failed. ret=%d", ret);
srs_error("get peer ip failed. ret=%d", ret);
return ret;
}
srs_verbose("get peer ip success. ip=%s", ip);
if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
srs_warn("rtmp handshake failed. ret=%d", ret);
srs_error("rtmp handshake failed. ret=%d", ret);
return ret;
}
srs_verbose("rtmp handshake success");
if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
srs_warn("rtmp connect vhost/app failed. ret=%d", ret);
srs_error("rtmp connect vhost/app failed. ret=%d", ret);
return ret;
}
srs_info("rtmp connect success. tcUrl=%s, pageUrl=%s, swfUrl=%s",
req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str());
srs_trace("rtmp connect success. "
srs_trace("rtmp connect app success. "
"tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s",
req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
req->app.c_str());
if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) {
srs_error("set window acknowledgement size failed. ret=%d", ret);
return ret;
}
srs_verbose("set window acknowledgement size success");
return ret;
}
... ...
... ... @@ -55,6 +55,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_RTMP_AMF0_DECODE 303
#define ERROR_RTMP_AMF0_INVALID 304
#define ERROR_RTMP_REQ_CONNECT 305
#define ERROR_RTMP_REQ_TCURL 306
#define ERROR_RTMP_MESSAGE_DECODE 307
#define ERROR_SYSTEM_STREAM_INIT 400
... ...
... ... @@ -30,6 +30,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_buffer.hpp>
#include <srs_core_stream.hpp>
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
5. Protocol Control Messages
RTMP reserves message type IDs 1-7 for protocol control messages.
... ... @@ -124,6 +127,9 @@ messages.
*/
#define RTMP_MSG_AggregateMessage 22 // 0x16
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
* 6.1.2. Chunk Message Header
* There are four different formats for the chunk message header,
... ... @@ -164,6 +170,9 @@ messages.
// the same as the timestamp of Type 0 chunk.
#define RTMP_FMT_TYPE3 3
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
* 6. Chunking
* The chunk size is configurable. It can be set using a control
... ... @@ -189,11 +198,58 @@ messages.
*/
#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
* amf0 command message, command name: "connect"
*/
#define RTMP_AMF0_COMMAND_CONNECT "connect"
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
* the chunk stream id used for some under-layer message,
* for example, the PC(protocol control) message.
*/
#define RTMP_CID_ProtocolControl 0x02
/**
* the AMF0/AMF3 command message, invoke method and return the result, over NetConnection.
* generally use 0x03.
*/
#define RTMP_CID_OverConnection 0x03
/**
* the AMF0/AMF3 command message, invoke method and return the result, over NetConnection,
* the midst state(we guess).
* rarely used, e.g. onStatus(NetStream.Play.Reset).
*/
#define RTMP_CID_OverConnection2 0x04
/**
* the stream message(amf0/amf3), over NetStream.
* generally use 0x05.
*/
#define RTMP_CID_OverStream 0x05
/**
* the stream message(amf0/amf3), over NetStream, the midst state(we guess).
* rarely used, e.g. play("mp4:mystram.f4v")
*/
#define RTMP_CID_OverStream2 0x08
/**
* the stream message(video), over NetStream
* generally use 0x06.
*/
#define RTMP_CID_Video 0x06
/**
* the stream message(audio), over NetStream.
* generally use 0x07.
*/
#define RTMP_CID_Audio 0x07
/****************************************************************************
*****************************************************************************
****************************************************************************/
SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
{
stfd = client_stfd;
... ... @@ -263,6 +319,118 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
return ret;
}
int SrsProtocol::send_message(SrsMessage* msg)
{
int ret = ERROR_SUCCESS;
if ((ret = msg->encode_packet()) != ERROR_SUCCESS) {
srs_error("encode packet to message payload failed. ret=%d", ret);
return ret;
}
srs_info("encode packet to message payload success");
// p set to current write position,
// it's ok when payload is NULL and size is 0.
char* p = (char*)msg->payload;
// always write the header event payload is empty.
do {
// generate the header.
char* pheader = NULL;
int header_size = 0;
if (p == (char*)msg->payload) {
// write new chunk stream header, fmt is 0
pheader = out_header_fmt0;
*pheader++ = 0x00 | (msg->get_perfer_cid() & 0x3F);
// chunk message header, 11 bytes
// timestamp, 3bytes, big-endian
if (msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP) {
*pheader++ = 0xFF;
*pheader++ = 0xFF;
*pheader++ = 0xFF;
} else {
pp = (char*)&msg->header.timestamp;
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
// message_length, 3bytes, big-endian
pp = (char*)&msg->header.payload_length;
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
// message_type, 1bytes
*pheader++ = msg->header.message_type;
// message_length, 3bytes, little-endian
pp = (char*)&msg->header.stream_id;
*pheader++ = pp[0];
*pheader++ = pp[1];
*pheader++ = pp[2];
*pheader++ = pp[3];
// chunk extended timestamp header, 0 or 4 bytes, big-endian
if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&msg->header.timestamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
header_size = pheader - out_header_fmt0;
pheader = out_header_fmt0;
} else {
// write no message header chunk stream, fmt is 3
pheader = out_header_fmt3;
*pheader++ = 0xC0 | (msg->get_perfer_cid() & 0x3F);
// chunk extended timestamp header, 0 or 4 bytes, big-endian
if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&msg->header.timestamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
header_size = pheader - out_header_fmt3;
pheader = out_header_fmt3;
}
// sendout header and payload by writev.
// decrease the sys invoke count to get higher performance.
int payload_size = msg->size - ((char*)msg->payload - p);
if (payload_size > out_chunk_size) {
payload_size = out_chunk_size;
}
// send by writev
iovec iov[2];
iov[0].iov_base = pheader;
iov[0].iov_len = header_size;
iov[1].iov_base = p;
iov[1].iov_len = payload_size;
ssize_t nwrite;
if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) {
srs_error("send with writev failed. ret=%d", ret);
return ret;
}
// consume sendout bytes when not empty packet.
if (msg->payload && msg->size > 0) {
p += payload_size;
}
} while (p < (char*)msg->payload + msg->size);
return ret;
}
int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
{
int ret = ERROR_SUCCESS;
... ... @@ -325,7 +493,7 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
return ret;
}
int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
{
int ret = ERROR_SUCCESS;
... ... @@ -339,10 +507,10 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
fmt = (*p >> 6) & 0x03;
cid = *p & 0x3f;
size = 1;
bh_size = 1;
if (cid > 1) {
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid);
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
return ret;
}
... ... @@ -355,8 +523,8 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
cid = 64;
cid += *(++p);
size = 2;
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid);
bh_size = 2;
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
} else if (cid == 1) {
required_size = 3;
if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {
... ... @@ -367,8 +535,8 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size)
cid = 64;
cid += *(++p);
cid += *(++p) * 256;
size = 3;
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid);
bh_size = 3;
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
} else {
srs_error("invalid path, impossible basic header.");
srs_assert(false);
... ... @@ -620,7 +788,7 @@ SrsMessage::SrsMessage()
size = 0;
stream = NULL;
payload = NULL;
decoded_payload = NULL;
packet = NULL;
}
SrsMessage::~SrsMessage()
... ... @@ -630,9 +798,9 @@ SrsMessage::~SrsMessage()
payload = NULL;
}
if (decoded_payload) {
delete decoded_payload;
decoded_payload = NULL;
if (packet) {
delete packet;
packet = NULL;
}
if (stream) {
... ... @@ -641,16 +809,6 @@ SrsMessage::~SrsMessage()
}
}
SrsPacket* SrsMessage::get_packet()
{
if (!decoded_payload) {
srs_error("the payload is raw/undecoded, invoke decode_packet to decode it.");
}
srs_assert(decoded_payload != NULL);
return decoded_payload;
}
int SrsMessage::decode_packet()
{
int ret = ERROR_SUCCESS;
... ... @@ -684,19 +842,64 @@ int SrsMessage::decode_packet()
stream->reset();
if (command == RTMP_AMF0_COMMAND_CONNECT) {
srs_info("decode the AMF0 command(connect vhost/app message).");
decoded_payload = new SrsConnectAppPacket();
return decoded_payload->decode(stream);
packet = new SrsConnectAppPacket();
return packet->decode(stream);
}
// default packet to drop message.
srs_trace("drop the AMF0 command message, command_name=%s", command.c_str());
decoded_payload = new SrsPacket();
packet = new SrsPacket();
return ret;
}
// default packet to drop message.
srs_trace("drop the unknown message, type=%d", header.message_type);
decoded_payload = new SrsPacket();
packet = new SrsPacket();
return ret;
}
SrsPacket* SrsMessage::get_packet()
{
if (!packet) {
srs_error("the payload is raw/undecoded, invoke decode_packet to decode it.");
}
srs_assert(packet != NULL);
return packet;
}
int SrsMessage::get_perfer_cid()
{
if (!packet) {
return RTMP_CID_ProtocolControl;
}
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
if (packet->get_perfer_cid() < 2) {
return packet->get_perfer_cid();
}
return packet->get_perfer_cid();
}
void SrsMessage::set_packet(SrsPacket* pkt)
{
if (packet) {
delete packet;
}
packet = pkt;
}
int SrsMessage::encode_packet()
{
int ret = ERROR_SUCCESS;
if (packet == NULL) {
srs_warn("packet is empty, send out empty message.");
return ret;
}
return ret;
}
... ... @@ -715,6 +918,11 @@ int SrsPacket::decode(SrsStream* /*stream*/)
return ret;
}
int SrsPacket::get_perfer_cid()
{
return 0;
}
SrsConnectAppPacket::SrsConnectAppPacket()
{
command_name = RTMP_AMF0_COMMAND_CONNECT;
... ... @@ -771,3 +979,37 @@ int SrsConnectAppPacket::decode(SrsStream* stream)
return ret;
}
SrsSetWindowAckSizePacket::SrsSetWindowAckSizePacket()
{
ackowledgement_window_size = 0;
}
SrsSetWindowAckSizePacket::~SrsSetWindowAckSizePacket()
{
}
int SrsSetWindowAckSizePacket::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = super::decode(stream)) != ERROR_SUCCESS) {
return ret;
}
if (!stream->require(4)) {
ret = ERROR_RTMP_MESSAGE_DECODE;
srs_error("set window ack size failed. ret=%d", ret);
return ret;
}
ackowledgement_window_size = stream->read_4bytes();
srs_info("decode window ack size success. ack_size=%d", ackowledgement_window_size);
return ret;
}
int SrsSetWindowAckSizePacket::get_perfer_cid()
{
return RTMP_CID_ProtocolControl;
}
... ...
... ... @@ -47,18 +47,42 @@ class SrsChunkStream;
class SrsAmf0Object;
/**
* max rtmp header size:
* 1bytes basic header,
* 11bytes message header,
* 4bytes timestamp header,
* that is, 1+11+4=16bytes.
*/
#define RTMP_MAX_FMT0_HEADER_SIZE 16
/**
* max rtmp header size:
* 1bytes basic header,
* 4bytes timestamp header,
* that is, 1+4=5bytes.
*/
#define RTMP_MAX_FMT3_HEADER_SIZE 5
/**
* the protocol provides the rtmp-message-protocol services,
* to recv RTMP message from RTMP chunk stream,
* and to send out RTMP message over RTMP chunk stream.
*/
class SrsProtocol
{
// peer in/out
private:
std::map<int, SrsChunkStream*> chunk_streams;
st_netfd_t stfd;
SrsBuffer* buffer;
SrsSocket* skt;
char* pp;
// peer in
private:
std::map<int, SrsChunkStream*> chunk_streams;
SrsBuffer* buffer;
int32_t in_chunk_size;
// peer out
private:
char out_header_fmt0[RTMP_MAX_FMT0_HEADER_SIZE];
char out_header_fmt3[RTMP_MAX_FMT3_HEADER_SIZE];
int32_t out_chunk_size;
public:
SrsProtocol(st_netfd_t client_stfd);
... ... @@ -72,10 +96,38 @@ public:
* @remark, only when success, user can use and must free the pmsg.
*/
virtual int recv_message(SrsMessage** pmsg);
/**
* send out message with encoded payload to peer.
* use the message encode method to encode to payload,
* then sendout over socket.
* @msg this method will free it whatever return value.
*/
virtual int send_message(SrsMessage* msg);
private:
/**
* try to recv interlaced message from peer,
* return error if error occur and nerver set the pmsg,
* return success and pmsg set to NULL if no entire message got,
* return success and pmsg set to entire message if got one.
*/
virtual int recv_interlaced_message(SrsMessage** pmsg);
virtual int read_basic_header(char& fmt, int& cid, int& size);
/**
* read the chunk basic header(fmt, cid) from chunk stream.
* user can discovery a SrsChunkStream by cid.
* @bh_size return the chunk basic header size, to remove the used bytes when finished.
*/
virtual int read_basic_header(char& fmt, int& cid, int& bh_size);
/**
* read the chunk message header(timestamp, payload_length, message_type, stream_id)
* from chunk stream and save to SrsChunkStream.
* @mh_size return the chunk message header size, to remove the used bytes when finished.
*/
virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);
/**
* read the chunk payload, remove the used bytes in buffer,
* if got entire message, set the pmsg.
* @payload_size read size in this roundtrip, generally a chunk size or left message size.
*/
virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);
};
... ... @@ -164,19 +216,35 @@ public:
// decoded message payload.
private:
SrsStream* stream;
SrsPacket* decoded_payload;
SrsPacket* packet;
public:
SrsMessage();
virtual ~SrsMessage();
public:
/**
* get the decoded packet,
* not all packets need to decode, for video/audio packet,
* passthrough to peer are ok.
* @remark, user must invoke decode_packet first.
* decode packet from message payload.
*/
virtual SrsPacket* get_packet();
virtual int decode_packet();
/**
* get the decoded packet which decoded by decode_packet().
* @remark, user never free the pkt, the message will auto free it.
*/
virtual SrsPacket* get_packet();
public:
SrsMessage();
virtual ~SrsMessage();
/**
* get the perfered cid(chunk stream id) which sendout over.
*/
virtual int get_perfer_cid();
/**
* set the encoded packet to encode_packet() to payload.
* @remark, user never free the pkt, the message will auto free it.
*/
virtual void set_packet(SrsPacket* pkt);
/**
* encode the packet to message payload bytes.
* @remark there exists empty packet, so maybe the payload is NULL.
*/
virtual int encode_packet();
};
/**
... ... @@ -189,8 +257,15 @@ public:
virtual ~SrsPacket();
public:
virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
};
/**
* 4.1.1. connect
* The client sends the connect command to the server to request
* connection to a server application instance.
*/
class SrsConnectAppPacket : public SrsPacket
{
private:
... ... @@ -207,6 +282,26 @@ public:
};
/**
* 5.5. Window Acknowledgement Size (5)
* The client or the server sends this message to inform the peer which
* window size to use when sending acknowledgment.
*/
class SrsSetWindowAckSizePacket : public SrsPacket
{
private:
typedef SrsPacket super;
public:
int32_t ackowledgement_window_size;
public:
SrsSetWindowAckSizePacket();
virtual ~SrsSetWindowAckSizePacket();
public:
virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
};
/**
* expect a specified message, drop others util got specified one.
* @pmsg, user must free it. NULL if not success.
* @ppacket, store in the pmsg, user must never free it. NULL if not success.
... ...
... ... @@ -30,6 +30,47 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_auto_free.hpp>
#include <srs_core_amf0.hpp>
int SrsRequest::discovery_app()
{
int ret = ERROR_SUCCESS;
size_t pos = std::string::npos;
std::string url = tcUrl;
if ((pos = url.find("://")) != std::string::npos) {
schema = url.substr(0, pos);
url = url.substr(schema.length() + 3);
srs_verbose("discovery schema=%s", schema.c_str());
}
if ((pos = url.find("/")) != std::string::npos) {
vhost = url.substr(0, pos);
url = url.substr(vhost.length() + 1);
srs_verbose("discovery vhost=%s", vhost.c_str());
}
port = "1935";
if ((pos = vhost.find(":")) != std::string::npos) {
port = vhost.substr(pos + 1);
vhost = vhost.substr(0, pos);
srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str());
}
app = url;
srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
schema.c_str(), vhost.c_str(), port.c_str(), app.c_str());
if (schema.empty() || vhost.empty() || port.empty() || app.empty()) {
ret = ERROR_RTMP_REQ_TCURL;
srs_error("discovery tcUrl failed. "
"tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
tcUrl.c_str(), schema.c_str(), vhost.c_str(), port.c_str(), app.c_str(), ret);
return ret;
}
return ret;
}
SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
{
protocol = new SrsProtocol(client_stfd);
... ... @@ -119,6 +160,26 @@ int SrsRtmp::connect_app(SrsRequest* req)
if ((prop = pkt->command_object->ensure_property_string("swfUrl")) != NULL) {
req->swfUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
}
srs_info("get connect app message params success.");
return req->discovery_app();
}
int SrsRtmp::set_window_ack_size(int ack_size)
{
int ret = ERROR_SUCCESS;
SrsMessage* msg = new SrsMessage();
SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
pkt->ackowledgement_window_size = ack_size;
msg->set_packet(pkt);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send ack size message failed. ret=%d", ret);
return ret;
}
srs_info("send ack size message success. ack_size=%d", ack_size);
return ret;
}
... ...
... ... @@ -50,6 +50,11 @@ struct SrsRequest
std::string port;
std::string app;
std::string stream;
/**
* disconvery vhost/app from tcUrl.
*/
virtual int discovery_app();
};
/**
... ... @@ -68,6 +73,7 @@ public:
public:
virtual int handshake();
virtual int connect_app(SrsRequest* req);
virtual int set_window_ack_size(int ack_size);
};
#endif
\ No newline at end of file
... ...
... ... @@ -85,3 +85,16 @@ int SrsSocket::write(const void* buf, size_t size, ssize_t* nwrite)
return ret;
}
int SrsSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
*nwrite = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
if (*nwrite <= 0) {
ret = ERROR_SOCKET_WRITE;
}
return ret;
}
... ...
... ... @@ -47,6 +47,7 @@ public:
virtual int read(const void* buf, size_t size, ssize_t* nread);
virtual int read_fully(const void* buf, size_t size, ssize_t* nread);
virtual int write(const void* buf, size_t size, ssize_t* nwrite);
virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
#endif
\ No newline at end of file
... ...
... ... @@ -97,6 +97,20 @@ int16_t SrsStream::read_2bytes()
return value;
}
int32_t SrsStream::read_4bytes()
{
srs_assert(require(4));
int32_t value;
pp = (char*)&value;
pp[3] = *p++;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
return value;
}
int64_t SrsStream::read_8bytes()
{
srs_assert(require(8));
... ...
... ... @@ -80,6 +80,10 @@ public:
*/
virtual int16_t read_2bytes();
/**
* get 4bytes int from stream.
*/
virtual int32_t read_4bytes();
/**
* get 8bytes int from stream.
*/
virtual int64_t read_8bytes();
... ...