winlin

for #293, add http stream cache for audio mp3/aac stream.

... ... @@ -140,6 +140,36 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage*
return ret;
}
SrsStreamCache::SrsStreamCache(SrsSource* s)
{
source = s;
pthread = new SrsThread("http-stream",
this, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US, false);
}
SrsStreamCache::~SrsStreamCache()
{
pthread->stop();
srs_freep(pthread);
}
int SrsStreamCache::start()
{
return pthread->start();
}
int SrsStreamCache::dump_cache(SrsConsumer* consumer)
{
int ret = ERROR_SUCCESS;
return ret;
}
int SrsStreamCache::cycle()
{
int ret = ERROR_SUCCESS;
return ret;
}
ISrsStreamEncoder::ISrsStreamEncoder()
{
}
... ... @@ -158,7 +188,7 @@ SrsFlvStreamEncoder::~SrsFlvStreamEncoder()
srs_freep(enc);
}
int SrsFlvStreamEncoder::initialize(SrsFileWriter* w)
int SrsFlvStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* /*c*/)
{
int ret = ERROR_SUCCESS;
... ... @@ -189,9 +219,22 @@ int SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, int size)
return enc->write_metadata(timestamp, data, size);
}
bool SrsFlvStreamEncoder::has_cache()
{
// for flv stream, use gop cache of SrsSource is ok.
return false;
}
int SrsFlvStreamEncoder::dump_cache(SrsConsumer* /*consumer*/)
{
// for flv stream, ignore cache.
return ERROR_SUCCESS;
}
SrsAacStreamEncoder::SrsAacStreamEncoder()
{
enc = new SrsAacEncoder();
cache = NULL;
}
SrsAacStreamEncoder::~SrsAacStreamEncoder()
... ... @@ -199,10 +242,12 @@ SrsAacStreamEncoder::~SrsAacStreamEncoder()
srs_freep(enc);
}
int SrsAacStreamEncoder::initialize(SrsFileWriter* w)
int SrsAacStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c)
{
int ret = ERROR_SUCCESS;
cache = c;
if ((ret = enc->initialize(w)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -227,9 +272,21 @@ int SrsAacStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, i
return ERROR_SUCCESS;
}
bool SrsAacStreamEncoder::has_cache()
{
return true;
}
int SrsAacStreamEncoder::dump_cache(SrsConsumer* consumer)
{
srs_assert(cache);
return cache->dump_cache(consumer);
}
SrsMp3StreamEncoder::SrsMp3StreamEncoder()
{
enc = new SrsMp3Encoder();
cache = NULL;
}
SrsMp3StreamEncoder::~SrsMp3StreamEncoder()
... ... @@ -237,10 +294,12 @@ SrsMp3StreamEncoder::~SrsMp3StreamEncoder()
srs_freep(enc);
}
int SrsMp3StreamEncoder::initialize(SrsFileWriter* w)
int SrsMp3StreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c)
{
int ret = ERROR_SUCCESS;
cache = c;
if ((ret = enc->initialize(w)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -269,6 +328,17 @@ int SrsMp3StreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, i
return ERROR_SUCCESS;
}
bool SrsMp3StreamEncoder::has_cache()
{
return true;
}
int SrsMp3StreamEncoder::dump_cache(SrsConsumer* consumer)
{
srs_assert(cache);
return cache->dump_cache(consumer);
}
SrsStreamWriter::SrsStreamWriter(ISrsGoHttpResponseWriter* w)
{
writer = w;
... ... @@ -305,9 +375,10 @@ int SrsStreamWriter::write(void* buf, size_t count, ssize_t* pnwrite)
return writer->write((char*)buf, (int)count);
}
SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r)
SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c)
{
source = s;
cache = c;
req = r->copy();
}
... ... @@ -339,9 +410,9 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
}
SrsAutoFree(ISrsStreamEncoder, enc);
// create consumer of souce.
// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsConsumer* consumer = NULL;
if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) {
if ((ret = source->create_consumer(consumer, !enc->has_cache())) != ERROR_SUCCESS) {
srs_error("http: create consumer failed. ret=%d", ret);
return ret;
}
... ... @@ -353,10 +424,19 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
// the memory writer.
SrsStreamWriter writer(w);
if ((ret = enc->initialize(&writer)) != ERROR_SUCCESS) {
if ((ret = enc->initialize(&writer, cache)) != ERROR_SUCCESS) {
srs_error("http: initialize stream encoder failed. ret=%d", ret);
return ret;
}
// if gop cache enabled for encoder, dump to consumer.
if (enc->has_cache()) {
if ((ret = enc->dump_cache(consumer)) != ERROR_SUCCESS) {
srs_error("http: dump cache to consumer failed. ret=%d", ret);
return ret;
}
}
while (true) {
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
... ... @@ -424,6 +504,7 @@ int SrsLiveStream::streaming_send_messages(ISrsStreamEncoder* enc, SrsSharedPtrM
SrsLiveEntry::SrsLiveEntry()
{
stream = NULL;
cache = NULL;
}
SrsHttpServer::SrsHttpServer()
... ... @@ -485,7 +566,14 @@ int SrsHttpServer::mount(SrsSource* s, SrsRequest* r)
// remove the default vhost mount
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
entry->stream = new SrsLiveStream(s, r);
entry->cache = new SrsStreamCache(s);
entry->stream = new SrsLiveStream(s, r, entry->cache);
// start http stream cache thread
if ((ret = entry->cache->start()) != ERROR_SUCCESS) {
srs_error("http: start stream cache failed. ret=%d", ret);
return ret;
}
// mount the http flv stream.
if ((ret = mux.handle(mount, entry->stream)) != ERROR_SUCCESS) {
... ...
... ... @@ -37,9 +37,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_http.hpp>
#include <srs_app_reload.hpp>
#include <srs_kernel_file.hpp>
#include <srs_app_thread.hpp>
class SrsSource;
class SrsRequest;
class SrsConsumer;
class SrsStSocket;
class SrsAacEncoder;
class SrsMp3Encoder;
... ... @@ -65,6 +67,27 @@ protected:
};
/**
* for the srs http stream cache,
* for example, the audio stream cache to make android(weixin) happy.
* we start a thread to shrink the queue.
*/
class SrsStreamCache : public ISrsThreadHandler
{
private:
SrsSource* source;
SrsThread* pthread;
public:
SrsStreamCache(SrsSource* s);
virtual ~SrsStreamCache();
public:
virtual int start();
virtual int dump_cache(SrsConsumer* consumer);
// interface ISrsThreadHandler.
public:
virtual int cycle();
};
/**
* the stream encoder in some codec, for example, flv or aac.
*/
class ISrsStreamEncoder
... ... @@ -73,10 +96,29 @@ public:
ISrsStreamEncoder();
virtual ~ISrsStreamEncoder();
public:
virtual int initialize(SrsFileWriter* w) = 0;
/**
* initialize the encoder with file writer(to http response) and stream cache.
* @param w the writer to write to http response.
* @param c the stream cache for audio stream fast startup.
*/
virtual int initialize(SrsFileWriter* w, SrsStreamCache* c) = 0;
/**
* write rtmp video/audio/metadata.
*/
virtual int write_audio(int64_t timestamp, char* data, int size) = 0;
virtual int write_video(int64_t timestamp, char* data, int size) = 0;
virtual int write_metadata(int64_t timestamp, char* data, int size) = 0;
public:
/**
* for some stream, for example, mp3 and aac, the audio stream,
* we use large gop cache in encoder, for the gop cache of SrsSource is ignore audio.
* @return true to use gop cache of encoder; otherwise, use SrsSource.
*/
virtual bool has_cache() = 0;
/**
* dumps the cache of encoder to consumer.
*/
virtual int dump_cache(SrsConsumer* consumer) = 0;
};
/**
... ... @@ -90,10 +132,13 @@ public:
SrsFlvStreamEncoder();
virtual ~SrsFlvStreamEncoder();
public:
virtual int initialize(SrsFileWriter* w);
virtual int initialize(SrsFileWriter* w, SrsStreamCache* c);
virtual int write_audio(int64_t timestamp, char* data, int size);
virtual int write_video(int64_t timestamp, char* data, int size);
virtual int write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual int dump_cache(SrsConsumer* consumer);
};
/**
... ... @@ -103,14 +148,18 @@ class SrsAacStreamEncoder : public ISrsStreamEncoder
{
private:
SrsAacEncoder* enc;
SrsStreamCache* cache;
public:
SrsAacStreamEncoder();
virtual ~SrsAacStreamEncoder();
public:
virtual int initialize(SrsFileWriter* w);
virtual int initialize(SrsFileWriter* w, SrsStreamCache* c);
virtual int write_audio(int64_t timestamp, char* data, int size);
virtual int write_video(int64_t timestamp, char* data, int size);
virtual int write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual int dump_cache(SrsConsumer* consumer);
};
/**
... ... @@ -120,14 +169,18 @@ class SrsMp3StreamEncoder : public ISrsStreamEncoder
{
private:
SrsMp3Encoder* enc;
SrsStreamCache* cache;
public:
SrsMp3StreamEncoder();
virtual ~SrsMp3StreamEncoder();
public:
virtual int initialize(SrsFileWriter* w);
virtual int initialize(SrsFileWriter* w, SrsStreamCache* c);
virtual int write_audio(int64_t timestamp, char* data, int size);
virtual int write_video(int64_t timestamp, char* data, int size);
virtual int write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual int dump_cache(SrsConsumer* consumer);
};
/**
... ... @@ -159,8 +212,9 @@ class SrsLiveStream : public ISrsGoHttpHandler
private:
SrsRequest* req;
SrsSource* source;
SrsStreamCache* cache;
public:
SrsLiveStream(SrsSource* s, SrsRequest* r);
SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c);
virtual ~SrsLiveStream();
public:
virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r);
... ... @@ -176,6 +230,7 @@ struct SrsLiveEntry
std::string vhost;
std::string mount;
SrsLiveStream* stream;
SrsStreamCache* cache;
SrsLiveEntry();
};
... ...
... ... @@ -1702,7 +1702,7 @@ void SrsSource::on_unpublish()
handler->on_unpublish(this, _req);
}
int SrsSource::create_consumer(SrsConsumer*& consumer)
int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache)
{
int ret = ERROR_SUCCESS;
... ... @@ -1750,12 +1750,15 @@ int SrsSource::create_consumer(SrsConsumer*& consumer)
srs_info("dispatch audio sequence header success");
// copy gop cache to client.
if ((ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
return ret;
if (dump_gop_cache) {
if ((ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate);
} else {
srs_trace("create consumer, ignore gop cache, tba=%d, tbv=%d", sample_rate, frame_rate);
}
srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate);
return ret;
}
... ...
... ... @@ -494,7 +494,7 @@ public:
virtual void on_unpublish();
// consumer methods
public:
virtual int create_consumer(SrsConsumer*& consumer);
virtual int create_consumer(SrsConsumer*& consumer, bool dump_gop_cache = true);
virtual void on_consumer_destroy(SrsConsumer* consumer);
virtual void set_cache(bool enabled);
// internal
... ...