winlin

support pause for live stream.

... ... @@ -48,6 +48,7 @@ url: rtmp://127.0.0.1:1935/live/livestream
* nginx v1.5.0: 139524 lines <br/>
### History
* v0.4, 2013-11-09, support pause for live stream.
* v0.3, 2013-11-04, v0.3 released. 11773 lines.
* v0.3, 2013-11-04, support refer/play-refer/publish-refer.
* v0.3, 2013-11-04, support vhosts specified config.
... ...
... ... @@ -63,7 +63,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
(void)0
// current release version
#define RTMP_SIG_SRS_VERSION "0.2.0"
#define RTMP_SIG_SRS_VERSION "0.3.0"
// server info.
#define RTMP_SIG_SRS_KEY "srs"
#define RTMP_SIG_SRS_ROLE "origin server"
... ...
... ... @@ -270,10 +270,9 @@ int SrsClient::playing(SrsSource* source)
srs_error("recv client control message failed. ret=%d", ret);
return ret;
}
if (ret == ERROR_SUCCESS && !msg) {
srs_info("play loop got a message.");
SrsAutoFree(SrsCommonMessage, msg, false);
// TODO: process it.
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
srs_error("process play control message failed. ret=%d", ret);
return ret;
}
}
... ... @@ -442,3 +441,44 @@ int SrsClient::get_peer_ip()
return ret;
}
int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
if (!msg) {
srs_verbose("ignore all empty message.");
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
srs_info("ignore all message except amf0/amf3 command.");
return ret;
}
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
srs_error("decode the amf0/amf3 command packet failed. ret=%d", ret);
return ret;
}
srs_info("decode the amf0/amf3 command packet success.");
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(msg->get_packet());
if (!pause) {
srs_info("ignore all amf0/amf3 command except pause.");
return ret;
}
if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) {
srs_error("rtmp process play client pause failed. ret=%d", ret);
return ret;
}
if ((ret = consumer->on_play_client_pause(pause->is_pause)) != ERROR_SUCCESS) {
srs_error("consumer process play client pause failed. ret=%d", ret);
return ret;
}
srs_info("process pause success, is_pause=%d, time=%d.", pause->is_pause, pause->time_ms);
return ret;
}
... ...
... ... @@ -37,6 +37,8 @@ class SrsRequest;
class SrsResponse;
class SrsSource;
class SrsRefer;
class SrsConsumer;
class SrsCommonMessage;
/**
* the client provides the main logic control for RTMP clients.
... ... @@ -59,6 +61,7 @@ private:
virtual int playing(SrsSource* source);
virtual int publish(SrsSource* source, bool is_fmle);
virtual int get_peer_ip();
virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
};
#endif
\ No newline at end of file
... ...
... ... @@ -195,6 +195,7 @@ messages.
#define RTMP_AMF0_COMMAND_CONNECT "connect"
#define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream"
#define RTMP_AMF0_COMMAND_PLAY "play"
#define RTMP_AMF0_COMMAND_PAUSE "pause"
#define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone"
#define RTMP_AMF0_COMMAND_ON_STATUS "onStatus"
#define RTMP_AMF0_COMMAND_RESULT "_result"
... ... @@ -1219,6 +1220,10 @@ int SrsCommonMessage::decode_packet()
srs_info("decode the AMF0/AMF3 command(paly message).");
packet = new SrsPlayPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_PAUSE) {
srs_info("decode the AMF0/AMF3 command(pause message).");
packet = new SrsPausePacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message).");
packet = new SrsFMLEStartPacket();
... ... @@ -1896,6 +1901,61 @@ int SrsPublishPacket::decode(SrsStream* stream)
return ret;
}
SrsPausePacket::SrsPausePacket()
{
command_name = RTMP_AMF0_COMMAND_PAUSE;
transaction_id = 0;
command_object = new SrsAmf0Null();
time_ms = 0;
is_pause = true;
}
SrsPausePacket::~SrsPausePacket()
{
srs_freep(command_object);
}
int SrsPausePacket::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode pause command_name failed. ret=%d", ret);
return ret;
}
if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_PAUSE) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode pause command_name failed. "
"command_name=%s, ret=%d", command_name.c_str(), ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode pause transaction_id failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode pause command_object failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_boolean(stream, is_pause)) != ERROR_SUCCESS) {
srs_error("amf0 decode pause is_pause failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, time_ms)) != ERROR_SUCCESS) {
srs_error("amf0 decode pause time_ms failed. ret=%d", ret);
return ret;
}
srs_info("amf0 decode pause packet success");
return ret;
}
SrsPlayPacket::SrsPlayPacket()
{
command_name = RTMP_AMF0_COMMAND_PLAY;
... ...
... ... @@ -637,6 +637,33 @@ public:
};
/**
* 4.2.8. pause
* The client sends the pause command to tell the server to pause or
* start playing.
*/
class SrsPausePacket : public SrsPacket
{
private:
typedef SrsPacket super;
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsPausePacket);
}
public:
std::string command_name;
double transaction_id;
SrsAmf0Null* command_object;
bool is_pause;
double time_ms;
public:
SrsPausePacket();
virtual ~SrsPausePacket();
public:
virtual int decode(SrsStream* stream);
};
/**
* 4.2.1. play
* The client sends this command to the server to play a stream.
*/
... ... @@ -960,12 +987,12 @@ protected:
enum SrcPCUCEventType
{
// generally, 4bytes event-data
SrcPCUCStreamBegin = 0x00,
SrcPCUCStreamBegin = 0x00,
SrcPCUCStreamEOF = 0x01,
SrcPCUCStreamDry = 0x02,
SrcPCUCSetBufferLength = 0x03, // 8bytes event-data
SrcPCUCSetBufferLength = 0x03, // 8bytes event-data
SrcPCUCStreamIsRecorded = 0x04,
SrcPCUCPingRequest = 0x06,
SrcPCUCPingRequest = 0x06,
SrcPCUCPingResponse = 0x07,
};
... ...
... ... @@ -34,34 +34,36 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
/**
* the signature for packets to client.
*/
#define RTMP_SIG_FMS_VER "3,5,3,888"
#define RTMP_SIG_AMF0_VER 0
#define RTMP_SIG_CLIENT_ID "ASAICiss"
#define RTMP_SIG_FMS_VER "3,5,3,888"
#define RTMP_SIG_AMF0_VER 0
#define RTMP_SIG_CLIENT_ID "ASAICiss"
/**
* onStatus consts.
*/
#define StatusLevel "level"
#define StatusCode "code"
#define StatusDescription "description"
#define StatusDetails "details"
#define StatusClientId "clientid"
#define StatusLevel "level"
#define StatusCode "code"
#define StatusDescription "description"
#define StatusDetails "details"
#define StatusClientId "clientid"
// status value
#define StatusLevelStatus "status"
#define StatusLevelStatus "status"
// code value
#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
#define StatusCodeStreamReset "NetStream.Play.Reset"
#define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodePublishStart "NetStream.Publish.Start"
#define StatusCodeDataStart "NetStream.Data.Start"
#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"
#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
#define StatusCodeStreamReset "NetStream.Play.Reset"
#define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodeStreamPause "NetStream.Pause.Notify"
#define StatusCodeStreamUnpause "NetStream.Unpause.Notify"
#define StatusCodePublishStart "NetStream.Publish.Start"
#define StatusCodeDataStart "NetStream.Data.Start"
#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"
// FMLE
#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish"
#define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish"
// default stream id for response the createStream request.
#define SRS_DEFAULT_SID 1
#define SRS_DEFAULT_SID 1
SrsRequest::SrsRequest()
{
... ... @@ -570,6 +572,81 @@ int SrsRtmp::start_play(int stream_id)
return ret;
}
int SrsRtmp::on_play_client_pause(int stream_id, bool is_pause)
{
int ret = ERROR_SUCCESS;
if (is_pause) {
// onStatus(NetStream.Pause.Notify)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamPause));
pkt->data->set(StatusDescription, new SrsAmf0String("Paused stream."));
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Pause.Notify) message failed. ret=%d", ret);
return ret;
}
srs_info("send onStatus(NetStream.Pause.Notify) message success.");
}
// StreamEOF
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsUserControlPacket* pkt = new SrsUserControlPacket();
pkt->event_type = SrcPCUCStreamEOF;
pkt->event_data = stream_id;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send PCUC(StreamEOF) message failed. ret=%d", ret);
return ret;
}
srs_info("send PCUC(StreamEOF) message success.");
}
} else {
// onStatus(NetStream.Unpause.Notify)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamUnpause));
pkt->data->set(StatusDescription, new SrsAmf0String("Unpaused stream."));
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Unpause.Notify) message failed. ret=%d", ret);
return ret;
}
srs_info("send onStatus(NetStream.Unpause.Notify) message success.");
}
// StreanBegin
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsUserControlPacket* pkt = new SrsUserControlPacket();
pkt->event_type = SrcPCUCStreamBegin;
pkt->event_data = stream_id;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send PCUC(StreanBegin) message failed. ret=%d", ret);
return ret;
}
srs_info("send PCUC(StreanBegin) message success.");
}
}
return ret;
}
int SrsRtmp::start_fmle_publish(int stream_id)
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -148,6 +148,16 @@ public:
*/
virtual int start_play(int stream_id);
/**
* when client(type is play) send pause message,
* if is_pause, response the following packets:
* onStatus(NetStream.Pause.Notify)
* StreamEOF
* if not is_pause, response the following packets:
* onStatus(NetStream.Unpause.Notify)
* StreamBegin
*/
virtual int on_play_client_pause(int stream_id, bool is_pause);
/**
* when client type is publish, response with packets:
* releaseStream response
* FCPublish
... ...
... ... @@ -31,8 +31,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_amf0.hpp>
#include <srs_core_codec.hpp>
#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 10
#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 10
#define PAUSED_SHRINK_SIZE 250
std::map<std::string, SrsSource*> SrsSource::pool;
... ... @@ -50,16 +51,14 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
{
source = _source;
last_pkt_correct_time = last_pkt_time = 0;
paused = false;
codec = new SrsCodec();
}
SrsConsumer::~SrsConsumer()
{
std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
srs_freep(msg);
}
msgs.clear();
clear();
srs_freep(codec);
source->on_consumer_destroy(this);
}
... ... @@ -89,6 +88,13 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c
if (msgs.empty()) {
return ret;
}
if (paused) {
if ((int)msgs.size() >= PAUSED_SHRINK_SIZE) {
shrink();
}
return ret;
}
if (max_count == 0) {
count = (int)msgs.size();
... ... @@ -111,6 +117,68 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c
return ret;
}
int SrsConsumer::on_play_client_pause(bool is_pause)
{
int ret = ERROR_SUCCESS;
srs_trace("stream consumer change pause state %d=>%d", paused, is_pause);
paused = is_pause;
return ret;
}
void SrsConsumer::shrink()
{
int i = 0;
std::vector<SrsSharedPtrMessage*>::iterator it;
// issue the last video iframe.
bool has_video = false;
int frame_to_remove = 0;
std::vector<SrsSharedPtrMessage*>::iterator iframe = msgs.end();
for (i = 0, it = msgs.begin(); it != msgs.end(); ++it, i++) {
SrsSharedPtrMessage* msg = *it;
if (msg->header.is_video()) {
has_video = true;
if (codec->video_is_keyframe(msg->payload, msg->size)) {
iframe = it;
frame_to_remove = i + 1;
}
}
}
// last iframe is the first elem, ignore it.
if (iframe == msgs.begin()) {
return;
}
// recalc the frame to remove
if (iframe == msgs.end()) {
frame_to_remove = 0;
}
if (!has_video) {
frame_to_remove = (int)msgs.size();
}
srs_trace("shrink the cache queue, has_video=%d, has_iframe=%d, size=%d, removed=%d",
has_video, iframe != msgs.end(), (int)msgs.size(), frame_to_remove);
// if no video, remove all audio.
if (!has_video) {
clear();
return;
}
// if exists video Iframe, remove the frames before it.
if (iframe != msgs.end()) {
for (it = msgs.begin(); it != iframe; ++it) {
SrsSharedPtrMessage* msg = *it;
srs_freep(msg);
}
msgs.erase(msgs.begin(), iframe);
}
}
int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate)
{
int ret = ERROR_SUCCESS;
... ... @@ -156,6 +224,16 @@ int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate)
return ret;
}
void SrsConsumer::clear()
{
std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
srs_freep(msg);
}
msgs.clear();
}
SrsSource::SrsSource(std::string _stream_url)
{
stream_url = _stream_url;
... ...
... ... @@ -50,6 +50,8 @@ private:
int32_t last_pkt_correct_time;
SrsSource* source;
std::vector<SrsSharedPtrMessage*> msgs;
bool paused;
SrsCodec* codec;
public:
SrsConsumer(SrsSource* _source);
virtual ~SrsConsumer();
... ... @@ -69,11 +71,21 @@ public:
* @max_count the max count to dequeue, 0 to dequeue all.
*/
virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
/**
* when client send the pause message.
*/
virtual int on_play_client_pause(bool is_pause);
private:
/**
* when paused, shrink the cache queue,
* remove to cache only one gop.
*/
virtual void shrink();
/**
* detect the time jitter and correct it.
*/
virtual int jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate);
virtual void clear();
};
/**
... ...