winlin

support FMLE releaseStream, FCPublish

... ... @@ -89,6 +89,11 @@ bool SrsAmf0Any::is_null()
return marker == RTMP_AMF0_Null;
}
bool SrsAmf0Any::is_undefined()
{
return marker == RTMP_AMF0_Undefined;
}
bool SrsAmf0Any::is_object()
{
return marker == RTMP_AMF0_Object;
... ... @@ -145,6 +150,15 @@ SrsAmf0Null::~SrsAmf0Null()
{
}
SrsAmf0Undefined::SrsAmf0Undefined()
{
marker = RTMP_AMF0_Undefined;
}
SrsAmf0Undefined::~SrsAmf0Undefined()
{
}
SrsAmf0ObjectEOF::SrsAmf0ObjectEOF()
{
marker = RTMP_AMF0_ObjectEnd;
... ... @@ -523,6 +537,45 @@ int srs_amf0_write_null(SrsStream* stream)
return ret;
}
int srs_amf0_read_undefined(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
// marker
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 read undefined marker failed. ret=%d", ret);
return ret;
}
char marker = stream->read_1bytes();
if (marker != RTMP_AMF0_Undefined) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 check undefined marker failed. "
"marker=%#x, required=%#x, ret=%d", marker, RTMP_AMF0_Undefined, ret);
return ret;
}
srs_verbose("amf0 read undefined success");
return ret;
}
int srs_amf0_write_undefined(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
// marker
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write undefined marker failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(RTMP_AMF0_Undefined);
srs_verbose("amf0 write undefined marker success");
return ret;
}
int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value)
{
int ret = ERROR_SUCCESS;
... ... @@ -572,6 +625,10 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value)
value = new SrsAmf0Null();
return ret;
}
case RTMP_AMF0_Undefined: {
value = new SrsAmf0Undefined();
return ret;
}
case RTMP_AMF0_ObjectEnd: {
SrsAmf0ObjectEOF* p = NULL;
if ((ret = srs_amf0_read_object_eof(stream, p)) != ERROR_SUCCESS) {
... ... @@ -628,6 +685,9 @@ int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value)
case RTMP_AMF0_Null: {
return srs_amf0_write_null(stream);
}
case RTMP_AMF0_Undefined: {
return srs_amf0_write_undefined(stream);
}
case RTMP_AMF0_ObjectEnd: {
SrsAmf0ObjectEOF* p = srs_amf0_convert<SrsAmf0ObjectEOF>(value);
return srs_amf0_write_object_eof(stream, p);
... ... @@ -676,6 +736,10 @@ int srs_amf0_get_any_size(SrsAmf0Any* value)
size += srs_amf0_get_null_size();
break;
}
case RTMP_AMF0_Undefined: {
size += srs_amf0_get_undefined_size();
break;
}
case RTMP_AMF0_ObjectEnd: {
size += srs_amf0_get_object_eof_size();
break;
... ... @@ -1009,6 +1073,11 @@ int srs_amf0_get_null_size()
return 1;
}
int srs_amf0_get_undefined_size()
{
return 1;
}
int srs_amf0_get_boolean_size()
{
return 1 + 1;
... ...
... ... @@ -55,6 +55,7 @@ struct SrsAmf0Any
virtual bool is_boolean();
virtual bool is_number();
virtual bool is_null();
virtual bool is_undefined();
virtual bool is_object();
virtual bool is_object_eof();
virtual bool is_ecma_array();
... ... @@ -115,6 +116,17 @@ struct SrsAmf0Null : public SrsAmf0Any
};
/**
* read amf0 undefined from stream.
* 2.8 undefined Type
* undefined-type = undefined-marker
*/
struct SrsAmf0Undefined : public SrsAmf0Any
{
SrsAmf0Undefined();
virtual ~SrsAmf0Undefined();
};
/**
* 2.11 Object End Type
* object-end-type = UTF-8-empty object-end-marker
* 0x00 0x00 0x09
... ... @@ -208,6 +220,14 @@ extern int srs_amf0_read_null(SrsStream* stream);
extern int srs_amf0_write_null(SrsStream* stream);
/**
* read amf0 undefined from stream.
* 2.8 undefined Type
* undefined-type = undefined-marker
*/
extern int srs_amf0_read_undefined(SrsStream* stream);
extern int srs_amf0_write_undefined(SrsStream* stream);
/**
* read amf0 object from stream.
* 2.5 Object Type
* anonymous-object-type = object-marker *(object-property)
... ... @@ -233,6 +253,7 @@ extern int srs_amf0_get_utf8_size(std::string value);
extern int srs_amf0_get_string_size(std::string value);
extern int srs_amf0_get_number_size();
extern int srs_amf0_get_null_size();
extern int srs_amf0_get_undefined_size();
extern int srs_amf0_get_boolean_size();
extern int srs_amf0_get_object_size(SrsAmf0Object* obj);
extern int srs_amf0_get_ecma_array_size(SrsASrsAmf0EcmaArray* arr);
... ...
... ... @@ -197,6 +197,8 @@ messages.
#define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone"
#define RTMP_AMF0_COMMAND_ON_STATUS "onStatus"
#define RTMP_AMF0_COMMAND_RESULT "_result"
#define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream"
#define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish"
#define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess"
/****************************************************************************
... ... @@ -952,6 +954,14 @@ int SrsMessage::decode_packet()
srs_info("decode the AMF0/AMF3 command(paly message).");
packet = new SrsPlayPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message).");
packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
packet = new SrsFMLEStartPacket();
return packet->decode(stream);
}
// default packet to drop message.
... ... @@ -1351,6 +1361,119 @@ int SrsCreateStreamResPacket::encode_packet(SrsStream* stream)
return ret;
}
SrsFMLEStartPacket::SrsFMLEStartPacket()
{
command_name = RTMP_AMF0_COMMAND_CREATE_STREAM;
transaction_id = 0;
}
SrsFMLEStartPacket::~SrsFMLEStartPacket()
{
}
int SrsFMLEStartPacket::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
return ret;
}
if (command_name.empty()
|| command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM
|| command_name != RTMP_AMF0_COMMAND_FC_PUBLISH
) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode FMLE start command_name failed. "
"command_name=%s, ret=%d", command_name.c_str(), ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
return ret;
}
srs_info("amf0 decode FMLE start packet success");
return ret;
}
SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
{
command_name = RTMP_AMF0_COMMAND_RESULT;
transaction_id = _transaction_id;
command_object = new SrsAmf0Null();
args = new SrsAmf0Undefined();
}
SrsFMLEStartResPacket::~SrsFMLEStartResPacket()
{
if (command_object) {
delete command_object;
command_object = NULL;
}
if (args) {
delete args;
args = NULL;
}
}
int SrsFMLEStartResPacket::get_perfer_cid()
{
return RTMP_CID_OverConnection;
}
int SrsFMLEStartResPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsFMLEStartResPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_null_size() + srs_amf0_get_undefined_size();
}
int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
srs_error("encode command_object failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_object success.");
if ((ret = srs_amf0_write_undefined(stream)) != ERROR_SUCCESS) {
srs_error("encode args failed. ret=%d", ret);
return ret;
}
srs_verbose("encode args success.");
srs_info("encode FMLE start response packet success.");
return ret;
}
SrsPlayPacket::SrsPlayPacket()
{
command_name = RTMP_AMF0_COMMAND_PLAY;
... ...
... ... @@ -46,6 +46,7 @@ class SrsMessage;
class SrsChunkStream;
class SrsAmf0Object;
class SrsAmf0Null;
class SrsAmf0Undefined;
// convert class name to string.
#define CLASS_NAME_STRING(className) #className
... ... @@ -428,6 +429,57 @@ protected:
};
/**
* FMLE start publish: ReleaseStream/PublishStream
*/
class SrsFMLEStartPacket : public SrsPacket
{
private:
typedef SrsPacket super;
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsFMLEStartPacket);
}
public:
std::string command_name;
double transaction_id;
std::string stream_name;
public:
SrsFMLEStartPacket();
virtual ~SrsFMLEStartPacket();
public:
virtual int decode(SrsStream* stream);
};
/**
* response for SrsFMLEStartPacket.
*/
class SrsFMLEStartResPacket : public SrsPacket
{
private:
typedef SrsPacket super;
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsFMLEStartResPacket);
}
public:
std::string command_name;
double transaction_id;
SrsAmf0Null* command_object;
SrsAmf0Undefined* args;
public:
SrsFMLEStartResPacket(double _transaction_id);
virtual ~SrsFMLEStartResPacket();
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
};
/**
* 4.2.1. play
* The client sends this command to the server to play a stream.
*/
... ...
... ... @@ -307,9 +307,15 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
srs_info("identify client by create stream, play or flash publish.");
return identify_create_stream_client(
dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name);
}
if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
srs_info("identify client by releaseStream, fmle publish.");
return identify_fmle_publish_client(
dynamic_cast<SrsFMLEStartPacket*>(pkt), stream_id, type, stream_name);
}
srs_trace("ignore AMF0/AMF3 command message.");
}
... ... @@ -487,3 +493,83 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea
return ret;
}
int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id, SrsClientType& type, std::string& stream_name)
{
int ret = ERROR_SUCCESS;
type = SrsClientPublish;
stream_name = req->stream_name;
// createStream response
if (true) {
SrsMessage* msg = new SrsMessage();
SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
msg->set_packet(pkt);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send releaseStream response message failed. ret=%d", ret);
return ret;
}
srs_info("send releaseStream response message success.");
}
// FCPublish
double fc_publish_tid = 0;
if (true) {
SrsMessage* msg = NULL;
SrsFMLEStartPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsFMLEStartPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("recv FCPublish message failed. ret=%d", ret);
return ret;
}
srs_info("recv FCPublish request message success.");
SrsAutoFree(SrsMessage, msg, false);
fc_publish_tid = pkt->transaction_id;
}
// FCPublish response
if (true) {
SrsMessage* msg = new SrsMessage();
SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);
msg->set_packet(pkt);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send FCPublish response message failed. ret=%d", ret);
return ret;
}
srs_info("send FCPublish response message success.");
}
// createStream
double create_stream_tid = 0;
if (true) {
SrsMessage* msg = NULL;
SrsCreateStreamPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsCreateStreamPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("recv createStream message failed. ret=%d", ret);
return ret;
}
srs_info("recv createStream request message success.");
SrsAutoFree(SrsMessage, msg, false);
create_stream_tid = pkt->transaction_id;
}
// createStream response
if (true) {
SrsMessage* msg = new SrsMessage();
SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id);
msg->set_packet(pkt);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send createStream response message failed. ret=%d", ret);
return ret;
}
srs_info("send createStream response message success.");
}
return ret;
}
... ...
... ... @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsProtocol;
class SrsCreateStreamPacket;
class SrsFMLEStartPacket;
/**
* the original request from client.
... ... @@ -113,6 +114,7 @@ public:
virtual int start_play(int stream_id);
private:
virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
};
#endif
\ No newline at end of file
... ...