winlin

support set live queue length

@@ -212,6 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw @@ -212,6 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw
212 * nginx v1.5.0: 139524 lines <br/> 212 * nginx v1.5.0: 139524 lines <br/>
213 213
214 ### History 214 ### History
  215 +* v0.9, 2013-12-15, drop the old whole gop when live message queue full.
215 * v0.9, 2013-12-15, fix the forwarder reconnect bug, feed it the sequence header. 216 * v0.9, 2013-12-15, fix the forwarder reconnect bug, feed it the sequence header.
216 * v0.9, 2013-12-15, support reload the hls/forwarder/transcoder. 217 * v0.9, 2013-12-15, support reload the hls/forwarder/transcoder.
217 * v0.9, 2013-12-14, refine the thread model for the retry threads. 218 * v0.9, 2013-12-14, refine the thread model for the retry threads.
@@ -21,6 +21,7 @@ max_connections 2000; @@ -21,6 +21,7 @@ max_connections 2000;
21 vhost __defaultVhost__ { 21 vhost __defaultVhost__ {
22 enabled on; 22 enabled on;
23 gop_cache on; 23 gop_cache on;
  24 + queue_length 30;
24 forward 127.0.0.1:19350; 25 forward 127.0.0.1:19350;
25 hls { 26 hls {
26 enabled on; 27 enabled on;
@@ -92,7 +93,8 @@ vhost __defaultVhost__ { @@ -92,7 +93,8 @@ vhost __defaultVhost__ {
92 vhost dev { 93 vhost dev {
93 enabled on; 94 enabled on;
94 gop_cache on; 95 gop_cache on;
95 - forward 127.0.0.1:19350; 96 + queue_length 30;
  97 + #forward 127.0.0.1:19350;
96 hls { 98 hls {
97 enabled off; 99 enabled off;
98 hls_path ./objs/nginx/html; 100 hls_path ./objs/nginx/html;
@@ -685,6 +687,11 @@ vhost min.delay.com { @@ -685,6 +687,11 @@ vhost min.delay.com {
685 # set to on if requires client fast startup. 687 # set to on if requires client fast startup.
686 # default: on 688 # default: on
687 gop_cache off; 689 gop_cache off;
  690 + # the max live queue length in seconds.
  691 + # if the messages in the queue exceed the max length,
  692 + # drop the old whole gop.
  693 + # default: 30
  694 + queue_length 10;
688 } 695 }
689 # the vhost for antisuck. 696 # the vhost for antisuck.
690 vhost refer.anti_suck.com { 697 vhost refer.anti_suck.com {
@@ -548,6 +548,17 @@ int SrsConfig::reload() @@ -548,6 +548,17 @@ int SrsConfig::reload()
548 } 548 }
549 srs_trace("vhost %s reload gop_cache success.", vhost.c_str()); 549 srs_trace("vhost %s reload gop_cache success.", vhost.c_str());
550 } 550 }
  551 + // queue_length
  552 + if (!srs_directive_equals(new_vhost->get("queue_length"), old_vhost->get("queue_length"))) {
  553 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  554 + ISrsReloadHandler* subscribe = *it;
  555 + if ((ret = subscribe->on_reload_queue_length(vhost)) != ERROR_SUCCESS) {
  556 + srs_error("vhost %s notify subscribes queue_length failed. ret=%d", vhost.c_str(), ret);
  557 + return ret;
  558 + }
  559 + }
  560 + srs_trace("vhost %s reload queue_length success.", vhost.c_str());
  561 + }
551 // forward 562 // forward
552 if (!srs_directive_equals(new_vhost->get("forward"), old_vhost->get("forward"))) { 563 if (!srs_directive_equals(new_vhost->get("forward"), old_vhost->get("forward"))) {
553 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 564 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
@@ -1275,6 +1286,7 @@ bool SrsConfig::get_gop_cache(string vhost) @@ -1275,6 +1286,7 @@ bool SrsConfig::get_gop_cache(string vhost)
1275 return true; 1286 return true;
1276 } 1287 }
1277 1288
  1289 + conf = conf->get("gop_cache");
1278 if (conf && conf->arg0() == "off") { 1290 if (conf && conf->arg0() == "off") {
1279 return false; 1291 return false;
1280 } 1292 }
@@ -1282,6 +1294,22 @@ bool SrsConfig::get_gop_cache(string vhost) @@ -1282,6 +1294,22 @@ bool SrsConfig::get_gop_cache(string vhost)
1282 return true; 1294 return true;
1283 } 1295 }
1284 1296
  1297 +double SrsConfig::get_queue_length(string vhost)
  1298 +{
  1299 + SrsConfDirective* conf = get_vhost(vhost);
  1300 +
  1301 + if (!conf) {
  1302 + return SRS_CONF_DEFAULT_QUEUE_LENGTH;
  1303 + }
  1304 +
  1305 + conf = conf->get("queue_length");
  1306 + if (conf || conf->arg0().empty()) {
  1307 + return SRS_CONF_DEFAULT_QUEUE_LENGTH;
  1308 + }
  1309 +
  1310 + return ::atoi(conf->arg0().c_str());
  1311 +}
  1312 +
1285 SrsConfDirective* SrsConfig::get_forward(string vhost) 1313 SrsConfDirective* SrsConfig::get_forward(string vhost)
1286 { 1314 {
1287 SrsConfDirective* conf = get_vhost(vhost); 1315 SrsConfDirective* conf = get_vhost(vhost);
@@ -48,6 +48,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -48,6 +48,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
48 #define SRS_CONF_DEFAULT_AAC_SYNC 100 48 #define SRS_CONF_DEFAULT_AAC_SYNC 100
49 // in ms, for HLS aac flush the audio 49 // in ms, for HLS aac flush the audio
50 #define SRS_CONF_DEFAULT_AAC_DELAY 300 50 #define SRS_CONF_DEFAULT_AAC_DELAY 300
  51 +// in seconds, the live queue length.
  52 +#define SRS_CONF_DEFAULT_QUEUE_LENGTH 30
  53 +// in seconds, the paused queue length.
  54 +#define SRS_CONF_DEFAULT_PAUSED_LENGTH 10
51 55
52 #define SRS_CONF_DEFAULT_CHUNK_SIZE 4096 56 #define SRS_CONF_DEFAULT_CHUNK_SIZE 4096
53 57
@@ -145,6 +149,7 @@ public: @@ -145,6 +149,7 @@ public:
145 virtual std::string get_log_dir(); 149 virtual std::string get_log_dir();
146 virtual int get_max_connections(); 150 virtual int get_max_connections();
147 virtual bool get_gop_cache(std::string vhost); 151 virtual bool get_gop_cache(std::string vhost);
  152 + virtual double get_queue_length(std::string vhost);
148 virtual SrsConfDirective* get_forward(std::string vhost); 153 virtual SrsConfDirective* get_forward(std::string vhost);
149 private: 154 private:
150 virtual SrsConfDirective* get_hls(std::string vhost); 155 virtual SrsConfDirective* get_hls(std::string vhost);
@@ -51,12 +51,14 @@ SrsForwarder::SrsForwarder(SrsSource* _source) @@ -51,12 +51,14 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
51 stream_id = 0; 51 stream_id = 0;
52 52
53 pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_MS); 53 pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_MS);
  54 + queue = new SrsMessageQueue();
54 } 55 }
55 56
56 SrsForwarder::~SrsForwarder() 57 SrsForwarder::~SrsForwarder()
57 { 58 {
58 on_unpublish(); 59 on_unpublish();
59 60
  61 + // TODO: FIXME: remove it.
60 std::vector<SrsSharedPtrMessage*>::iterator it; 62 std::vector<SrsSharedPtrMessage*>::iterator it;
61 for (it = msgs.begin(); it != msgs.end(); ++it) { 63 for (it = msgs.begin(); it != msgs.end(); ++it) {
62 SrsSharedPtrMessage* msg = *it; 64 SrsSharedPtrMessage* msg = *it;
@@ -65,6 +67,12 @@ SrsForwarder::~SrsForwarder() @@ -65,6 +67,12 @@ SrsForwarder::~SrsForwarder()
65 msgs.clear(); 67 msgs.clear();
66 68
67 srs_freep(pthread); 69 srs_freep(pthread);
  70 + srs_freep(queue);
  71 +}
  72 +
  73 +void SrsForwarder::set_queue_size(double queue_size)
  74 +{
  75 + queue->set_queue_size(queue_size);
68 } 76 }
69 77
70 int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) 78 int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
@@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 36
37 class SrsSharedPtrMessage; 37 class SrsSharedPtrMessage;
38 class SrsOnMetaDataPacket; 38 class SrsOnMetaDataPacket;
  39 +class SrsMessageQueue;
39 class SrsRtmpClient; 40 class SrsRtmpClient;
40 class SrsRequest; 41 class SrsRequest;
41 class SrsSource; 42 class SrsSource;
@@ -58,11 +59,14 @@ private: @@ -58,11 +59,14 @@ private:
58 private: 59 private:
59 SrsSource* source; 60 SrsSource* source;
60 SrsRtmpClient* client; 61 SrsRtmpClient* client;
  62 + SrsMessageQueue* queue;
61 std::vector<SrsSharedPtrMessage*> msgs; 63 std::vector<SrsSharedPtrMessage*> msgs;
62 public: 64 public:
63 SrsForwarder(SrsSource* _source); 65 SrsForwarder(SrsSource* _source);
64 virtual ~SrsForwarder(); 66 virtual ~SrsForwarder();
65 public: 67 public:
  68 + virtual void set_queue_size(double queue_size);
  69 +public:
66 virtual int on_publish(SrsRequest* req, std::string forward_server); 70 virtual int on_publish(SrsRequest* req, std::string forward_server);
67 virtual void on_unpublish(); 71 virtual void on_unpublish();
68 virtual int on_meta_data(SrsSharedPtrMessage* metadata); 72 virtual int on_meta_data(SrsSharedPtrMessage* metadata);
@@ -55,6 +55,11 @@ int ISrsReloadHandler::on_reload_gop_cache(string /*vhost*/) @@ -55,6 +55,11 @@ int ISrsReloadHandler::on_reload_gop_cache(string /*vhost*/)
55 return ERROR_SUCCESS; 55 return ERROR_SUCCESS;
56 } 56 }
57 57
  58 +int ISrsReloadHandler::on_reload_queue_length(string /*vhost*/)
  59 +{
  60 + return ERROR_SUCCESS;
  61 +}
  62 +
58 int ISrsReloadHandler::on_reload_forward(string /*vhost*/) 63 int ISrsReloadHandler::on_reload_forward(string /*vhost*/)
59 { 64 {
60 return ERROR_SUCCESS; 65 return ERROR_SUCCESS;
@@ -44,6 +44,7 @@ public: @@ -44,6 +44,7 @@ public:
44 virtual int on_reload_pithy_print(); 44 virtual int on_reload_pithy_print();
45 virtual int on_reload_vhost_removed(std::string vhost); 45 virtual int on_reload_vhost_removed(std::string vhost);
46 virtual int on_reload_gop_cache(std::string vhost); 46 virtual int on_reload_gop_cache(std::string vhost);
  47 + virtual int on_reload_queue_length(std::string vhost);
47 virtual int on_reload_forward(std::string vhost); 48 virtual int on_reload_forward(std::string vhost);
48 virtual int on_reload_hls(std::string vhost); 49 virtual int on_reload_hls(std::string vhost);
49 virtual int on_reload_transcode(std::string vhost); 50 virtual int on_reload_transcode(std::string vhost);
@@ -39,7 +39,6 @@ using namespace std; @@ -39,7 +39,6 @@ using namespace std;
39 39
40 #define CONST_MAX_JITTER_MS 500 40 #define CONST_MAX_JITTER_MS 500
41 #define DEFAULT_FRAME_TIME_MS 10 41 #define DEFAULT_FRAME_TIME_MS 10
42 -#define PAUSED_SHRINK_SIZE 250  
43 42
44 SrsRtmpJitter::SrsRtmpJitter() 43 SrsRtmpJitter::SrsRtmpJitter()
45 { 44 {
@@ -50,9 +49,21 @@ SrsRtmpJitter::~SrsRtmpJitter() @@ -50,9 +49,21 @@ SrsRtmpJitter::~SrsRtmpJitter()
50 { 49 {
51 } 50 }
52 51
  52 +// TODO: FIXME: remove the 64bits time, change the timestamp in heaer to 64bits.
53 int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time) 53 int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time)
54 { 54 {
55 int ret = ERROR_SUCCESS; 55 int ret = ERROR_SUCCESS;
  56 +
  57 + // set to 0 for metadata.
  58 + if (!msg->header.is_video() && !msg->header.is_audio()) {
  59 + if (corrected_time) {
  60 + *corrected_time = 0;
  61 + }
  62 +
  63 + msg->header.timestamp = 0;
  64 +
  65 + return ret;
  66 + }
56 67
57 int sample_rate = tba; 68 int sample_rate = tba;
58 int frame_rate = tbv; 69 int frame_rate = tbv;
@@ -110,55 +121,50 @@ int SrsRtmpJitter::get_time() @@ -110,55 +121,50 @@ int SrsRtmpJitter::get_time()
110 return (int)last_pkt_correct_time; 121 return (int)last_pkt_correct_time;
111 } 122 }
112 123
113 -SrsConsumer::SrsConsumer(SrsSource* _source) 124 +SrsMessageQueue::SrsMessageQueue()
114 { 125 {
115 - source = _source;  
116 - paused = false;  
117 - jitter = new SrsRtmpJitter(); 126 + queue_size_ms = 0;
  127 + av_start_time = av_end_time = -1;
118 } 128 }
119 129
120 -SrsConsumer::~SrsConsumer() 130 +SrsMessageQueue::~SrsMessageQueue()
121 { 131 {
122 clear(); 132 clear();
123 -  
124 - source->on_consumer_destroy(this);  
125 - srs_freep(jitter);  
126 } 133 }
127 134
128 -int SrsConsumer::get_time() 135 +void SrsMessageQueue::set_queue_size(double queue_size)
129 { 136 {
130 - return jitter->get_time(); 137 + queue_size_ms = (int)(queue_size * 1000);
131 } 138 }
132 139
133 -int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) 140 +int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
134 { 141 {
135 int ret = ERROR_SUCCESS; 142 int ret = ERROR_SUCCESS;
136 143
137 - if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {  
138 - srs_freep(msg);  
139 - return ret; 144 + if (msg->header.is_video() || msg->header.is_audio()) {
  145 + if (av_start_time == -1) {
  146 + av_start_time = msg->header.timestamp;
  147 + }
  148 +
  149 + av_end_time = msg->header.timestamp;
140 } 150 }
141 151
142 - // TODO: check the queue size and drop packets if overflow.  
143 msgs.push_back(msg); 152 msgs.push_back(msg);
  153 +
  154 + while (av_end_time - av_start_time > queue_size_ms) {
  155 + shrink();
  156 + }
144 157
145 return ret; 158 return ret;
146 } 159 }
147 160
148 -int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) 161 +int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
149 { 162 {
150 int ret = ERROR_SUCCESS; 163 int ret = ERROR_SUCCESS;
151 164
152 if (msgs.empty()) { 165 if (msgs.empty()) {
153 return ret; 166 return ret;
154 } 167 }
155 -  
156 - if (paused) {  
157 - if ((int)msgs.size() >= PAUSED_SHRINK_SIZE) {  
158 - shrink();  
159 - }  
160 - return ret;  
161 - }  
162 168
163 if (max_count == 0) { 169 if (max_count == 0) {
164 count = (int)msgs.size(); 170 count = (int)msgs.size();
@@ -181,76 +187,120 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c @@ -181,76 +187,120 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c
181 return ret; 187 return ret;
182 } 188 }
183 189
184 -int SrsConsumer::on_play_client_pause(bool is_pause)  
185 -{  
186 - int ret = ERROR_SUCCESS;  
187 -  
188 - srs_trace("stream consumer change pause state %d=>%d", paused, is_pause);  
189 - paused = is_pause;  
190 -  
191 - return ret;  
192 -}  
193 -  
194 -void SrsConsumer::shrink() 190 +void SrsMessageQueue::shrink()
195 { 191 {
196 - int i = 0;  
197 - std::vector<SrsSharedPtrMessage*>::iterator it; 192 + int iframe_index = -1;
198 193
199 - // issue the last video iframe.  
200 - bool has_video = false;  
201 - int frame_to_remove = 0;  
202 - std::vector<SrsSharedPtrMessage*>::iterator iframe = msgs.end();  
203 - for (i = 0, it = msgs.begin(); it != msgs.end(); ++it, i++) {  
204 - SrsSharedPtrMessage* msg = *it; 194 + // issue the first iframe.
  195 + // skip the first frame, whatever the type of it,
  196 + // for when we shrinked, the first is the iframe,
  197 + // we will directly remove the gop next time.
  198 + for (int i = 1; i < (int)msgs.size(); i++) {
  199 + SrsSharedPtrMessage* msg = msgs[i];
  200 +
205 if (msg->header.is_video()) { 201 if (msg->header.is_video()) {
206 - has_video = true;  
207 if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) { 202 if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) {
208 - iframe = it;  
209 - frame_to_remove = i + 1; 203 + // the max frame index to remove.
  204 + iframe_index = i;
  205 +
  206 + // set the start time, we will remove until this frame.
  207 + av_start_time = msg->header.timestamp;
  208 +
  209 + break;
210 } 210 }
211 } 211 }
212 } 212 }
213 213
214 - // last iframe is the first elem, ignore it.  
215 - if (iframe == msgs.begin()) {  
216 - return;  
217 - }  
218 -  
219 - // recalc the frame to remove  
220 - if (iframe == msgs.end()) {  
221 - frame_to_remove = 0;  
222 - }  
223 - if (!has_video) {  
224 - frame_to_remove = (int)msgs.size();  
225 - }  
226 -  
227 - srs_trace("shrink the cache queue, has_video=%d, has_iframe=%d, size=%d, removed=%d",  
228 - has_video, iframe != msgs.end(), (int)msgs.size(), frame_to_remove);  
229 -  
230 - // if no video, remove all audio.  
231 - if (!has_video) { 214 + // no iframe, clear the queue.
  215 + if (iframe_index < 0) {
232 clear(); 216 clear();
233 return; 217 return;
234 } 218 }
235 219
236 - // if exists video Iframe, remove the frames before it.  
237 - if (iframe != msgs.end()) {  
238 - for (it = msgs.begin(); it != iframe; ++it) {  
239 - SrsSharedPtrMessage* msg = *it;  
240 - srs_freep(msg);  
241 - }  
242 - msgs.erase(msgs.begin(), iframe); 220 + // remove the first gop from the front
  221 + for (int i = 0; i < iframe_index; i++) {
  222 + SrsSharedPtrMessage* msg = msgs[i];
  223 + srs_freep(msg);
243 } 224 }
  225 + msgs.erase(msgs.begin(), msgs.begin() + iframe_index);
  226 +
  227 + srs_trace("shrink the cache queue, "
  228 + "size=%d, removed=%d", (int)msgs.size(), iframe_index);
244 } 229 }
245 230
246 -void SrsConsumer::clear() 231 +void SrsMessageQueue::clear()
247 { 232 {
248 std::vector<SrsSharedPtrMessage*>::iterator it; 233 std::vector<SrsSharedPtrMessage*>::iterator it;
  234 +
249 for (it = msgs.begin(); it != msgs.end(); ++it) { 235 for (it = msgs.begin(); it != msgs.end(); ++it) {
250 SrsSharedPtrMessage* msg = *it; 236 SrsSharedPtrMessage* msg = *it;
251 srs_freep(msg); 237 srs_freep(msg);
252 } 238 }
  239 +
253 msgs.clear(); 240 msgs.clear();
  241 +
  242 + av_start_time = av_end_time = -1;
  243 +}
  244 +
  245 +SrsConsumer::SrsConsumer(SrsSource* _source)
  246 +{
  247 + source = _source;
  248 + paused = false;
  249 + jitter = new SrsRtmpJitter();
  250 + queue = new SrsMessageQueue();
  251 +}
  252 +
  253 +SrsConsumer::~SrsConsumer()
  254 +{
  255 + source->on_consumer_destroy(this);
  256 + srs_freep(jitter);
  257 + srs_freep(queue);
  258 +}
  259 +
  260 +void SrsConsumer::set_queue_size(double queue_size)
  261 +{
  262 + queue->set_queue_size(queue_size);
  263 +}
  264 +
  265 +int SrsConsumer::get_time()
  266 +{
  267 + return jitter->get_time();
  268 +}
  269 +
  270 +int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
  271 +{
  272 + int ret = ERROR_SUCCESS;
  273 +
  274 + if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {
  275 + srs_freep(msg);
  276 + return ret;
  277 + }
  278 +
  279 + if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
  280 + return ret;
  281 + }
  282 +
  283 + return ret;
  284 +}
  285 +
  286 +int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
  287 +{
  288 + // paused, return nothing.
  289 + if (paused) {
  290 + return ERROR_SUCCESS;
  291 + }
  292 +
  293 + return queue->get_packets(max_count, pmsgs, count);
  294 +}
  295 +
  296 +int SrsConsumer::on_play_client_pause(bool is_pause)
  297 +{
  298 + int ret = ERROR_SUCCESS;
  299 +
  300 + srs_trace("stream consumer change pause state %d=>%d", paused, is_pause);
  301 + paused = is_pause;
  302 +
  303 + return ret;
254 } 304 }
255 305
256 SrsGopCache::SrsGopCache() 306 SrsGopCache::SrsGopCache()
@@ -436,6 +486,41 @@ int SrsSource::on_reload_gop_cache(string vhost) @@ -436,6 +486,41 @@ int SrsSource::on_reload_gop_cache(string vhost)
436 return ret; 486 return ret;
437 } 487 }
438 488
  489 +int SrsSource::on_reload_queue_length(string vhost)
  490 +{
  491 + int ret = ERROR_SUCCESS;
  492 +
  493 + if (req->vhost != vhost) {
  494 + return ret;
  495 + }
  496 +
  497 + double queue_size = config->get_queue_length(req->vhost);
  498 +
  499 + if (true) {
  500 + std::vector<SrsConsumer*>::iterator it;
  501 +
  502 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  503 + SrsConsumer* consumer = *it;
  504 + consumer->set_queue_size(queue_size);
  505 + }
  506 +
  507 + srs_trace("consumers reload queue size success.");
  508 + }
  509 +
  510 + if (true) {
  511 + std::vector<SrsForwarder*>::iterator it;
  512 +
  513 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  514 + SrsForwarder* forwarder = *it;
  515 + forwarder->set_queue_size(queue_size);
  516 + }
  517 +
  518 + srs_trace("forwarders reload queue size success.");
  519 + }
  520 +
  521 + return ret;
  522 +}
  523 +
439 int SrsSource::on_reload_forward(string vhost) 524 int SrsSource::on_reload_forward(string vhost)
440 { 525 {
441 int ret = ERROR_SUCCESS; 526 int ret = ERROR_SUCCESS;
@@ -735,7 +820,7 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -735,7 +820,7 @@ int SrsSource::on_video(SrsCommonMessage* video)
735 820
736 // cache the last gop packets 821 // cache the last gop packets
737 if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { 822 if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
738 - srs_error("shrink gop cache failed. ret=%d", ret); 823 + srs_error("gop cache msg failed. ret=%d", ret);
739 return ret; 824 return ret;
740 } 825 }
741 srs_verbose("cache gop success."); 826 srs_verbose("cache gop success.");
@@ -809,6 +894,7 @@ void SrsSource::on_unpublish() @@ -809,6 +894,7 @@ void SrsSource::on_unpublish()
809 int ret = ERROR_SUCCESS; 894 int ret = ERROR_SUCCESS;
810 895
811 consumer = new SrsConsumer(this); 896 consumer = new SrsConsumer(this);
  897 + consumer->set_queue_size(config->get_queue_length(req->vhost));
812 consumers.push_back(consumer); 898 consumers.push_back(consumer);
813 899
814 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { 900 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
@@ -75,6 +75,45 @@ public: @@ -75,6 +75,45 @@ public:
75 }; 75 };
76 76
77 /** 77 /**
  78 +* the message queue for the consumer(client), forwarder.
  79 +* we limit the size in seconds, drop old messages(the whole gop) if full.
  80 +*/
  81 +class SrsMessageQueue
  82 +{
  83 +private:
  84 + int64_t av_start_time;
  85 + int64_t av_end_time;
  86 + int queue_size_ms;
  87 + std::vector<SrsSharedPtrMessage*> msgs;
  88 +public:
  89 + SrsMessageQueue();
  90 + virtual ~SrsMessageQueue();
  91 +public:
  92 + /**
  93 + * set the queue size
  94 + * @param queue_size the queue size in seconds.
  95 + */
  96 + virtual void set_queue_size(double queue_size);
  97 +public:
  98 + /**
  99 + * enqueue the message, the timestamp always monotonically.
  100 + * @param msg, the msg to enqueue, user never free it whatever the return code.
  101 + */
  102 + virtual int enqueue(SrsSharedPtrMessage* msg);
  103 + /**
  104 + * get messages from the queue.
  105 + */
  106 + virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
  107 +private:
  108 + /**
  109 + * remove a gop from the front.
  110 + * if no iframe found, clear it.
  111 + */
  112 + virtual void shrink();
  113 + virtual void clear();
  114 +};
  115 +
  116 +/**
78 * the consumer for SrsSource, that is a play client. 117 * the consumer for SrsSource, that is a play client.
79 */ 118 */
80 class SrsConsumer 119 class SrsConsumer
@@ -82,12 +121,15 @@ class SrsConsumer @@ -82,12 +121,15 @@ class SrsConsumer
82 private: 121 private:
83 SrsRtmpJitter* jitter; 122 SrsRtmpJitter* jitter;
84 SrsSource* source; 123 SrsSource* source;
  124 + SrsMessageQueue* queue;
85 std::vector<SrsSharedPtrMessage*> msgs; 125 std::vector<SrsSharedPtrMessage*> msgs;
86 bool paused; 126 bool paused;
87 public: 127 public:
88 SrsConsumer(SrsSource* _source); 128 SrsConsumer(SrsSource* _source);
89 virtual ~SrsConsumer(); 129 virtual ~SrsConsumer();
90 public: 130 public:
  131 + virtual void set_queue_size(double queue_size);
  132 +public:
91 /** 133 /**
92 * get current client time, the last packet time. 134 * get current client time, the last packet time.
93 */ 135 */
@@ -111,13 +153,6 @@ public: @@ -111,13 +153,6 @@ public:
111 * when client send the pause message. 153 * when client send the pause message.
112 */ 154 */
113 virtual int on_play_client_pause(bool is_pause); 155 virtual int on_play_client_pause(bool is_pause);
114 -private:  
115 - /**  
116 - * when paused, shrink the cache queue,  
117 - * remove to cache only one gop.  
118 - */  
119 - virtual void shrink();  
120 - virtual void clear();  
121 }; 156 };
122 157
123 /** 158 /**
@@ -218,6 +253,7 @@ public: @@ -218,6 +253,7 @@ public:
218 // interface ISrsReloadHandler 253 // interface ISrsReloadHandler
219 public: 254 public:
220 virtual int on_reload_gop_cache(std::string vhost); 255 virtual int on_reload_gop_cache(std::string vhost);
  256 + virtual int on_reload_queue_length(std::string vhost);
221 virtual int on_reload_forward(std::string vhost); 257 virtual int on_reload_forward(std::string vhost);
222 virtual int on_reload_hls(std::string vhost); 258 virtual int on_reload_hls(std::string vhost);
223 virtual int on_reload_transcode(std::string vhost); 259 virtual int on_reload_transcode(std::string vhost);