winlin

support onStatus, RtmpSampleAccess, DataStart

... ... @@ -195,7 +195,9 @@ messages.
#define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream"
#define RTMP_AMF0_COMMAND_PLAY "play"
#define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone"
#define RTMP_AMF0_COMMAND_ON_STATUS "onStatus"
#define RTMP_AMF0_COMMAND_RESULT "_result"
#define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess"
/****************************************************************************
*****************************************************************************
... ... @@ -483,7 +485,7 @@ int SrsProtocol::on_send_message(SrsMessage* msg)
SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(msg->get_packet());
srs_assert(pkt != NULL);
in_chunk_size = pkt->chunk_size;
out_chunk_size = pkt->chunk_size;
srs_trace("set output chunk size to %d", pkt->chunk_size);
break;
... ... @@ -1545,6 +1547,180 @@ int SrsOnBWDonePacket::encode_packet(SrsStream* stream)
return ret;
}
SrsOnStatusCallPacket::SrsOnStatusCallPacket()
{
command_name = RTMP_AMF0_COMMAND_ON_STATUS;
transaction_id = 0;
args = new SrsAmf0Null();
data = new SrsAmf0Object();
}
SrsOnStatusCallPacket::~SrsOnStatusCallPacket()
{
if (args) {
delete args;
args = NULL;
}
if (data) {
delete data;
data = NULL;
}
}
int SrsOnStatusCallPacket::get_perfer_cid()
{
return RTMP_CID_OverStream;
}
int SrsOnStatusCallPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsOnStatusCallPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_null_size() + srs_amf0_get_object_size(data);
}
int SrsOnStatusCallPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
srs_error("encode args failed. ret=%d", ret);
return ret;
}
srs_verbose("encode args success.");;
if ((ret = srs_amf0_write_object(stream, data)) != ERROR_SUCCESS) {
srs_error("encode data failed. ret=%d", ret);
return ret;
}
srs_verbose("encode data success.");
srs_info("encode onStatus(Call) packet success.");
return ret;
}
SrsOnStatusDataPacket::SrsOnStatusDataPacket()
{
command_name = RTMP_AMF0_COMMAND_ON_STATUS;
data = new SrsAmf0Object();
}
SrsOnStatusDataPacket::~SrsOnStatusDataPacket()
{
if (data) {
delete data;
data = NULL;
}
}
int SrsOnStatusDataPacket::get_perfer_cid()
{
return RTMP_CID_OverStream;
}
int SrsOnStatusDataPacket::get_message_type()
{
return RTMP_MSG_AMF0DataMessage;
}
int SrsOnStatusDataPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_object_size(data);
}
int SrsOnStatusDataPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_object(stream, data)) != ERROR_SUCCESS) {
srs_error("encode data failed. ret=%d", ret);
return ret;
}
srs_verbose("encode data success.");
srs_info("encode onStatus(Data) packet success.");
return ret;
}
SrsSampleAccessPacket::SrsSampleAccessPacket()
{
command_name = RTMP_AMF0_DATA_SAMPLE_ACCESS;
video_sample_access = false;
audio_sample_access = false;
}
SrsSampleAccessPacket::~SrsSampleAccessPacket()
{
}
int SrsSampleAccessPacket::get_perfer_cid()
{
return RTMP_CID_OverStream;
}
int SrsSampleAccessPacket::get_message_type()
{
return RTMP_MSG_AMF0DataMessage;
}
int SrsSampleAccessPacket::get_size()
{
return srs_amf0_get_string_size(command_name)
+ srs_amf0_get_boolean_size() + srs_amf0_get_boolean_size();
}
int SrsSampleAccessPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_boolean(stream, video_sample_access)) != ERROR_SUCCESS) {
srs_error("encode video_sample_access failed. ret=%d", ret);
return ret;
}
srs_verbose("encode video_sample_access success.");
if ((ret = srs_amf0_write_boolean(stream, audio_sample_access)) != ERROR_SUCCESS) {
srs_error("encode audio_sample_access failed. ret=%d", ret);
return ret;
}
srs_verbose("encode audio_sample_access success.");;
srs_info("encode |RtmpSampleAccess packet success.");
return ret;
}
SrsSetWindowAckSizePacket::SrsSetWindowAckSizePacket()
{
ackowledgement_window_size = 0;
... ...
... ... @@ -280,12 +280,18 @@ protected:
public:
SrsPacket();
virtual ~SrsPacket();
/**
* decode functions.
*/
public:
/**
* subpacket must override to decode packet from stream.
* @remark never invoke the super.decode, it always failed.
*/
virtual int decode(SrsStream* stream);
/**
* encode functions.
*/
public:
virtual int get_perfer_cid();
virtual int get_payload_length();
... ... @@ -507,6 +513,93 @@ protected:
};
/**
* onStatus command, AMF0 Call
* @remark, user must set the stream_id in header.
*/
class SrsOnStatusCallPacket : public SrsPacket
{
private:
typedef SrsPacket super;
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsOnStatusCallPacket);
}
public:
std::string command_name;
double transaction_id;
SrsAmf0Null* args;
SrsAmf0Object* data;
public:
SrsOnStatusCallPacket();
virtual ~SrsOnStatusCallPacket();
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
};
/**
* onStatus data, AMF0 Data
* @remark, user must set the stream_id in header.
*/
class SrsOnStatusDataPacket : public SrsPacket
{
private:
typedef SrsPacket super;
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsOnStatusDataPacket);
}
public:
std::string command_name;
SrsAmf0Object* data;
public:
SrsOnStatusDataPacket();
virtual ~SrsOnStatusDataPacket();
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
};
/**
* AMF0Data RtmpSampleAccess
* @remark, user must set the stream_id in header.
*/
class SrsSampleAccessPacket : public SrsPacket
{
private:
typedef SrsPacket super;
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsSampleAccessPacket);
}
public:
std::string command_name;
bool video_sample_access;
bool audio_sample_access;
public:
SrsSampleAccessPacket();
virtual ~SrsSampleAccessPacket();
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
};
/**
* 5.5. Window Acknowledgement Size (5)
* The client or the server sends this message to inform the peer which
* window size to use when sending acknowledgment.
... ... @@ -595,6 +688,7 @@ protected:
virtual int encode_packet(SrsStream* stream);
};
// 3.7. User Control message
enum SrcPCUCEventType
{
// generally, 4bytes event-data
... ...
... ... @@ -38,6 +38,20 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define RTMP_SIG_SRS_NAME "srs(simple rtmp server)"
#define RTMP_SIG_SRS_URL "https://github.com/winlinvip/simple-rtmp-server"
#define RTMP_SIG_SRS_VERSION "0.1"
#define RTMP_SIG_CLIENT_ID "ASAICiss"
#define StatusLevel "level"
#define StatusCode "code"
#define StatusDescription "description"
#define StatusDetails "details"
#define StatusClientId "clientid"
// status value
#define StatusLevelStatus "status"
// code value
#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
#define StatusCodeStreamReset "NetStream.Play.Reset"
#define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodeDataStart "NetStream.Data.Start"
int SrsRequest::discovery_app()
{
... ... @@ -225,9 +239,9 @@ int SrsRtmp::response_connect_app()
pkt->props->properties["capabilities"] = new SrsAmf0Number(123);
pkt->props->properties["mode"] = new SrsAmf0Number(1);
pkt->info->properties["level"] = new SrsAmf0String("status");
pkt->info->properties["code"] = new SrsAmf0String("NetConnection.Connect.Success");
pkt->info->properties["description"] = new SrsAmf0String("Connection succeeded");
pkt->info->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus);
pkt->info->properties[StatusCode] = new SrsAmf0String(StatusCodeConnectSuccess);
pkt->info->properties[StatusDescription] = new SrsAmf0String("Connection succeeded");
pkt->info->properties["objectEncoding"] = new SrsAmf0Number(RTMP_SIG_AMF0_VER);
SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray();
pkt->info->properties["data"] = data;
... ... @@ -342,6 +356,80 @@ int SrsRtmp::start_play(int stream_id)
srs_info("send PCUC(StreamBegin) message success.");
}
// onStatus(NetStream.Play.Reset)
if (true) {
SrsMessage* msg = new SrsMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus);
pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeStreamReset);
pkt->data->properties[StatusDescription] = new SrsAmf0String("Playing and resetting stream.");
pkt->data->properties[StatusDetails] = new SrsAmf0String("stream");
pkt->data->properties[StatusClientId] = new SrsAmf0String(RTMP_SIG_CLIENT_ID);
msg->header.stream_id = stream_id;
msg->set_packet(pkt);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);
return ret;
}
srs_info("send onStatus(NetStream.Play.Reset) message success.");
}
// onStatus(NetStream.Play.Start)
if (true) {
SrsMessage* msg = new SrsMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus);
pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeStreamStart);
pkt->data->properties[StatusDescription] = new SrsAmf0String("Started playing stream.");
pkt->data->properties[StatusDetails] = new SrsAmf0String("stream");
pkt->data->properties[StatusClientId] = new SrsAmf0String(RTMP_SIG_CLIENT_ID);
msg->header.stream_id = stream_id;
msg->set_packet(pkt);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);
return ret;
}
srs_info("send onStatus(NetStream.Play.Reset) message success.");
}
// |RtmpSampleAccess(false, false)
if (true) {
SrsMessage* msg = new SrsMessage();
SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();
msg->header.stream_id = stream_id;
msg->set_packet(pkt);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send |RtmpSampleAccess(false, false) message failed. ret=%d", ret);
return ret;
}
srs_info("send |RtmpSampleAccess(false, false) message success.");
}
// onStatus(NetStream.Data.Start)
if (true) {
SrsMessage* msg = new SrsMessage();
SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();
pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeDataStart);
msg->header.stream_id = stream_id;
msg->set_packet(pkt);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Data.Start) message failed. ret=%d", ret);
return ret;
}
srs_info("send onStatus(NetStream.Data.Start) message success.");
}
srs_info("start play success.");
return ret;
... ...
... ... @@ -104,8 +104,11 @@ public:
*/
virtual int set_chunk_size(int chunk_size);
/**
* when client type is play, response with
* StreamBegin, onStatus(NetStream.Play.Reset), onStatus(NetStream.Play.Start).
* when client type is play, response with packets:
* StreamBegin,
* onStatus(NetStream.Play.Reset), onStatus(NetStream.Play.Start).,
* |RtmpSampleAccess(false, false),
* onStatus(NetStream.Data.Start).
*/
virtual int start_play(int stream_id);
private:
... ...