winlin

Revert "for bug #194, add pipe to consumer."

This reverts commit 1e601a6e.
@@ -66,11 +66,6 @@ int SrsPipe::initialize() @@ -66,11 +66,6 @@ int SrsPipe::initialize()
66 return ret; 66 return ret;
67 } 67 }
68 68
69 -st_netfd_t SrsPipe::rfd()  
70 -{  
71 - return read_stfd;  
72 -}  
73 -  
74 bool SrsPipe::already_written() 69 bool SrsPipe::already_written()
75 { 70 {
76 return _already_written; 71 return _already_written;
@@ -57,10 +57,6 @@ public: @@ -57,10 +57,6 @@ public:
57 * initialize pipes, open fds. 57 * initialize pipes, open fds.
58 */ 58 */
59 virtual int initialize(); 59 virtual int initialize();
60 - /**  
61 - * get the read fd to poll.  
62 - */  
63 - virtual st_netfd_t rfd();  
64 public: 60 public:
65 /** 61 /**
66 * for event based service, whether already writen data. 62 * for event based service, whether already writen data.
@@ -41,7 +41,6 @@ using namespace std; @@ -41,7 +41,6 @@ using namespace std;
41 #include <srs_app_edge.hpp> 41 #include <srs_app_edge.hpp>
42 #include <srs_kernel_utility.hpp> 42 #include <srs_kernel_utility.hpp>
43 #include <srs_app_avc_aac.hpp> 43 #include <srs_app_avc_aac.hpp>
44 -#include <srs_app_pipe.hpp>  
45 44
46 #define CONST_MAX_JITTER_MS 500 45 #define CONST_MAX_JITTER_MS 500
47 #define DEFAULT_FRAME_TIME_MS 40 46 #define DEFAULT_FRAME_TIME_MS 40
@@ -172,11 +171,6 @@ void SrsMessageQueue::set_queue_size(double queue_size) @@ -172,11 +171,6 @@ void SrsMessageQueue::set_queue_size(double queue_size)
172 queue_size_ms = (int)(queue_size * 1000); 171 queue_size_ms = (int)(queue_size * 1000);
173 } 172 }
174 173
175 -bool SrsMessageQueue::empty()  
176 -{  
177 - return msgs.size() == 0;  
178 -}  
179 -  
180 int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) 174 int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
181 { 175 {
182 int ret = ERROR_SUCCESS; 176 int ret = ERROR_SUCCESS;
@@ -296,7 +290,6 @@ SrsConsumer::SrsConsumer(SrsSource* _source) @@ -296,7 +290,6 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
296 jitter = new SrsRtmpJitter(); 290 jitter = new SrsRtmpJitter();
297 queue = new SrsMessageQueue(); 291 queue = new SrsMessageQueue();
298 should_update_source_id = false; 292 should_update_source_id = false;
299 - pipe = new SrsPipe();  
300 } 293 }
301 294
302 SrsConsumer::~SrsConsumer() 295 SrsConsumer::~SrsConsumer()
@@ -306,23 +299,6 @@ SrsConsumer::~SrsConsumer() @@ -306,23 +299,6 @@ SrsConsumer::~SrsConsumer()
306 srs_freep(queue); 299 srs_freep(queue);
307 } 300 }
308 301
309 -int SrsConsumer::initialize()  
310 -{  
311 - int ret = ERROR_SUCCESS;  
312 -  
313 - if ((ret = pipe->initialize()) != ERROR_SUCCESS) {  
314 - srs_error("initialize the pipe for consumer failed. ret=%d", ret);  
315 - return ret;  
316 - }  
317 -  
318 - return ret;  
319 -}  
320 -  
321 -st_netfd_t SrsConsumer::pipe_fd()  
322 -{  
323 - return pipe->rfd();  
324 -}  
325 -  
326 void SrsConsumer::set_queue_size(double queue_size) 302 void SrsConsumer::set_queue_size(double queue_size)
327 { 303 {
328 queue->set_queue_size(queue_size); 304 queue->set_queue_size(queue_size);
@@ -353,18 +329,11 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S @@ -353,18 +329,11 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
353 return ret; 329 return ret;
354 } 330 }
355 331
356 - // notify the rtmp connection to resume to send packet.  
357 - if (!pipe->already_written()) {  
358 - pipe->active();  
359 - }  
360 -  
361 return ret; 332 return ret;
362 } 333 }
363 334
364 int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) 335 int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
365 { 336 {
366 - int ret = ERROR_SUCCESS;  
367 -  
368 srs_assert(max_count > 0); 337 srs_assert(max_count > 0);
369 338
370 if (should_update_source_id) { 339 if (should_update_source_id) {
@@ -377,15 +346,7 @@ int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& c @@ -377,15 +346,7 @@ int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& c
377 return ERROR_SUCCESS; 346 return ERROR_SUCCESS;
378 } 347 }
379 348
380 - if ((ret = queue->dump_packets(max_count, pmsgs, count)) != ERROR_SUCCESS) {  
381 - return ret;  
382 - }  
383 -  
384 - if (queue->empty()) {  
385 - return pipe->reset();  
386 - }  
387 -  
388 - return ret; 349 + return queue->dump_packets(max_count, pmsgs, count);
389 } 350 }
390 351
391 int SrsConsumer::on_play_client_pause(bool is_pause) 352 int SrsConsumer::on_play_client_pause(bool is_pause)
@@ -1493,13 +1454,7 @@ void SrsSource::on_unpublish() @@ -1493,13 +1454,7 @@ void SrsSource::on_unpublish()
1493 { 1454 {
1494 int ret = ERROR_SUCCESS; 1455 int ret = ERROR_SUCCESS;
1495 1456
1496 - SrsConsumer* c = new SrsConsumer(this);  
1497 - if ((ret = c->initialize()) != ERROR_SUCCESS) {  
1498 - srs_freep(c);  
1499 - return ret;  
1500 - }  
1501 -  
1502 - consumer = c; 1457 + consumer = new SrsConsumer(this);
1503 consumers.push_back(consumer); 1458 consumers.push_back(consumer);
1504 1459
1505 double queue_size = _srs_config->get_queue_length(_req->vhost); 1460 double queue_size = _srs_config->get_queue_length(_req->vhost);
@@ -58,7 +58,6 @@ class SrsDvr; @@ -58,7 +58,6 @@ class SrsDvr;
58 class SrsEncoder; 58 class SrsEncoder;
59 #endif 59 #endif
60 class SrsStream; 60 class SrsStream;
61 -class SrsPipe;  
62 61
63 /** 62 /**
64 * the time jitter algorithm: 63 * the time jitter algorithm:
@@ -123,10 +122,6 @@ public: @@ -123,10 +122,6 @@ public:
123 virtual void set_queue_size(double queue_size); 122 virtual void set_queue_size(double queue_size);
124 public: 123 public:
125 /** 124 /**
126 - * whether queue is empty.  
127 - */  
128 - virtual bool empty();  
129 - /**  
130 * enqueue the message, the timestamp always monotonically. 125 * enqueue the message, the timestamp always monotonically.
131 * @param msg, the msg to enqueue, user never free it whatever the return code. 126 * @param msg, the msg to enqueue, user never free it whatever the return code.
132 */ 127 */
@@ -153,7 +148,6 @@ private: @@ -153,7 +148,6 @@ private:
153 class SrsConsumer 148 class SrsConsumer
154 { 149 {
155 private: 150 private:
156 - SrsPipe* pipe;  
157 SrsRtmpJitter* jitter; 151 SrsRtmpJitter* jitter;
158 SrsSource* source; 152 SrsSource* source;
159 SrsMessageQueue* queue; 153 SrsMessageQueue* queue;
@@ -165,16 +159,6 @@ public: @@ -165,16 +159,6 @@ public:
165 virtual ~SrsConsumer(); 159 virtual ~SrsConsumer();
166 public: 160 public:
167 /** 161 /**
168 - * initialize the consumer.  
169 - */  
170 - virtual int initialize();  
171 - /**  
172 - * source can use this fd to poll with the read event,  
173 - * for performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194  
174 - */  
175 - virtual st_netfd_t pipe_fd();  
176 -public:  
177 - /**  
178 * set the size of queue. 162 * set the size of queue.
179 */ 163 */
180 virtual void set_queue_size(double queue_size); 164 virtual void set_queue_size(double queue_size);