winlin

for bug #251, use macro to define the fast cache and cond wait. 2.0.58

... ... @@ -598,9 +598,11 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
// collect elapse for pithy print.
pithy_print.elapse();
#ifdef SRS_PERF_QUEUE_COND_WAIT
// wait for message to incoming.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
#endif
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
... ... @@ -610,8 +612,14 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
return ret;
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// we use wait to get messages, so the count must be positive.
srs_assert(count > 0);
#else
if (count <= 0) {
st_usleep(mw_sleep * 1000);
}
#endif
srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep);
// reportable
... ...
... ... @@ -167,6 +167,16 @@ SrsMessageQueue::~SrsMessageQueue()
clear();
}
int SrsMessageQueue::size()
{
return (int)msgs.size();
}
int SrsMessageQueue::duration()
{
return (int)(av_end_time - av_start_time);
}
void SrsMessageQueue::set_queue_size(double queue_size)
{
queue_size_ms = (int)(queue_size * 1000);
... ... @@ -297,28 +307,37 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
queue = new SrsMessageQueue();
should_update_source_id = false;
#ifdef SRS_PERF_QUEUE_COND_WAIT
mw_wait = st_cond_new();
mw_min_msgs = 0;
mw_duration = 0;
mw_waiting = false;
#endif
#ifdef SRS_PERF_QUEUE_FAST_CACHE
mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS);
mw_count = 0;
mw_first_pkt = mw_last_pkt = 0;
#endif
}
SrsConsumer::~SrsConsumer()
{
#ifdef SRS_PERF_QUEUE_FAST_CACHE
if (mw_cache) {
mw_cache->free(mw_count);
mw_count = 0;
}
srs_freep(mw_cache);
#endif
source->on_consumer_destroy(this);
srs_freep(jitter);
srs_freep(queue);
#ifdef SRS_PERF_QUEUE_COND_WAIT
st_cond_destroy(mw_wait);
#endif
}
void SrsConsumer::set_queue_size(double queue_size)
... ... @@ -347,6 +366,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
}
}
#ifdef SRS_PERF_QUEUE_FAST_CACHE
// use fast cache if available
if (mw_count < mw_cache->max) {
// update fast cache timestamps
... ... @@ -371,6 +391,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
}
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// fire the mw when msgs is enough.
if (mw_waiting) {
// when fast cache not overflow, always flush.
... ... @@ -385,6 +406,26 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
mw_waiting = false;
}
}
#endif
#else
if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) {
return ret;
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// fire the mw when msgs is enough.
if (mw_waiting) {
int duration_ms = queue->duration();
bool match_min_msgs = queue->size() > mw_min_msgs;
// when duration ok, signal to flush.
if (match_min_msgs && duration_ms > mw_duration) {
st_cond_signal(mw_wait);
mw_waiting = false;
}
}
#endif
#endif
return ret;
}
... ... @@ -405,6 +446,7 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count)
return ret;
}
#ifdef SRS_PERF_QUEUE_FAST_CACHE
// only dumps an whole array to msgs.
for (int i = 0; i < mw_count; i++) {
msgs->msgs[i] = mw_cache->msgs[i];
... ... @@ -420,13 +462,26 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count)
}
return dumps_queue_to_fast_cache();
#else
// pump msgs from queue.
int nb_msgs = 0;
if ((ret = queue->dump_packets(msgs->max, msgs->msgs, nb_msgs)) != ERROR_SUCCESS) {
return ret;
}
*count = nb_msgs;
return ret;
#endif
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
void SrsConsumer::wait(int nb_msgs, int duration)
{
mw_min_msgs = nb_msgs;
mw_duration = duration;
#ifdef SRS_PERF_QUEUE_FAST_CACHE
// when fast cache not overflow, always flush.
// so we donot care about the queue.
bool fast_cache_overflow = mw_count >= mw_cache->max;
... ... @@ -437,12 +492,22 @@ void SrsConsumer::wait(int nb_msgs, int duration)
if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {
return;
}
#else
int duration_ms = queue->duration();
bool match_min_msgs = queue->size() > mw_min_msgs;
// when duration ok, signal to flush.
if (match_min_msgs && duration_ms > mw_duration) {
return;
}
#endif
// the enqueue will notify this cond.
mw_waiting = true;
// wait for msgs to incoming.
st_cond_wait(mw_wait);
}
#endif
int SrsConsumer::on_play_client_pause(bool is_pause)
{
... ... @@ -454,6 +519,7 @@ int SrsConsumer::on_play_client_pause(bool is_pause)
return ret;
}
#ifdef SRS_PERF_QUEUE_FAST_CACHE
int SrsConsumer::dumps_queue_to_fast_cache()
{
int ret =ERROR_SUCCESS;
... ... @@ -473,6 +539,7 @@ int SrsConsumer::dumps_queue_to_fast_cache()
return ret;
}
#endif
SrsGopCache::SrsGopCache()
{
... ...
... ... @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_st.hpp>
#include <srs_app_reload.hpp>
#include <srs_core_performance.hpp>
class SrsPlayEdge;
class SrsPublishEdge;
... ... @@ -117,6 +118,14 @@ public:
virtual ~SrsMessageQueue();
public:
/**
* get the size of queue.
*/
virtual int size();
/**
* get the duration of queue.
*/
virtual int duration();
/**
* set the queue size
* @param queue_size the queue size in seconds.
*/
... ... @@ -156,12 +165,15 @@ private:
bool paused;
// when source id changed, notice all consumers
bool should_update_source_id;
#ifdef SRS_PERF_QUEUE_COND_WAIT
// the cond wait for mw.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
st_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
int mw_duration;
#endif
#ifdef SRS_PERF_QUEUE_FAST_CACHE
// use fast cache for msgs
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
SrsMessageArray* mw_cache;
... ... @@ -170,6 +182,7 @@ private:
// the packet time in fast cache.
int64_t mw_first_pkt;
int64_t mw_last_pkt;
#endif
public:
SrsConsumer(SrsSource* _source);
virtual ~SrsConsumer();
... ... @@ -204,22 +217,26 @@ public:
* @max_count the max count to dequeue, must be positive.
*/
virtual int dump_packets(SrsMessageArray* msgs, int* count);
#ifdef SRS_PERF_QUEUE_COND_WAIT
/**
* wait for messages incomming, atleast nb_msgs and in duration.
* @param nb_msgs the messages count to wait.
* @param duration the messgae duration to wait.
*/
virtual void wait(int nb_msgs, int duration);
#endif
/**
* when client send the pause message.
*/
virtual int on_play_client_pause(bool is_pause);
private:
#ifdef SRS_PERF_QUEUE_FAST_CACHE
/**
* dumps the queue to fast cache,
* when fast cache is clear or queue is overflow.
*/
virtual int dumps_queue_to_fast_cache();
#endif
};
/**
... ...
... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 57
#define VERSION_REVISION 58
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server"
... ...
... ... @@ -110,6 +110,20 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_PERF_MW_MIN_MSGS 32
/**
* whether enable the fast cache.
* @remark this improve performance for large connectios.
* @remark this also introduce complex, default to disable it.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
*/
#undef SRS_PERF_QUEUE_FAST_CACHE
/**
* whether use cond wait to send messages.
* @remark this improve performance for large connectios.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
*/
#undef SRS_PERF_QUEUE_COND_WAIT
/**
* how many chunk stream to cache, [0, N].
* to imporove about 10% performance when chunk size small, and 5% for large chunk.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/249
... ...