winlin

refine the consumer, use srs message queue to shrink message when overflow.

@@ -93,7 +93,7 @@ vhost __defaultVhost__ { @@ -93,7 +93,7 @@ vhost __defaultVhost__ {
93 vhost dev { 93 vhost dev {
94 enabled on; 94 enabled on;
95 gop_cache on; 95 gop_cache on;
96 - queue_length 30; 96 + queue_length 10;
97 #forward 127.0.0.1:19350; 97 #forward 127.0.0.1:19350;
98 hls { 98 hls {
99 enabled off; 99 enabled off;
@@ -1303,7 +1303,7 @@ double SrsConfig::get_queue_length(string vhost) @@ -1303,7 +1303,7 @@ double SrsConfig::get_queue_length(string vhost)
1303 } 1303 }
1304 1304
1305 conf = conf->get("queue_length"); 1305 conf = conf->get("queue_length");
1306 - if (conf || conf->arg0().empty()) { 1306 + if (!conf || conf->arg0().empty()) {
1307 return SRS_CONF_DEFAULT_QUEUE_LENGTH; 1307 return SRS_CONF_DEFAULT_QUEUE_LENGTH;
1308 } 1308 }
1309 1309
@@ -171,6 +171,10 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in @@ -171,6 +171,10 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in
171 } else { 171 } else {
172 count = srs_min(max_count, (int)msgs.size()); 172 count = srs_min(max_count, (int)msgs.size());
173 } 173 }
  174 +
  175 + if (count <= 0) {
  176 + return ret;
  177 + }
174 178
175 pmsgs = new SrsSharedPtrMessage*[count]; 179 pmsgs = new SrsSharedPtrMessage*[count];
176 180
@@ -178,6 +182,9 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in @@ -178,6 +182,9 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in
178 pmsgs[i] = msgs[i]; 182 pmsgs[i] = msgs[i];
179 } 183 }
180 184
  185 + SrsSharedPtrMessage* last = msgs[count - 1];
  186 + av_start_time = last->header.timestamp;
  187 +
181 if (count == (int)msgs.size()) { 188 if (count == (int)msgs.size()) {
182 msgs.clear(); 189 msgs.clear();
183 } else { 190 } else {
@@ -217,15 +224,15 @@ void SrsMessageQueue::shrink() @@ -217,15 +224,15 @@ void SrsMessageQueue::shrink()
217 return; 224 return;
218 } 225 }
219 226
  227 + srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f",
  228 + (int)msgs.size(), iframe_index, queue_size_ms / 1000.0);
  229 +
220 // remove the first gop from the front 230 // remove the first gop from the front
221 for (int i = 0; i < iframe_index; i++) { 231 for (int i = 0; i < iframe_index; i++) {
222 SrsSharedPtrMessage* msg = msgs[i]; 232 SrsSharedPtrMessage* msg = msgs[i];
223 srs_freep(msg); 233 srs_freep(msg);
224 } 234 }
225 msgs.erase(msgs.begin(), msgs.begin() + iframe_index); 235 msgs.erase(msgs.begin(), msgs.begin() + iframe_index);
226 -  
227 - srs_trace("shrink the cache queue, "  
228 - "size=%d, removed=%d", (int)msgs.size(), iframe_index);  
229 } 236 }
230 237
231 void SrsMessageQueue::clear() 238 void SrsMessageQueue::clear()
@@ -893,9 +900,12 @@ void SrsSource::on_unpublish() @@ -893,9 +900,12 @@ void SrsSource::on_unpublish()
893 { 900 {
894 int ret = ERROR_SUCCESS; 901 int ret = ERROR_SUCCESS;
895 902
  903 + double queue_size = config->get_queue_length(req->vhost);
  904 +
896 consumer = new SrsConsumer(this); 905 consumer = new SrsConsumer(this);
897 - consumer->set_queue_size(config->get_queue_length(req->vhost)); 906 +
898 consumers.push_back(consumer); 907 consumers.push_back(consumer);
  908 + consumer->set_queue_size(queue_size);
899 909
900 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { 910 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
901 srs_error("dispatch metadata failed. ret=%d", ret); 911 srs_error("dispatch metadata failed. ret=%d", ret);
@@ -919,6 +929,8 @@ void SrsSource::on_unpublish() @@ -919,6 +929,8 @@ void SrsSource::on_unpublish()
919 return ret; 929 return ret;
920 } 930 }
921 931
  932 + srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate);
  933 +
922 return ret; 934 return ret;
923 } 935 }
924 936