winlin

Merge branch 'srs.master'

@@ -66,6 +66,11 @@ int SrsPipe::initialize() @@ -66,6 +66,11 @@ int SrsPipe::initialize()
66 return ret; 66 return ret;
67 } 67 }
68 68
  69 +st_netfd_t SrsPipe::rfd()
  70 +{
  71 + return read_stfd;
  72 +}
  73 +
69 bool SrsPipe::already_written() 74 bool SrsPipe::already_written()
70 { 75 {
71 return _already_written; 76 return _already_written;
@@ -57,6 +57,10 @@ public: @@ -57,6 +57,10 @@ public:
57 * initialize pipes, open fds. 57 * initialize pipes, open fds.
58 */ 58 */
59 virtual int initialize(); 59 virtual int initialize();
  60 + /**
  61 + * get the read fd to poll.
  62 + */
  63 + virtual st_netfd_t rfd();
60 public: 64 public:
61 /** 65 /**
62 * for event based service, whether already writen data. 66 * for event based service, whether already writen data.
@@ -41,6 +41,7 @@ using namespace std; @@ -41,6 +41,7 @@ using namespace std;
41 #include <srs_app_edge.hpp> 41 #include <srs_app_edge.hpp>
42 #include <srs_kernel_utility.hpp> 42 #include <srs_kernel_utility.hpp>
43 #include <srs_app_avc_aac.hpp> 43 #include <srs_app_avc_aac.hpp>
  44 +#include <srs_app_pipe.hpp>
44 45
45 #define CONST_MAX_JITTER_MS 500 46 #define CONST_MAX_JITTER_MS 500
46 #define DEFAULT_FRAME_TIME_MS 40 47 #define DEFAULT_FRAME_TIME_MS 40
@@ -171,6 +172,11 @@ void SrsMessageQueue::set_queue_size(double queue_size) @@ -171,6 +172,11 @@ void SrsMessageQueue::set_queue_size(double queue_size)
171 queue_size_ms = (int)(queue_size * 1000); 172 queue_size_ms = (int)(queue_size * 1000);
172 } 173 }
173 174
  175 +bool SrsMessageQueue::empty()
  176 +{
  177 + return msgs.size() == 0;
  178 +}
  179 +
174 int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) 180 int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
175 { 181 {
176 int ret = ERROR_SUCCESS; 182 int ret = ERROR_SUCCESS;
@@ -290,6 +296,7 @@ SrsConsumer::SrsConsumer(SrsSource* _source) @@ -290,6 +296,7 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
290 jitter = new SrsRtmpJitter(); 296 jitter = new SrsRtmpJitter();
291 queue = new SrsMessageQueue(); 297 queue = new SrsMessageQueue();
292 should_update_source_id = false; 298 should_update_source_id = false;
  299 + pipe = new SrsPipe();
293 } 300 }
294 301
295 SrsConsumer::~SrsConsumer() 302 SrsConsumer::~SrsConsumer()
@@ -299,6 +306,23 @@ SrsConsumer::~SrsConsumer() @@ -299,6 +306,23 @@ SrsConsumer::~SrsConsumer()
299 srs_freep(queue); 306 srs_freep(queue);
300 } 307 }
301 308
  309 +int SrsConsumer::initialize()
  310 +{
  311 + int ret = ERROR_SUCCESS;
  312 +
  313 + if ((ret = pipe->initialize()) != ERROR_SUCCESS) {
  314 + srs_error("initialize the pipe for consumer failed. ret=%d", ret);
  315 + return ret;
  316 + }
  317 +
  318 + return ret;
  319 +}
  320 +
  321 +st_netfd_t SrsConsumer::pipe_fd()
  322 +{
  323 + return pipe->rfd();
  324 +}
  325 +
302 void SrsConsumer::set_queue_size(double queue_size) 326 void SrsConsumer::set_queue_size(double queue_size)
303 { 327 {
304 queue->set_queue_size(queue_size); 328 queue->set_queue_size(queue_size);
@@ -329,11 +353,18 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S @@ -329,11 +353,18 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
329 return ret; 353 return ret;
330 } 354 }
331 355
  356 + // notify the rtmp connection to resume to send packet.
  357 + if (!pipe->already_written()) {
  358 + pipe->active();
  359 + }
  360 +
332 return ret; 361 return ret;
333 } 362 }
334 363
335 int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) 364 int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
336 { 365 {
  366 + int ret = ERROR_SUCCESS;
  367 +
337 srs_assert(max_count > 0); 368 srs_assert(max_count > 0);
338 369
339 if (should_update_source_id) { 370 if (should_update_source_id) {
@@ -346,7 +377,15 @@ int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& c @@ -346,7 +377,15 @@ int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& c
346 return ERROR_SUCCESS; 377 return ERROR_SUCCESS;
347 } 378 }
348 379
349 - return queue->dump_packets(max_count, pmsgs, count); 380 + if ((ret = queue->dump_packets(max_count, pmsgs, count)) != ERROR_SUCCESS) {
  381 + return ret;
  382 + }
  383 +
  384 + if (queue->empty()) {
  385 + return pipe->reset();
  386 + }
  387 +
  388 + return ret;
350 } 389 }
351 390
352 int SrsConsumer::on_play_client_pause(bool is_pause) 391 int SrsConsumer::on_play_client_pause(bool is_pause)
@@ -1454,7 +1493,13 @@ void SrsSource::on_unpublish() @@ -1454,7 +1493,13 @@ void SrsSource::on_unpublish()
1454 { 1493 {
1455 int ret = ERROR_SUCCESS; 1494 int ret = ERROR_SUCCESS;
1456 1495
1457 - consumer = new SrsConsumer(this); 1496 + SrsConsumer* c = new SrsConsumer(this);
  1497 + if ((ret = c->initialize()) != ERROR_SUCCESS) {
  1498 + srs_freep(c);
  1499 + return ret;
  1500 + }
  1501 +
  1502 + consumer = c;
1458 consumers.push_back(consumer); 1503 consumers.push_back(consumer);
1459 1504
1460 double queue_size = _srs_config->get_queue_length(_req->vhost); 1505 double queue_size = _srs_config->get_queue_length(_req->vhost);
@@ -58,6 +58,7 @@ class SrsDvr; @@ -58,6 +58,7 @@ class SrsDvr;
58 class SrsEncoder; 58 class SrsEncoder;
59 #endif 59 #endif
60 class SrsStream; 60 class SrsStream;
  61 +class SrsPipe;
61 62
62 /** 63 /**
63 * the time jitter algorithm: 64 * the time jitter algorithm:
@@ -122,6 +123,10 @@ public: @@ -122,6 +123,10 @@ public:
122 virtual void set_queue_size(double queue_size); 123 virtual void set_queue_size(double queue_size);
123 public: 124 public:
124 /** 125 /**
  126 + * whether queue is empty.
  127 + */
  128 + virtual bool empty();
  129 + /**
125 * enqueue the message, the timestamp always monotonically. 130 * enqueue the message, the timestamp always monotonically.
126 * @param msg, the msg to enqueue, user never free it whatever the return code. 131 * @param msg, the msg to enqueue, user never free it whatever the return code.
127 */ 132 */
@@ -148,6 +153,7 @@ private: @@ -148,6 +153,7 @@ private:
148 class SrsConsumer 153 class SrsConsumer
149 { 154 {
150 private: 155 private:
  156 + SrsPipe* pipe;
151 SrsRtmpJitter* jitter; 157 SrsRtmpJitter* jitter;
152 SrsSource* source; 158 SrsSource* source;
153 SrsMessageQueue* queue; 159 SrsMessageQueue* queue;
@@ -159,6 +165,16 @@ public: @@ -159,6 +165,16 @@ public:
159 virtual ~SrsConsumer(); 165 virtual ~SrsConsumer();
160 public: 166 public:
161 /** 167 /**
  168 + * initialize the consumer.
  169 + */
  170 + virtual int initialize();
  171 + /**
  172 + * source can use this fd to poll with the read event,
  173 + * for performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194
  174 + */
  175 + virtual st_netfd_t pipe_fd();
  176 +public:
  177 + /**
162 * set the size of queue. 178 * set the size of queue.
163 */ 179 */
164 virtual void set_queue_size(double queue_size); 180 virtual void set_queue_size(double queue_size);