winlin

for bug #241, support merged read. 2.0.48

@@ -26,6 +26,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -26,6 +26,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #include <srs_protocol_rtmp.hpp> 26 #include <srs_protocol_rtmp.hpp>
27 #include <srs_protocol_stack.hpp> 27 #include <srs_protocol_stack.hpp>
28 #include <srs_app_rtmp_conn.hpp> 28 #include <srs_app_rtmp_conn.hpp>
  29 +#include <srs_protocol_buffer.hpp>
  30 +
  31 +// when we read from socket less than this value,
  32 +// sleep a while to merge read.
  33 +// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  34 +#define SRS_MERGED_READ_SIZE (SOCKET_READ_SIZE / 10)
  35 +// the time to sleep to merge read, to read more bytes.
  36 +#define SRS_MERGED_READ_US (300 * 1000)
29 37
30 ISrsMessageHandler::ISrsMessageHandler() 38 ISrsMessageHandler::ISrsMessageHandler()
31 { 39 {
@@ -271,6 +279,30 @@ void SrsPublishRecvThread::stop() @@ -271,6 +279,30 @@ void SrsPublishRecvThread::stop()
271 trd.stop(); 279 trd.stop();
272 } 280 }
273 281
  282 +void SrsPublishRecvThread::on_thread_start()
  283 +{
  284 + // we donot set the auto response to false,
  285 + // for the main thread never send message.
  286 +
  287 + // enable the merge read
  288 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  289 + rtmp->set_merge_read(true, this);
  290 +}
  291 +
  292 +void SrsPublishRecvThread::on_thread_stop()
  293 +{
  294 + // we donot set the auto response to true,
  295 + // for we donot set to false yet.
  296 +
  297 + // when thread stop, signal the conn thread which wait.
  298 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/244
  299 + st_cond_signal(error);
  300 +
  301 + // disable the merge read
  302 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  303 + rtmp->set_merge_read(false, NULL);
  304 +}
  305 +
274 bool SrsPublishRecvThread::can_handle() 306 bool SrsPublishRecvThread::can_handle()
275 { 307 {
276 // publish thread always can handle message. 308 // publish thread always can handle message.
@@ -302,18 +334,19 @@ void SrsPublishRecvThread::on_recv_error(int ret) @@ -302,18 +334,19 @@ void SrsPublishRecvThread::on_recv_error(int ret)
302 st_cond_signal(error); 334 st_cond_signal(error);
303 } 335 }
304 336
305 -void SrsPublishRecvThread::on_thread_start()  
306 -{  
307 - // we donot set the auto response to false,  
308 - // for the main thread never send message.  
309 -}  
310 -  
311 -void SrsPublishRecvThread::on_thread_stop() 337 +void SrsPublishRecvThread::on_read(ssize_t nread)
312 { 338 {
313 - // we donot set the auto response to true,  
314 - // for we donot set to false yet. 339 + if (nread < 0) {
  340 + return;
  341 + }
315 342
316 - // when thread stop, signal the conn thread which wait.  
317 - // @see https://github.com/winlinvip/simple-rtmp-server/issues/244  
318 - st_cond_signal(error); 343 + /**
  344 + * to improve read performance, merge some packets then read,
  345 + * when it on and read small bytes, we sleep to wait more data.,
  346 + * that is, we merge some data to read together.
  347 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  348 + */
  349 + if (nread < SRS_MERGED_READ_SIZE) {
  350 + st_usleep(SRS_MERGED_READ_US);
  351 + }
319 } 352 }
@@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 #include <vector> 33 #include <vector>
34 34
35 #include <srs_app_thread.hpp> 35 #include <srs_app_thread.hpp>
  36 +#include <srs_protocol_buffer.hpp>
36 37
37 class SrsRtmpServer; 38 class SrsRtmpServer;
38 class SrsMessage; 39 class SrsMessage;
@@ -132,7 +133,7 @@ public: @@ -132,7 +133,7 @@ public:
132 * the publish recv thread got message and callback the source method to process message. 133 * the publish recv thread got message and callback the source method to process message.
133 * @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 134 * @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
134 */ 135 */
135 -class SrsPublishRecvThread : public ISrsMessageHandler 136 +class SrsPublishRecvThread : virtual public ISrsMessageHandler, virtual public IMergeReadHandler
136 { 137 {
137 private: 138 private:
138 SrsRecvThread trd; 139 SrsRecvThread trd;
@@ -163,13 +164,16 @@ public: @@ -163,13 +164,16 @@ public:
163 public: 164 public:
164 virtual int start(); 165 virtual int start();
165 virtual void stop(); 166 virtual void stop();
  167 + virtual void on_thread_start();
  168 + virtual void on_thread_stop();
  169 +// interface ISrsMessageHandler
166 public: 170 public:
167 virtual bool can_handle(); 171 virtual bool can_handle();
168 virtual int handle(SrsMessage* msg); 172 virtual int handle(SrsMessage* msg);
169 virtual void on_recv_error(int ret); 173 virtual void on_recv_error(int ret);
  174 +// interface IMergeReadHandler
170 public: 175 public:
171 - virtual void on_thread_start();  
172 - virtual void on_thread_stop(); 176 + virtual void on_read(ssize_t nread);
173 }; 177 };
174 178
175 #endif 179 #endif
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 47 34 +#define VERSION_REVISION 48
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -26,16 +26,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -26,16 +26,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #include <srs_kernel_error.hpp> 26 #include <srs_kernel_error.hpp>
27 #include <srs_kernel_log.hpp> 27 #include <srs_kernel_log.hpp>
28 28
29 -// 4KB=4096  
30 -// 8KB=8192  
31 -// 16KB=16384  
32 -// 32KB=32768  
33 -// 64KB=65536  
34 -// @see https://github.com/winlinvip/simple-rtmp-server/issues/241  
35 -#define SOCKET_READ_SIZE 4096 29 +IMergeReadHandler::IMergeReadHandler()
  30 +{
  31 +}
  32 +
  33 +IMergeReadHandler::~IMergeReadHandler()
  34 +{
  35 +}
36 36
37 SrsBuffer::SrsBuffer() 37 SrsBuffer::SrsBuffer()
38 { 38 {
  39 + merged_read = false;
  40 + _handler = NULL;
  41 +
39 buffer = new char[SOCKET_READ_SIZE]; 42 buffer = new char[SOCKET_READ_SIZE];
40 } 43 }
41 44
@@ -93,6 +96,16 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -93,6 +96,16 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
93 return ret; 96 return ret;
94 } 97 }
95 98
  99 + /**
  100 + * to improve read performance, merge some packets then read,
  101 + * when it on and read small bytes, we sleep to wait more data.,
  102 + * that is, we merge some data to read together.
  103 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  104 + */
  105 + if (merged_read && _handler) {
  106 + _handler->on_read(nread);
  107 + }
  108 +
96 srs_assert((int)nread > 0); 109 srs_assert((int)nread > 0);
97 append(buffer, (int)nread); 110 append(buffer, (int)nread);
98 } 111 }
@@ -100,4 +113,10 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -100,4 +113,10 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
100 return ret; 113 return ret;
101 } 114 }
102 115
  116 +void SrsBuffer::set_merge_read(bool v, IMergeReadHandler* handler)
  117 +{
  118 + merged_read = v;
  119 + _handler = handler;
  120 +}
  121 +
103 122
@@ -34,6 +34,34 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,6 +34,34 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 34
35 #include <srs_protocol_io.hpp> 35 #include <srs_protocol_io.hpp>
36 36
  37 +// 4KB=4096
  38 +// 8KB=8192
  39 +// 16KB=16384
  40 +// 32KB=32768
  41 +// 64KB=65536
  42 +// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  43 +#define SOCKET_READ_SIZE 4096
  44 +
  45 +/**
  46 +* to improve read performance, merge some packets then read,
  47 +* when it on and read small bytes, we sleep to wait more data.,
  48 +* that is, we merge some data to read together.
  49 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  50 +*/
  51 +class IMergeReadHandler
  52 +{
  53 +public:
  54 + IMergeReadHandler();
  55 + virtual ~IMergeReadHandler();
  56 +public:
  57 + /**
  58 + * when read from channel, notice the merge handler to sleep for
  59 + * some small bytes.
  60 + * @remark, it only for server-side, client srs-librtmp just ignore.
  61 + */
  62 + virtual void on_read(ssize_t nread) = 0;
  63 +};
  64 +
37 /** 65 /**
38 * the buffer provices bytes cache for protocol. generally, 66 * the buffer provices bytes cache for protocol. generally,
39 * protocol recv data from socket, put into buffer, decode to RTMP message. 67 * protocol recv data from socket, put into buffer, decode to RTMP message.
@@ -41,6 +69,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -41,6 +69,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
41 class SrsBuffer 69 class SrsBuffer
42 { 70 {
43 private: 71 private:
  72 + // the merged handler
  73 + bool merged_read;
  74 + IMergeReadHandler* _handler;
  75 + // data and socket buffer
44 std::vector<char> data; 76 std::vector<char> data;
45 char* buffer; 77 char* buffer;
46 public: 78 public:
@@ -79,6 +111,16 @@ public: @@ -79,6 +111,16 @@ public:
79 * @remark, we actually maybe read more than required_size, maybe 4k for example. 111 * @remark, we actually maybe read more than required_size, maybe 4k for example.
80 */ 112 */
81 virtual int grow(ISrsBufferReader* reader, int required_size); 113 virtual int grow(ISrsBufferReader* reader, int required_size);
  114 +public:
  115 + /**
  116 + * to improve read performance, merge some packets then read,
  117 + * when it on and read small bytes, we sleep to wait more data.,
  118 + * that is, we merge some data to read together.
  119 + * @param v true to ename merged read.
  120 + * @param handler the handler when merge read is enabled.
  121 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  122 + */
  123 + virtual void set_merge_read(bool v, IMergeReadHandler* handler);
82 }; 124 };
83 125
84 #endif 126 #endif
@@ -745,6 +745,11 @@ void SrsRtmpServer::set_auto_response(bool v) @@ -745,6 +745,11 @@ void SrsRtmpServer::set_auto_response(bool v)
745 protocol->set_auto_response(v); 745 protocol->set_auto_response(v);
746 } 746 }
747 747
  748 +void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler)
  749 +{
  750 + protocol->set_merge_read(v, handler);
  751 +}
  752 +
748 void SrsRtmpServer::set_recv_timeout(int64_t timeout_us) 753 void SrsRtmpServer::set_recv_timeout(int64_t timeout_us)
749 { 754 {
750 protocol->set_recv_timeout(timeout_us); 755 protocol->set_recv_timeout(timeout_us);
@@ -46,6 +46,7 @@ class SrsPlayPacket; @@ -46,6 +46,7 @@ class SrsPlayPacket;
46 class SrsMessage; 46 class SrsMessage;
47 class SrsPacket; 47 class SrsPacket;
48 class SrsAmf0Object; 48 class SrsAmf0Object;
  49 +class IMergeReadHandler;
49 50
50 /** 51 /**
51 * the original request from client. 52 * the original request from client.
@@ -343,6 +344,15 @@ public: @@ -343,6 +344,15 @@ public:
343 */ 344 */
344 virtual void set_auto_response(bool v); 345 virtual void set_auto_response(bool v);
345 /** 346 /**
  347 + * to improve read performance, merge some packets then read,
  348 + * when it on and read small bytes, we sleep to wait more data.,
  349 + * that is, we merge some data to read together.
  350 + * @param v true to ename merged read.
  351 + * @param handler the handler when merge read is enabled.
  352 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  353 + */
  354 + virtual void set_merge_read(bool v, IMergeReadHandler* handler);
  355 + /**
346 * set/get the recv timeout in us. 356 * set/get the recv timeout in us.
347 * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT. 357 * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
348 */ 358 */
@@ -479,6 +479,11 @@ int SrsProtocol::manual_response_flush() @@ -479,6 +479,11 @@ int SrsProtocol::manual_response_flush()
479 return ret; 479 return ret;
480 } 480 }
481 481
  482 +void SrsProtocol::set_merge_read(bool v, IMergeReadHandler* handler)
  483 +{
  484 + in_buffer->set_merge_read(v, handler);
  485 +}
  486 +
482 void SrsProtocol::set_recv_timeout(int64_t timeout_us) 487 void SrsProtocol::set_recv_timeout(int64_t timeout_us)
483 { 488 {
484 return skt->set_recv_timeout(timeout_us); 489 return skt->set_recv_timeout(timeout_us);
@@ -53,6 +53,7 @@ class SrsMessageHeader; @@ -53,6 +53,7 @@ class SrsMessageHeader;
53 class SrsMessage; 53 class SrsMessage;
54 class SrsChunkStream; 54 class SrsChunkStream;
55 class SrsSharedPtrMessage; 55 class SrsSharedPtrMessage;
  56 +class IMergeReadHandler;
56 57
57 /** 58 /**
58 * 4.1. Message Header 59 * 4.1. Message Header
@@ -271,6 +272,16 @@ public: @@ -271,6 +272,16 @@ public:
271 virtual int manual_response_flush(); 272 virtual int manual_response_flush();
272 public: 273 public:
273 /** 274 /**
  275 + * to improve read performance, merge some packets then read,
  276 + * when it on and read small bytes, we sleep to wait more data.,
  277 + * that is, we merge some data to read together.
  278 + * @param v true to ename merged read.
  279 + * @param handler the handler when merge read is enabled.
  280 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  281 + */
  282 + virtual void set_merge_read(bool v, IMergeReadHandler* handler);
  283 +public:
  284 + /**
274 * set/get the recv timeout in us. 285 * set/get the recv timeout in us.
275 * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT. 286 * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
276 */ 287 */