winlin

srs-librtmp: implements the publish and play.

... ... @@ -35,7 +35,7 @@ int main(int argc, char** argv)
printf("srs(simple-rtmp-server) client librtmp library.\n");
printf("version: %d.%d.%d\n", srs_version_major(), srs_version_minor(), srs_version_revision());
rtmp = srs_rtmp_create("rtmp://127.0.0.1:1936/live/livestream");
rtmp = srs_rtmp_create("rtmp://127.0.0.1:1935/live/livestream");
if (srs_simple_handshake(rtmp) != 0) {
printf("simple handshake failed.\n");
... ...
... ... @@ -230,7 +230,8 @@ int SrsClient::stream_service_cycle()
return ret;
}
req->strip();
srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str());
srs_trace("identify client success. type=%s, stream_name=%s",
srs_client_type_string(type).c_str(), req->stream.c_str());
// client is identified, set the timeout to service timeout.
rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US);
... ...
... ... @@ -249,6 +249,10 @@ int srs_publish_stream(srs_rtmp_t rtmp)
srs_assert(rtmp != NULL);
Context* context = (Context*)rtmp;
if ((ret = context->rtmp->fmle_publish(context->stream, context->stream_id)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
... ...
... ... @@ -176,6 +176,17 @@ SrsResponse::~SrsResponse()
{
}
string srs_client_type_string(SrsClientType type)
{
switch (type) {
case SrsClientPlay: return "Play";
case SrsClientFlashPublish: return "FlashPublish";
case SrsClientFMLEPublish: return "FMLEPublish";
default: return "Unknown";
}
return "Unknown";
}
SrsRtmpClient::SrsRtmpClient(ISrsProtocolReaderWriter* skt)
{
io = skt;
... ... @@ -441,6 +452,87 @@ int SrsRtmpClient::publish(string stream, int stream_id)
return ret;
}
int SrsRtmpClient::fmle_publish(string stream, int& stream_id)
{
stream_id = 0;
int ret = ERROR_SUCCESS;
// SrsFMLEStartPacket
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsFMLEStartPacket* pkt = SrsFMLEStartPacket::create_release_stream(stream);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send FMLE publish "
"release stream failed. stream=%s, ret=%d", stream.c_str(), ret);
return ret;
}
}
// FCPublish
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsFMLEStartPacket* pkt = SrsFMLEStartPacket::create_FC_publish(stream);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send FMLE publish "
"FCPublish failed. stream=%s, ret=%d", stream.c_str(), ret);
return ret;
}
}
// CreateStream
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket();
pkt->transaction_id = 4;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send FMLE publish "
"createStream failed. stream=%s, ret=%d", stream.c_str(), ret);
return ret;
}
}
// expect result of CreateStream
if (true) {
SrsCommonMessage* msg = NULL;
SrsCreateStreamResPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect create stream response message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get create stream response message");
stream_id = (int)pkt->stream_id;
}
// publish(stream)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsPublishPacket* pkt = new SrsPublishPacket();
pkt->stream_name = stream;
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send FMLE publish publish failed. "
"stream=%s, stream_id=%d, ret=%d", stream.c_str(), stream_id, ret);
return ret;
}
}
return ret;
}
SrsRtmpServer::SrsRtmpServer(ISrsProtocolReaderWriter* skt)
{
io = skt;
... ...
... ... @@ -106,6 +106,7 @@ enum SrsClientType
SrsClientFMLEPublish,
SrsClientFlashPublish,
};
std::string srs_client_type_string(SrsClientType type);
/**
* implements the client role protocol.
... ... @@ -137,7 +138,12 @@ public:
virtual int connect_app(std::string app, std::string tc_url);
virtual int create_stream(int& stream_id);
virtual int play(std::string stream, int stream_id);
// flash publish schema:
// connect-app => create-stream => flash-publish
virtual int publish(std::string stream, int stream_id);
// FMLE publish schema:
// connect-app => FMLE publish
virtual int fmle_publish(std::string stream, int& stream_id);
};
/**
... ...
... ... @@ -676,7 +676,7 @@ int SrsProtocol::on_send_message(ISrsMessage* msg)
SrsConnectAppPacket* pkt = NULL;
pkt = dynamic_cast<SrsConnectAppPacket*>(common_msg->get_packet());
if (pkt) {
requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CONNECT;
requests[pkt->transaction_id] = pkt->command_name;
break;
}
}
... ... @@ -684,7 +684,15 @@ int SrsProtocol::on_send_message(ISrsMessage* msg)
SrsCreateStreamPacket* pkt = NULL;
pkt = dynamic_cast<SrsCreateStreamPacket*>(common_msg->get_packet());
if (pkt) {
requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CREATE_STREAM;
requests[pkt->transaction_id] = pkt->command_name;
break;
}
}
if (true) {
SrsFMLEStartPacket* pkt = NULL;
pkt = dynamic_cast<SrsFMLEStartPacket*>(common_msg->get_packet());
if (pkt) {
requests[pkt->transaction_id] = pkt->command_name;
break;
}
}
... ... @@ -1291,13 +1299,19 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str());
if (request_name == RTMP_AMF0_COMMAND_CONNECT) {
srs_info("decode the AMF0/AMF3 response command(connect vhost/app message).");
srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str());
packet = new SrsConnectAppResPacket();
return packet->decode(stream);
} else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) {
srs_info("decode the AMF0/AMF3 response command(createStream message).");
srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str());
packet = new SrsCreateStreamResPacket(0, 0);
return packet->decode(stream);
} else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
|| request_name == RTMP_AMF0_COMMAND_FC_PUBLISH
|| request_name == RTMP_AMF0_COMMAND_UNPUBLISH) {
srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str());
packet = new SrsFMLEStartResPacket(0);
return packet->decode(stream);
} else {
ret = ERROR_RTMP_NO_REQUEST;
srs_error("decode AMF0/AMF3 request failed. "
... ... @@ -2157,6 +2171,78 @@ int SrsFMLEStartPacket::decode(SrsStream* stream)
return ret;
}
int SrsFMLEStartPacket::get_perfer_cid()
{
return RTMP_CID_OverConnection;
}
int SrsFMLEStartPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsFMLEStartPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_null_size() + srs_amf0_get_string_size(stream_name);
}
int SrsFMLEStartPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
srs_error("encode command_object failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_object success.");
if ((ret = srs_amf0_write_string(stream, stream_name)) != ERROR_SUCCESS) {
srs_error("encode stream_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode stream_name success.");
srs_info("encode FMLE start response packet success.");
return ret;
}
SrsFMLEStartPacket* SrsFMLEStartPacket::create_release_stream(string stream)
{
SrsFMLEStartPacket* pkt = new SrsFMLEStartPacket();
pkt->command_name = RTMP_AMF0_COMMAND_RELEASE_STREAM;
pkt->transaction_id = 2;
pkt->stream_name = stream;
return pkt;
}
SrsFMLEStartPacket* SrsFMLEStartPacket::create_FC_publish(string stream)
{
SrsFMLEStartPacket* pkt = new SrsFMLEStartPacket();
pkt->command_name = RTMP_AMF0_COMMAND_FC_PUBLISH;
pkt->transaction_id = 3;
pkt->stream_name = stream;
return pkt;
}
SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
{
command_name = RTMP_AMF0_COMMAND_RESULT;
... ... @@ -2171,6 +2257,41 @@ SrsFMLEStartResPacket::~SrsFMLEStartResPacket()
srs_freep(args);
}
int SrsFMLEStartResPacket::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start response command_name failed. ret=%d", ret);
return ret;
}
if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_RESULT) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode FMLE start response command_name failed. "
"command_name=%s, ret=%d", command_name.c_str(), ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start response transaction_id failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start response command_object failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_undefined(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start response stream_id failed. ret=%d", ret);
return ret;
}
srs_info("amf0 decode FMLE start packet success");
return ret;
}
int SrsFMLEStartResPacket::get_perfer_cid()
{
return RTMP_CID_OverConnection;
... ...
... ... @@ -681,6 +681,16 @@ public:
virtual ~SrsFMLEStartPacket();
public:
virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
public:
static SrsFMLEStartPacket* create_release_stream(std::string stream);
static SrsFMLEStartPacket* create_FC_publish(std::string stream);
};
/**
* response for SrsFMLEStartPacket.
... ... @@ -703,6 +713,8 @@ public:
SrsFMLEStartResPacket(double _transaction_id);
virtual ~SrsFMLEStartResPacket();
public:
virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
... ...