winlin

support set the live queue length(in seconds), drop when full.

@@ -212,7 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw @@ -212,7 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw
212 * nginx v1.5.0: 139524 lines <br/> 212 * nginx v1.5.0: 139524 lines <br/>
213 213
214 ### History 214 ### History
215 -* v0.9, 2013-12-15, drop the old whole gop when live message queue full. 215 +* v0.9, 2013-12-15, support set the live queue length(in seconds), drop when full.
216 * v0.9, 2013-12-15, fix the forwarder reconnect bug, feed it the sequence header. 216 * v0.9, 2013-12-15, fix the forwarder reconnect bug, feed it the sequence header.
217 * v0.9, 2013-12-15, support reload the hls/forwarder/transcoder. 217 * v0.9, 2013-12-15, support reload the hls/forwarder/transcoder.
218 * v0.9, 2013-12-14, refine the thread model for the retry threads. 218 * v0.9, 2013-12-14, refine the thread model for the retry threads.
@@ -94,7 +94,7 @@ vhost dev { @@ -94,7 +94,7 @@ vhost dev {
94 enabled on; 94 enabled on;
95 gop_cache on; 95 gop_cache on;
96 queue_length 10; 96 queue_length 10;
97 - #forward 127.0.0.1:19350; 97 + forward 127.0.0.1:19350;
98 hls { 98 hls {
99 enabled off; 99 enabled off;
100 hls_path ./objs/nginx/html; 100 hls_path ./objs/nginx/html;
@@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 #include <srs_core_rtmp.hpp> 36 #include <srs_core_rtmp.hpp>
37 #include <srs_core_config.hpp> 37 #include <srs_core_config.hpp>
38 #include <srs_core_source.hpp> 38 #include <srs_core_source.hpp>
  39 +#include <srs_core_autofree.hpp>
39 40
40 #define SRS_PULSE_TIMEOUT_MS 100 41 #define SRS_PULSE_TIMEOUT_MS 100
41 #define SRS_FORWARDER_SLEEP_MS 2000 42 #define SRS_FORWARDER_SLEEP_MS 2000
@@ -138,7 +139,7 @@ int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) @@ -138,7 +139,7 @@ int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
138 int ret = ERROR_SUCCESS; 139 int ret = ERROR_SUCCESS;
139 140
140 if ((ret = jitter->correct(metadata, 0, 0)) != ERROR_SUCCESS) { 141 if ((ret = jitter->correct(metadata, 0, 0)) != ERROR_SUCCESS) {
141 - srs_freep(msg); 142 + srs_freep(metadata);
142 return ret; 143 return ret;
143 } 144 }
144 145
@@ -308,7 +309,7 @@ int SrsForwarder::forward() @@ -308,7 +309,7 @@ int SrsForwarder::forward()
308 // forward all messages. 309 // forward all messages.
309 int count = 0; 310 int count = 0;
310 SrsSharedPtrMessage** msgs = NULL; 311 SrsSharedPtrMessage** msgs = NULL;
311 - if ((ret = queue->get_packets(0, &msgs, count)) != ERROR_SUCCESS) { 312 + if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
312 srs_error("get message to forward failed. ret=%d", ret); 313 srs_error("get message to forward failed. ret=%d", ret);
313 return ret; 314 return ret;
314 } 315 }
@@ -900,11 +900,10 @@ void SrsSource::on_unpublish() @@ -900,11 +900,10 @@ void SrsSource::on_unpublish()
900 { 900 {
901 int ret = ERROR_SUCCESS; 901 int ret = ERROR_SUCCESS;
902 902
903 - double queue_size = config->get_queue_length(req->vhost);  
904 -  
905 consumer = new SrsConsumer(this); 903 consumer = new SrsConsumer(this);
906 -  
907 consumers.push_back(consumer); 904 consumers.push_back(consumer);
  905 +
  906 + double queue_size = config->get_queue_length(req->vhost);
908 consumer->set_queue_size(queue_size); 907 consumer->set_queue_size(queue_size);
909 908
910 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { 909 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
@@ -959,6 +958,9 @@ int SrsSource::create_forwarders() @@ -959,6 +958,9 @@ int SrsSource::create_forwarders()
959 958
960 SrsForwarder* forwarder = new SrsForwarder(this); 959 SrsForwarder* forwarder = new SrsForwarder(this);
961 forwarders.push_back(forwarder); 960 forwarders.push_back(forwarder);
  961 +
  962 + double queue_size = config->get_queue_length(req->vhost);
  963 + forwarder->set_queue_size(queue_size);
962 964
963 if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) { 965 if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) {
964 srs_error("start forwarder failed. " 966 srs_error("start forwarder failed. "