winlin

for bug #241, merge big chunks for publish, no use.

@@ -288,10 +288,16 @@ void SrsPublishRecvThread::on_thread_start() @@ -288,10 +288,16 @@ void SrsPublishRecvThread::on_thread_start()
288 { 288 {
289 // we donot set the auto response to false, 289 // we donot set the auto response to false,
290 // for the main thread never send message. 290 // for the main thread never send message.
  291 +
  292 + // notice the protocol stack to merge chunks to big buffer.
  293 + // for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
  294 + // so we can use read_fullly(64KB) to merge all chunks in 1s.
  295 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  296 + rtmp->set_merge_chunks(true);
291 } 297 }
292 298
293 void SrsPublishRecvThread::on_thread_stop() 299 void SrsPublishRecvThread::on_thread_stop()
294 { 300 {
295 - // we donot set the auto response to true,  
296 - // for we donot set to false yet. 301 + // revert state
  302 + rtmp->set_merge_chunks(false);
297 } 303 }
@@ -26,7 +26,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -26,7 +26,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #include <srs_kernel_error.hpp> 26 #include <srs_kernel_error.hpp>
27 #include <srs_kernel_log.hpp> 27 #include <srs_kernel_log.hpp>
28 28
29 -#define SOCKET_READ_SIZE 4096 29 +// 4096=4KB
  30 +// 16384=16KB
  31 +// 65536=64KB
  32 +#define SOCKET_READ_SIZE 16384
30 33
31 ISrsBufferReader::ISrsBufferReader() 34 ISrsBufferReader::ISrsBufferReader()
32 { 35 {
@@ -38,10 +41,13 @@ ISrsBufferReader::~ISrsBufferReader() @@ -38,10 +41,13 @@ ISrsBufferReader::~ISrsBufferReader()
38 41
39 SrsBuffer::SrsBuffer() 42 SrsBuffer::SrsBuffer()
40 { 43 {
  44 + merge_chunks_in_big_buffer = false;
  45 + buffer = new char[SOCKET_READ_SIZE];
41 } 46 }
42 47
43 SrsBuffer::~SrsBuffer() 48 SrsBuffer::~SrsBuffer()
44 { 49 {
  50 + srs_freep(buffer);
45 } 51 }
46 52
47 int SrsBuffer::length() 53 int SrsBuffer::length()
@@ -88,11 +94,15 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -88,11 +94,15 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
88 } 94 }
89 95
90 while (length() < required_size) { 96 while (length() < required_size) {
91 - char buffer[SOCKET_READ_SIZE];  
92 -  
93 ssize_t nread; 97 ssize_t nread;
94 - if ((ret = reader->read(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) {  
95 - return ret; 98 + if (merge_chunks_in_big_buffer) {
  99 + if ((ret = reader->read_fully(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) {
  100 + return ret;
  101 + }
  102 + } else {
  103 + if ((ret = reader->read(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) {
  104 + return ret;
  105 + }
96 } 106 }
97 107
98 srs_assert((int)nread > 0); 108 srs_assert((int)nread > 0);
@@ -102,4 +112,9 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -102,4 +112,9 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
102 return ret; 112 return ret;
103 } 113 }
104 114
  115 +void SrsBuffer::set_merge_chunks(bool v)
  116 +{
  117 + merge_chunks_in_big_buffer = v;
  118 +}
  119 +
105 120
@@ -42,7 +42,16 @@ public: @@ -42,7 +42,16 @@ public:
42 virtual ~ISrsBufferReader(); 42 virtual ~ISrsBufferReader();
43 // for protocol/amf0/msg-codec 43 // for protocol/amf0/msg-codec
44 public: 44 public:
  45 + /**
  46 + * read some bytes of data.
  47 + * @param nread, the actually read size, NULL to ignore.
  48 + */
45 virtual int read(void* buf, size_t size, ssize_t* nread) = 0; 49 virtual int read(void* buf, size_t size, ssize_t* nread) = 0;
  50 + /**
  51 + * read specified size bytes of data
  52 + * @param nread, the actually read size, NULL to ignore.
  53 + */
  54 + virtual int read_fully(void* buf, size_t size, ssize_t* nread) = 0;
46 }; 55 };
47 56
48 /** 57 /**
@@ -53,6 +62,15 @@ class SrsBuffer @@ -53,6 +62,15 @@ class SrsBuffer
53 { 62 {
54 private: 63 private:
55 std::vector<char> data; 64 std::vector<char> data;
  65 + /**
  66 + * notice the protocol stack to merge chunks to big buffer.
  67 + * for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
  68 + * so we can use read_fullly(64KB) to merge all chunks in 1s.
  69 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  70 + */
  71 + bool merge_chunks_in_big_buffer;
  72 + // the socket recv buffer.
  73 + char* buffer;
56 public: 74 public:
57 SrsBuffer(); 75 SrsBuffer();
58 virtual ~SrsBuffer(); 76 virtual ~SrsBuffer();
@@ -89,6 +107,14 @@ public: @@ -89,6 +107,14 @@ public:
89 * @remark, we actually maybe read more than required_size, maybe 4k for example. 107 * @remark, we actually maybe read more than required_size, maybe 4k for example.
90 */ 108 */
91 virtual int grow(ISrsBufferReader* reader, int required_size); 109 virtual int grow(ISrsBufferReader* reader, int required_size);
  110 +public:
  111 + /**
  112 + * notice the protocol stack to merge chunks to big buffer.
  113 + * for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
  114 + * so we can use read_fullly(64KB) to merge all chunks in 1s.
  115 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  116 + */
  117 + virtual void set_merge_chunks(bool v);
92 }; 118 };
93 119
94 #endif 120 #endif
@@ -43,17 +43,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -43,17 +43,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
43 | IBufferReader | | IStatistic | | IBufferWriter | 43 | IBufferReader | | IStatistic | | IBufferWriter |
44 +---------------+ +--------------------+ +---------------+ 44 +---------------+ +--------------------+ +---------------+
45 | + read() | | + get_recv_bytes() | | + write() | 45 | + read() | | + get_recv_bytes() | | + write() |
46 -+------+--------+ | + get_recv_bytes() | | + writev() |  
47 - / \ +---+--------------+-+ +-------+-------+  
48 - | / \ / \ / \ 46 +| + readfully() | | + get_recv_bytes() | | + writev() |
  47 ++------+--------+ +---+--------------+-+ +-------+-------+
  48 + / \ / \ / \ / \
49 | | | | 49 | | | |
50 +------+------------------+-+ +-----+----------------+--+ 50 +------+------------------+-+ +-----+----------------+--+
51 | IProtocolReader | | IProtocolWriter | 51 | IProtocolReader | | IProtocolWriter |
52 +---------------------------+ +-------------------------+ 52 +---------------------------+ +-------------------------+
53 -| + readfully() | | + set_send_timeout() |  
54 -| + set_recv_timeout() | +-------+-----------------+  
55 -+------------+--------------+ / \  
56 - / \ | 53 +| + set_recv_timeout() | | + set_send_timeout() |
  54 ++------------+--------------+ +-------+-----------------+
  55 + / \ / \
57 | | 56 | |
58 +--+-----------------------------+-+ 57 +--+-----------------------------+-+
59 | IProtocolReaderWriter | 58 | IProtocolReaderWriter |
@@ -123,13 +122,6 @@ public: @@ -123,13 +122,6 @@ public:
123 * get the recv timeout in us. 122 * get the recv timeout in us.
124 */ 123 */
125 virtual int64_t get_recv_timeout() = 0; 124 virtual int64_t get_recv_timeout() = 0;
126 -// for handshake.  
127 -public:  
128 - /**  
129 - * read specified size bytes of data  
130 - * @param nread, the actually read size, NULL to ignore.  
131 - */  
132 - virtual int read_fully(void* buf, size_t size, ssize_t* nread) = 0;  
133 }; 125 };
134 126
135 /** 127 /**
@@ -745,6 +745,11 @@ void SrsRtmpServer::set_auto_response(bool v) @@ -745,6 +745,11 @@ void SrsRtmpServer::set_auto_response(bool v)
745 protocol->set_auto_response(v); 745 protocol->set_auto_response(v);
746 } 746 }
747 747
  748 +void SrsRtmpServer::set_merge_chunks(bool v)
  749 +{
  750 + protocol->set_merge_chunks(v);
  751 +}
  752 +
748 void SrsRtmpServer::set_recv_timeout(int64_t timeout_us) 753 void SrsRtmpServer::set_recv_timeout(int64_t timeout_us)
749 { 754 {
750 protocol->set_recv_timeout(timeout_us); 755 protocol->set_recv_timeout(timeout_us);
@@ -343,6 +343,13 @@ public: @@ -343,6 +343,13 @@ public:
343 */ 343 */
344 virtual void set_auto_response(bool v); 344 virtual void set_auto_response(bool v);
345 /** 345 /**
  346 + * notice the protocol stack to merge chunks to big buffer.
  347 + * for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
  348 + * so we can use read_fullly(64KB) to merge all chunks in 1s.
  349 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  350 + */
  351 + virtual void set_merge_chunks(bool v);
  352 + /**
346 * set/get the recv timeout in us. 353 * set/get the recv timeout in us.
347 * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT. 354 * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
348 */ 355 */
@@ -478,6 +478,11 @@ int SrsProtocol::manual_response_flush() @@ -478,6 +478,11 @@ int SrsProtocol::manual_response_flush()
478 return ret; 478 return ret;
479 } 479 }
480 480
  481 +void SrsProtocol::set_merge_chunks(bool v)
  482 +{
  483 + in_buffer->set_merge_chunks(v);
  484 +}
  485 +
481 void SrsProtocol::set_recv_timeout(int64_t timeout_us) 486 void SrsProtocol::set_recv_timeout(int64_t timeout_us)
482 { 487 {
483 return skt->set_recv_timeout(timeout_us); 488 return skt->set_recv_timeout(timeout_us);
@@ -269,6 +269,13 @@ public: @@ -269,6 +269,13 @@ public:
269 * @see the auto_response_when_recv and manual_response_queue. 269 * @see the auto_response_when_recv and manual_response_queue.
270 */ 270 */
271 virtual int manual_response_flush(); 271 virtual int manual_response_flush();
  272 + /**
  273 + * notice the protocol stack to merge chunks to big buffer.
  274 + * for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
  275 + * so we can use read_fullly(64KB) to merge all chunks in 1s.
  276 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  277 + */
  278 + virtual void set_merge_chunks(bool v);
272 public: 279 public:
273 /** 280 /**
274 * set/get the recv timeout in us. 281 * set/get the recv timeout in us.
@@ -199,6 +199,11 @@ int MockBufferReader::read(void* buf, size_t size, ssize_t* nread) @@ -199,6 +199,11 @@ int MockBufferReader::read(void* buf, size_t size, ssize_t* nread)
199 return ERROR_SUCCESS; 199 return ERROR_SUCCESS;
200 } 200 }
201 201
  202 +int MockBufferReader::read_fully(void* buf, size_t size, ssize_t* nread)
  203 +{
  204 + return read(buf, size, nread);
  205 +}
  206 +
202 #ifdef ENABLE_UTEST_KERNEL 207 #ifdef ENABLE_UTEST_KERNEL
203 208
204 VOID TEST(KernelBufferTest, DefaultObject) 209 VOID TEST(KernelBufferTest, DefaultObject)
@@ -42,6 +42,7 @@ public: @@ -42,6 +42,7 @@ public:
42 virtual ~MockBufferReader(); 42 virtual ~MockBufferReader();
43 public: 43 public:
44 virtual int read(void* buf, size_t size, ssize_t* nread); 44 virtual int read(void* buf, size_t size, ssize_t* nread);
  45 + virtual int read_fully(void* buf, size_t size, ssize_t* nread);
45 }; 46 };
46 47
47 class MockSrsFileWriter : public SrsFileWriter 48 class MockSrsFileWriter : public SrsFileWriter