winlin

for bug #293, use http stream cache for android weixin to happy.

@@ -143,14 +143,16 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* @@ -143,14 +143,16 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage*
143 SrsStreamCache::SrsStreamCache(SrsSource* s) 143 SrsStreamCache::SrsStreamCache(SrsSource* s)
144 { 144 {
145 source = s; 145 source = s;
146 - pthread = new SrsThread("http-stream",  
147 - this, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US, false); 146 + queue = new SrsMessageQueue(true);
  147 + pthread = new SrsThread("http-stream", this, 0, false);
148 } 148 }
149 149
150 SrsStreamCache::~SrsStreamCache() 150 SrsStreamCache::~SrsStreamCache()
151 { 151 {
152 pthread->stop(); 152 pthread->stop();
153 srs_freep(pthread); 153 srs_freep(pthread);
  154 +
  155 + srs_freep(queue);
154 } 156 }
155 157
156 int SrsStreamCache::start() 158 int SrsStreamCache::start()
@@ -161,12 +163,60 @@ int SrsStreamCache::start() @@ -161,12 +163,60 @@ int SrsStreamCache::start()
161 int SrsStreamCache::dump_cache(SrsConsumer* consumer) 163 int SrsStreamCache::dump_cache(SrsConsumer* consumer)
162 { 164 {
163 int ret = ERROR_SUCCESS; 165 int ret = ERROR_SUCCESS;
  166 +
  167 + if ((ret = queue->dump_packets(consumer, false, 0, 0, SrsRtmpJitterAlgorithmOFF)) != ERROR_SUCCESS) {
  168 + return ret;
  169 + }
  170 +
  171 + srs_trace("http: dump cache %d msgs, duration=%dms", queue->size(), queue->duration());
  172 +
164 return ret; 173 return ret;
165 } 174 }
166 175
167 int SrsStreamCache::cycle() 176 int SrsStreamCache::cycle()
168 { 177 {
169 int ret = ERROR_SUCCESS; 178 int ret = ERROR_SUCCESS;
  179 +
  180 + SrsConsumer* consumer = NULL;
  181 + if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) {
  182 + srs_error("http: create consumer failed. ret=%d", ret);
  183 + return ret;
  184 + }
  185 + SrsAutoFree(SrsConsumer, consumer);
  186 +
  187 + SrsMessageArray msgs(SRS_PERF_MW_MSGS);
  188 + // TODO: FIMXE: add pithy print.
  189 +
  190 + // TODO: FIXME: config it.
  191 + queue->set_queue_size(60);
  192 +
  193 + while (true) {
  194 + // get messages from consumer.
  195 + // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
  196 + int count = 0;
  197 + if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
  198 + srs_error("http: get messages from consumer failed. ret=%d", ret);
  199 + return ret;
  200 + }
  201 +
  202 + if (count <= 0) {
  203 + srs_info("http: mw sleep %dms for no msg", mw_sleep);
  204 + // directly use sleep, donot use consumer wait.
  205 + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  206 +
  207 + // ignore when nothing got.
  208 + continue;
  209 + }
  210 + srs_info("http: got %d msgs, min=%d, mw=%d", count,
  211 + SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
  212 +
  213 + // free the messages.
  214 + for (int i = 0; i < count; i++) {
  215 + SrsSharedPtrMessage* msg = msgs.msgs[i];
  216 + queue->enqueue(msg);
  217 + }
  218 + }
  219 +
170 return ret; 220 return ret;
171 } 221 }
172 222
@@ -412,7 +462,7 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) @@ -412,7 +462,7 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
412 462
413 // create consumer of souce, ignore gop cache, use the audio gop cache. 463 // create consumer of souce, ignore gop cache, use the audio gop cache.
414 SrsConsumer* consumer = NULL; 464 SrsConsumer* consumer = NULL;
415 - if ((ret = source->create_consumer(consumer, !enc->has_cache())) != ERROR_SUCCESS) { 465 + if ((ret = source->create_consumer(consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) {
416 srs_error("http: create consumer failed. ret=%d", ret); 466 srs_error("http: create consumer failed. ret=%d", ret);
417 return ret; 467 return ret;
418 } 468 }
@@ -49,6 +49,7 @@ class SrsFlvEncoder; @@ -49,6 +49,7 @@ class SrsFlvEncoder;
49 class SrsHttpParser; 49 class SrsHttpParser;
50 class SrsHttpMessage; 50 class SrsHttpMessage;
51 class SrsHttpHandler; 51 class SrsHttpHandler;
  52 +class SrsMessageQueue;
52 class SrsSharedPtrMessage; 53 class SrsSharedPtrMessage;
53 54
54 /** 55 /**
@@ -74,6 +75,7 @@ protected: @@ -74,6 +75,7 @@ protected:
74 class SrsStreamCache : public ISrsThreadHandler 75 class SrsStreamCache : public ISrsThreadHandler
75 { 76 {
76 private: 77 private:
  78 + SrsMessageQueue* queue;
77 SrsSource* source; 79 SrsSource* source;
78 SrsThread* pthread; 80 SrsThread* pthread;
79 public: 81 public:
@@ -244,8 +244,9 @@ void SrsFastVector::free() @@ -244,8 +244,9 @@ void SrsFastVector::free()
244 } 244 }
245 #endif 245 #endif
246 246
247 -SrsMessageQueue::SrsMessageQueue() 247 +SrsMessageQueue::SrsMessageQueue(bool ignore_shrink)
248 { 248 {
  249 + _ignore_shrink = ignore_shrink;
249 queue_size_ms = 0; 250 queue_size_ms = 0;
250 av_start_time = av_end_time = -1; 251 av_start_time = av_end_time = -1;
251 } 252 }
@@ -330,6 +331,26 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in @@ -330,6 +331,26 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in
330 return ret; 331 return ret;
331 } 332 }
332 333
  334 +int SrsMessageQueue::dump_packets(SrsConsumer* consumer, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag)
  335 +{
  336 + int ret = ERROR_SUCCESS;
  337 +
  338 + int nb_msgs = (int)msgs.size();
  339 + if (nb_msgs <= 0) {
  340 + return ret;
  341 + }
  342 +
  343 + SrsSharedPtrMessage** omsgs = msgs.data();
  344 + for (int i = 0; i < nb_msgs; i++) {
  345 + SrsSharedPtrMessage* msg = omsgs[i];
  346 + if ((ret = consumer->enqueue(msg, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
  347 + return ret;
  348 + }
  349 + }
  350 +
  351 + return ret;
  352 +}
  353 +
333 void SrsMessageQueue::shrink() 354 void SrsMessageQueue::shrink()
334 { 355 {
335 int iframe_index = -1; 356 int iframe_index = -1;
@@ -364,8 +385,13 @@ void SrsMessageQueue::shrink() @@ -364,8 +385,13 @@ void SrsMessageQueue::shrink()
364 return; 385 return;
365 } 386 }
366 387
367 - srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f",  
368 - (int)msgs.size(), iframe_index, queue_size_ms / 1000.0); 388 + if (_ignore_shrink) {
  389 + srs_info("shrink the cache queue, size=%d, removed=%d, max=%.2f",
  390 + (int)msgs.size(), iframe_index, queue_size_ms / 1000.0);
  391 + } else {
  392 + srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f",
  393 + (int)msgs.size(), iframe_index, queue_size_ms / 1000.0);
  394 + }
369 395
370 // remove the first gop from the front 396 // remove the first gop from the front
371 for (int i = 0; i < iframe_index; i++) { 397 for (int i = 0; i < iframe_index; i++) {
@@ -1702,7 +1728,7 @@ void SrsSource::on_unpublish() @@ -1702,7 +1728,7 @@ void SrsSource::on_unpublish()
1702 handler->on_unpublish(this, _req); 1728 handler->on_unpublish(this, _req);
1703 } 1729 }
1704 1730
1705 -int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache) 1731 +int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg)
1706 { 1732 {
1707 int ret = ERROR_SUCCESS; 1733 int ret = ERROR_SUCCESS;
1708 1734
@@ -1730,14 +1756,14 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache) @@ -1730,14 +1756,14 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache)
1730 SrsRtmpJitterAlgorithm ag = jitter_algorithm; 1756 SrsRtmpJitterAlgorithm ag = jitter_algorithm;
1731 1757
1732 // copy metadata. 1758 // copy metadata.
1733 - if (cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, tba, tbv, ag)) != ERROR_SUCCESS) { 1759 + if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
1734 srs_error("dispatch metadata failed. ret=%d", ret); 1760 srs_error("dispatch metadata failed. ret=%d", ret);
1735 return ret; 1761 return ret;
1736 } 1762 }
1737 srs_info("dispatch metadata success"); 1763 srs_info("dispatch metadata success");
1738 1764
1739 // copy sequence header 1765 // copy sequence header
1740 - if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, tba, tbv, ag)) != ERROR_SUCCESS) { 1766 + if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
1741 srs_error("dispatch video sequence header failed. ret=%d", ret); 1767 srs_error("dispatch video sequence header failed. ret=%d", ret);
1742 return ret; 1768 return ret;
1743 } 1769 }
@@ -1750,10 +1776,12 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache) @@ -1750,10 +1776,12 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache)
1750 srs_info("dispatch audio sequence header success"); 1776 srs_info("dispatch audio sequence header success");
1751 1777
1752 // copy gop cache to client. 1778 // copy gop cache to client.
1753 - if (dump_gop_cache) {  
1754 - if ((ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) {  
1755 - return ret;  
1756 - } 1779 + if (dg && (ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
  1780 + return ret;
  1781 + }
  1782 +
  1783 + // print status.
  1784 + if (dg) {
1757 srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate); 1785 srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate);
1758 } else { 1786 } else {
1759 srs_trace("create consumer, ignore gop cache, tba=%d, tbv=%d", sample_rate, frame_rate); 1787 srs_trace("create consumer, ignore gop cache, tba=%d, tbv=%d", sample_rate, frame_rate);
@@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
38 #include <srs_app_reload.hpp> 38 #include <srs_app_reload.hpp>
39 #include <srs_core_performance.hpp> 39 #include <srs_core_performance.hpp>
40 40
  41 +class SrsConsumer;
41 class SrsPlayEdge; 42 class SrsPlayEdge;
42 class SrsPublishEdge; 43 class SrsPublishEdge;
43 class SrsSource; 44 class SrsSource;
@@ -137,6 +138,7 @@ public: @@ -137,6 +138,7 @@ public:
137 class SrsMessageQueue 138 class SrsMessageQueue
138 { 139 {
139 private: 140 private:
  141 + bool _ignore_shrink;
140 int64_t av_start_time; 142 int64_t av_start_time;
141 int64_t av_end_time; 143 int64_t av_end_time;
142 int queue_size_ms; 144 int queue_size_ms;
@@ -146,7 +148,7 @@ private: @@ -146,7 +148,7 @@ private:
146 std::vector<SrsSharedPtrMessage*> msgs; 148 std::vector<SrsSharedPtrMessage*> msgs;
147 #endif 149 #endif
148 public: 150 public:
149 - SrsMessageQueue(); 151 + SrsMessageQueue(bool ignore_shrink = false);
150 virtual ~SrsMessageQueue(); 152 virtual ~SrsMessageQueue();
151 public: 153 public:
152 /** 154 /**
@@ -176,6 +178,11 @@ public: @@ -176,6 +178,11 @@ public:
176 * @max_count the max count to dequeue, must be positive. 178 * @max_count the max count to dequeue, must be positive.
177 */ 179 */
178 virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); 180 virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
  181 + /**
  182 + * dumps packets to consumer, use specified args.
  183 + * @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue().
  184 + */
  185 + virtual int dump_packets(SrsConsumer* consumer, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag);
179 private: 186 private:
180 /** 187 /**
181 * remove a gop from the front. 188 * remove a gop from the front.
@@ -494,7 +501,17 @@ public: @@ -494,7 +501,17 @@ public:
494 virtual void on_unpublish(); 501 virtual void on_unpublish();
495 // consumer methods 502 // consumer methods
496 public: 503 public:
497 - virtual int create_consumer(SrsConsumer*& consumer, bool dump_gop_cache = true); 504 + /**
  505 + * create consumer and dumps packets in cache.
  506 + * @param consumer, output the create consumer.
  507 + * @param ds, whether dumps the sequence header.
  508 + * @param dm, whether dumps the metadata.
  509 + * @param dg, whether dumps the gop cache.
  510 + */
  511 + virtual int create_consumer(
  512 + SrsConsumer*& consumer,
  513 + bool ds = true, bool dm = true, bool dg = true
  514 + );
498 virtual void on_consumer_destroy(SrsConsumer* consumer); 515 virtual void on_consumer_destroy(SrsConsumer* consumer);
499 virtual void set_cache(bool enabled); 516 virtual void set_cache(bool enabled);
500 // internal 517 // internal