winlin

basic media cache framework

... ... @@ -86,7 +86,7 @@ MODULE_FILES=("srs_core" "srs_core_log" "srs_core_server"
"srs_core_error" "srs_core_conn" "srs_core_client"
"srs_core_rtmp" "srs_core_socket" "srs_core_buffer"
"srs_core_auto_free" "srs_core_protocol" "srs_core_amf0"
"srs_core_stream")
"srs_core_stream" "srs_core_source")
MODULE_DIR="src/core" . auto/modules.sh
CORE_OBJS="${MODULE_OBJS[@]}"
... ...
... ... @@ -30,6 +30,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_rtmp.hpp>
#include <srs_core_protocol.hpp>
#include <srs_core_auto_free.hpp>
#include <srs_core_source.hpp>
// wait for client message.
#define SRS_PULSE_TIME_MS 100
SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
: SrsConnection(srs_server, client_stfd)
... ... @@ -113,6 +117,11 @@ int SrsClient::do_cycle()
}
srs_verbose("set chunk size success");
// find a source to publish.
SrsSource* source = SrsSource::find(req->get_stream_url());
srs_assert(source != NULL);
srs_info("source found, url=%s", req->get_stream_url().c_str());
switch (type) {
case SrsClientPlay: {
srs_verbose("start to play stream %s.", req->stream.c_str());
... ... @@ -122,7 +131,7 @@ int SrsClient::do_cycle()
return ret;
}
srs_info("start to play stream %s success", req->stream.c_str());
return streaming_play();
return streaming_play(source);
}
case SrsClientPublish: {
srs_verbose("start to publish stream %s.", req->stream.c_str());
... ... @@ -132,7 +141,7 @@ int SrsClient::do_cycle()
return ret;
}
srs_info("start to publish stream %s success", req->stream.c_str());
return streaming_publish();
return streaming_publish(source);
}
default: {
ret = ERROR_SYSTEM_CLIENT_INVALID;
... ... @@ -144,13 +153,58 @@ int SrsClient::do_cycle()
return ret;
}
int SrsClient::streaming_play()
int SrsClient::streaming_play(SrsSource* source)
{
int ret = ERROR_SUCCESS;
SrsConsumer* consumer = source->create_consumer();
srs_assert(consumer != NULL);
SrsAutoFree(SrsConsumer, consumer, false);
srs_verbose("consumer created.");
while (true) {
bool ready = false;
if ((ret = rtmp->can_read(SRS_PULSE_TIME_MS, ready)) != ERROR_SUCCESS) {
srs_error("wait client control message failed. ret=%d", ret);
return ret;
}
srs_verbose("client pulse %dms, ready=%d", SRS_PULSE_TIME_MS, ready);
// read from client.
if (ready) {
SrsMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client control message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsMessage, msg, false);
// TODO: process it.
}
// get messages from consumer.
SrsMessage** msgs = NULL;
int count = 0;
if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsMessage*, msgs, true);
// sendout messages
for (int i = 0; i < count; i++) {
SrsMessage* msg = msgs[i];
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send message to client failed. ret=%d", ret);
return ret;
}
}
}
return ret;
}
int SrsClient::streaming_publish()
int SrsClient::streaming_publish(SrsSource* source)
{
int ret = ERROR_SUCCESS;
... ... @@ -163,6 +217,17 @@ int SrsClient::streaming_publish()
SrsAutoFree(SrsMessage, msg, false);
// process audio packet
if (msg->header.is_audio() && ((ret = source->on_audio(msg)) != ERROR_SUCCESS)) {
srs_error("process audio message failed. ret=%d", ret);
return ret;
}
// process video packet
if (msg->header.is_video() && ((ret = source->on_video(msg)) != ERROR_SUCCESS)) {
srs_error("process video message failed. ret=%d", ret);
return ret;
}
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
... ... @@ -173,6 +238,12 @@ int SrsClient::streaming_publish()
SrsPacket* pkt = msg->get_packet();
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if ((ret = source->on_meta_data(metadata)) != ERROR_SUCCESS) {
srs_error("process onMetaData message failed. ret=%d", ret);
return ret;
}
srs_trace("process onMetaData message success.");
continue;
}
srs_trace("ignore AMF0/AMF3 data message.");
... ...
... ... @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsRtmp;
class SrsRequest;
class SrsResponse;
class SrsSource;
/**
* the client provides the main logic control for RTMP clients.
... ... @@ -52,8 +53,8 @@ public:
protected:
virtual int do_cycle();
private:
virtual int streaming_play();
virtual int streaming_publish();
virtual int streaming_play(SrsSource* source);
virtual int streaming_publish(SrsSource* source);
virtual int get_peer_ip();
};
... ...
... ... @@ -48,6 +48,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_SOCKET_READ 207
#define ERROR_SOCKET_READ_FULLY 208
#define ERROR_SOCKET_WRITE 209
#define ERROR_SOCKET_WAIT 210
#define ERROR_RTMP_PLAIN_REQUIRED 300
#define ERROR_RTMP_CHUNK_START 301
... ...
... ... @@ -273,6 +273,11 @@ SrsProtocol::~SrsProtocol()
srs_freep(skt);
}
int SrsProtocol::can_read(int timeout_ms, bool& ready)
{
return skt->can_read(timeout_ms, ready);
}
int SrsProtocol::recv_message(SrsMessage** pmsg)
{
*pmsg = NULL;
... ... @@ -858,6 +863,16 @@ SrsMessageHeader::~SrsMessageHeader()
{
}
bool SrsMessageHeader::is_audio()
{
return message_type == RTMP_MSG_AudioMessage;
}
bool SrsMessageHeader::is_video()
{
return message_type == RTMP_MSG_VideoMessage;
}
bool SrsMessageHeader::is_amf0_command()
{
return message_type == RTMP_MSG_AMF0CommandMessage;
... ...
... ... @@ -94,6 +94,10 @@ public:
virtual ~SrsProtocol();
public:
/**
* whether the peer can read.
*/
virtual int can_read(int timeout_ms, bool& ready);
/**
* recv a message with raw/undecoded payload from peer.
* the payload is not decoded, use srs_rtmp_expect_message<T> if requires
* specifies message.
... ... @@ -179,6 +183,8 @@ struct SrsMessageHeader
SrsMessageHeader();
virtual ~SrsMessageHeader();
bool is_audio();
bool is_video();
bool is_amf0_command();
bool is_amf0_data();
bool is_amf3_command();
... ...
... ... @@ -112,6 +112,16 @@ int SrsRequest::discovery_app()
return ret;
}
std::string SrsRequest::get_stream_url()
{
std::string url = vhost;
url += app;
url += stream;
return url;
}
SrsResponse::SrsResponse()
{
stream_id = SRS_DEFAULT_SID;
... ... @@ -132,6 +142,21 @@ SrsRtmp::~SrsRtmp()
srs_freep(protocol);
}
int SrsRtmp::recv_message(SrsMessage** pmsg)
{
return protocol->recv_message(pmsg);
}
int SrsRtmp::can_read(int timeout_ms, bool& ready)
{
return protocol->can_read(timeout_ms, ready);
}
int SrsRtmp::send_message(SrsMessage* msg)
{
return protocol->send_message(msg);
}
int SrsRtmp::handshake()
{
int ret = ERROR_SUCCESS;
... ... @@ -178,11 +203,6 @@ int SrsRtmp::handshake()
return ret;
}
int SrsRtmp::recv_message(SrsMessage** pmsg)
{
return protocol->recv_message(pmsg);
}
int SrsRtmp::connect_app(SrsRequest* req)
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -62,6 +62,7 @@ struct SrsRequest
* disconvery vhost/app from tcUrl.
*/
virtual int discovery_app();
virtual std::string get_stream_url();
};
/**
... ... @@ -99,8 +100,11 @@ public:
SrsRtmp(st_netfd_t client_stfd);
virtual ~SrsRtmp();
public:
virtual int handshake();
virtual int recv_message(SrsMessage** pmsg);
virtual int can_read(int timeout_ms, bool& ready);
virtual int send_message(SrsMessage* msg);
public:
virtual int handshake();
virtual int connect_app(SrsRequest* req);
virtual int set_window_ack_size(int ack_size);
/**
... ...
... ... @@ -34,6 +34,26 @@ SrsSocket::~SrsSocket()
{
}
int SrsSocket::can_read(int timeout_ms, bool& ready)
{
ready = false;
int ret = ERROR_SUCCESS;
// If the named file descriptor object is ready for I/O within the specified amount of time,
// a value of 0 is returned. Otherwise, a value of -1 is returned and errno is set to
// indicate the error
if(st_netfd_poll(stfd, POLLIN, timeout_ms * 1000) == -1){
if(errno == ETIME){
return ret;
}
return ERROR_SOCKET_WAIT;
}
ready = true;
return ret;
}
int SrsSocket::read(const void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -44,6 +44,7 @@ public:
SrsSocket(st_netfd_t client_stfd);
virtual ~SrsSocket();
public:
virtual int can_read(int timeout_ms, bool& ready);
virtual int read(const void* buf, size_t size, ssize_t* nread);
virtual int read_fully(const void* buf, size_t size, ssize_t* nread);
virtual int write(const void* buf, size_t size, ssize_t* nwrite);
... ...
/*
The MIT License (MIT)
Copyright (c) 2013 winlin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_core_source.hpp>
#include <srs_core_log.hpp>
#include <srs_core_protocol.hpp>
std::map<std::string, SrsSource*> SrsSource::pool;
SrsSource* SrsSource::find(std::string stream_url)
{
if (pool.find(stream_url) == pool.end()) {
pool[stream_url] = new SrsSource(stream_url);
srs_verbose("create new source for url=%s", stream_url.c_str());
}
return pool[stream_url];
}
SrsConsumer::SrsConsumer()
{
}
SrsConsumer::~SrsConsumer()
{
}
int SrsConsumer::get_packets(int max_count, SrsMessage**& msgs, int& count)
{
msgs = NULL;
count = 0;
int ret = ERROR_SUCCESS;
return ret;
}
SrsSource::SrsSource(std::string _stream_url)
{
stream_url = _stream_url;
}
SrsSource::~SrsSource()
{
}
int SrsSource::on_meta_data(SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
return ret;
}
int SrsSource::on_audio(SrsMessage* audio)
{
int ret = ERROR_SUCCESS;
return ret;
}
int SrsSource::on_video(SrsMessage* audio)
{
int ret = ERROR_SUCCESS;
return ret;
}
SrsConsumer* SrsSource::create_consumer()
{
SrsConsumer* consumer = new SrsConsumer();
return consumer;
}
... ...
/*
The MIT License (MIT)
Copyright (c) 2013 winlin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_CORE_SOURCE_HPP
#define SRS_CORE_SOURCE_HPP
/*
#include <srs_core_source.hpp>
*/
#include <srs_core.hpp>
#include <map>
#include <string>
class SrsMessage;
class SrsOnMetaDataPacket;
/**
* the consumer for SrsSource, that is a play client.
*/
class SrsConsumer
{
public:
SrsConsumer();
virtual ~SrsConsumer();
public:
/**
* get packets in consumer queue.
* @msgs SrsMessages*[], output the prt array.
* @count the count in array.
* @max_count the max count to dequeue, 0 to dequeue all.
*/
virtual int get_packets(int max_count, SrsMessage**& msgs, int& count);
};
/**
* live streaming source.
*/
class SrsSource
{
private:
static std::map<std::string, SrsSource*> pool;
public:
/**
* find stream by vhost/app/stream.
* @stream_url the stream url, for example, myserver.xxx.com/app/stream
* @return the matched source, never be NULL.
* @remark stream_url should without port and schema.
*/
static SrsSource* find(std::string stream_url);
private:
std::string stream_url;
public:
SrsSource(std::string _stream_url);
virtual ~SrsSource();
public:
virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsMessage* audio);
virtual int on_video(SrsMessage* video);
public:
virtual SrsConsumer* create_consumer();
};
#endif
\ No newline at end of file
... ...
... ... @@ -14,6 +14,8 @@ file
..\core\srs_core_conn.cpp,
..\core\srs_core_client.hpp,
..\core\srs_core_client.cpp,
..\core\srs_core_source.hpp,
..\core\srs_core_source.cpp,
..\core\srs_core_rtmp.hpp,
..\core\srs_core_rtmp.cpp,
..\core\srs_core_protocol.hpp,
... ...