winlin

for bug #241, calc the small and sleep for merged read.

@@ -27,13 +27,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -27,13 +27,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 #include <srs_protocol_stack.hpp> 27 #include <srs_protocol_stack.hpp>
28 #include <srs_app_rtmp_conn.hpp> 28 #include <srs_app_rtmp_conn.hpp>
29 #include <srs_protocol_buffer.hpp> 29 #include <srs_protocol_buffer.hpp>
  30 +#include <srs_kernel_utility.hpp>
30 31
31 // when we read from socket less than this value, 32 // when we read from socket less than this value,
32 // sleep a while to merge read. 33 // sleep a while to merge read.
33 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 34 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
34 -#define SRS_MERGED_READ_SIZE(buffer) (buffer / 10)  
35 -// the time to sleep to merge read, to read more bytes.  
36 -#define SRS_MERGED_READ_US (300 * 1000) 35 +// use the bitrate in kbps to calc the max sleep time.
  36 +#define SRS_MR_MAX_BITRATE_KBPS 10000
  37 +#define SRS_MR_AVERAGE_BITRATE_KBPS 1000
  38 +#define SRS_MR_MIN_BITRATE_KBPS 64
  39 +// the max sleep time in ms
  40 +#define SRS_MR_MAX_SLEEP_MS 3000
  41 +// the max small bytes to group
  42 +#define SRS_MR_SMALL_BYTES 64
  43 +// the percent of buffer to set as small bytes
  44 +#define SRS_MR_SMALL_PERCENT 100
37 45
38 ISrsMessageHandler::ISrsMessageHandler() 46 ISrsMessageHandler::ISrsMessageHandler()
39 { 47 {
@@ -226,7 +234,7 @@ void SrsQueueRecvThread::on_thread_stop() @@ -226,7 +234,7 @@ void SrsQueueRecvThread::on_thread_stop()
226 } 234 }
227 235
228 SrsPublishRecvThread::SrsPublishRecvThread( 236 SrsPublishRecvThread::SrsPublishRecvThread(
229 - SrsRtmpServer* rtmp_sdk, int timeout_ms, 237 + SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms,
230 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge 238 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
231 ): trd(this, rtmp_sdk, timeout_ms) 239 ): trd(this, rtmp_sdk, timeout_ms)
232 { 240 {
@@ -239,6 +247,10 @@ SrsPublishRecvThread::SrsPublishRecvThread( @@ -239,6 +247,10 @@ SrsPublishRecvThread::SrsPublishRecvThread(
239 recv_error_code = ERROR_SUCCESS; 247 recv_error_code = ERROR_SUCCESS;
240 _nb_msgs = 0; 248 _nb_msgs = 0;
241 error = st_cond_new(); 249 error = st_cond_new();
  250 +
  251 + mr_fd = fd;
  252 + mr_small_bytes = 0;
  253 + mr_sleep_ms = 0;
242 } 254 }
243 255
244 SrsPublishRecvThread::~SrsPublishRecvThread() 256 SrsPublishRecvThread::~SrsPublishRecvThread()
@@ -284,9 +296,17 @@ void SrsPublishRecvThread::on_thread_start() @@ -284,9 +296,17 @@ void SrsPublishRecvThread::on_thread_start()
284 // we donot set the auto response to false, 296 // we donot set the auto response to false,
285 // for the main thread never send message. 297 // for the main thread never send message.
286 298
  299 + // 128KB recv buffer.
  300 + int nb_rbuf = 128 * 1024;
  301 + socklen_t sock_buf_size = sizeof(int);
  302 + if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {
  303 + srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);
  304 + }
  305 + getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
  306 +
287 // enable the merge read 307 // enable the merge read
288 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 308 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
289 - rtmp->set_merge_read(true, this); 309 + rtmp->set_merge_read(true, nb_rbuf, this);
290 } 310 }
291 311
292 void SrsPublishRecvThread::on_thread_stop() 312 void SrsPublishRecvThread::on_thread_stop()
@@ -300,7 +320,7 @@ void SrsPublishRecvThread::on_thread_stop() @@ -300,7 +320,7 @@ void SrsPublishRecvThread::on_thread_stop()
300 320
301 // disable the merge read 321 // disable the merge read
302 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 322 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
303 - rtmp->set_merge_read(false, NULL); 323 + rtmp->set_merge_read(false, 0, NULL);
304 } 324 }
305 325
306 bool SrsPublishRecvThread::can_handle() 326 bool SrsPublishRecvThread::can_handle()
@@ -334,9 +354,9 @@ void SrsPublishRecvThread::on_recv_error(int ret) @@ -334,9 +354,9 @@ void SrsPublishRecvThread::on_recv_error(int ret)
334 st_cond_signal(error); 354 st_cond_signal(error);
335 } 355 }
336 356
337 -void SrsPublishRecvThread::on_read(int nb_buffer, ssize_t nread) 357 +void SrsPublishRecvThread::on_read(ssize_t nread)
338 { 358 {
339 - if (nread < 0) { 359 + if (nread < 0 || mr_sleep_ms <= 0) {
340 return; 360 return;
341 } 361 }
342 362
@@ -346,7 +366,31 @@ void SrsPublishRecvThread::on_read(int nb_buffer, ssize_t nread) @@ -346,7 +366,31 @@ void SrsPublishRecvThread::on_read(int nb_buffer, ssize_t nread)
346 * that is, we merge some data to read together. 366 * that is, we merge some data to read together.
347 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 367 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
348 */ 368 */
349 - if (nread < SRS_MERGED_READ_SIZE(nb_buffer)) {  
350 - st_usleep(SRS_MERGED_READ_US); 369 + if (nread < mr_small_bytes) {
  370 + st_usleep(mr_sleep_ms * 1000);
351 } 371 }
352 } 372 }
  373 +
  374 +void SrsPublishRecvThread::on_buffer_change(int nb_buffer)
  375 +{
  376 + // set percent.
  377 + mr_small_bytes = (int)(nb_buffer / SRS_MR_SMALL_PERCENT);
  378 + // select the smaller
  379 + mr_small_bytes = srs_min(mr_small_bytes, SRS_MR_SMALL_BYTES);
  380 +
  381 + // the recv sleep is [buffer / max_kbps, buffer / min_kbps]
  382 + // for example, buffer is 256KB, max kbps is 10Mbps, min kbps is 10Kbps,
  383 + // the buffer is 256KB*8=2048Kb, which can provides sleep time in
  384 + // min: 2038Kb/10Mbps=2038Kb/10Kbpms=203.8ms
  385 + // max: 2038Kb/10Kbps=203.8s
  386 + // sleep = Xb * 8 / (N * 1000 b / 1000 ms) = (X * 8 / N) ms
  387 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  388 + int min_sleep = (int)(nb_buffer * 8.0 / SRS_MR_MAX_BITRATE_KBPS);
  389 + int average_sleep = (int)(nb_buffer * 8.0 / SRS_MR_AVERAGE_BITRATE_KBPS);
  390 + int max_sleep = (int)(nb_buffer * 8.0 / SRS_MR_MIN_BITRATE_KBPS);
  391 + // 80% min, 16% average, 4% max.
  392 + mr_sleep_ms = (int)(min_sleep * 0.8 + average_sleep * 0.16 + max_sleep * 0.04);
  393 + mr_sleep_ms = srs_min(mr_sleep_ms, SRS_MR_MAX_SLEEP_MS);
  394 +
  395 + srs_trace("merged read, buffer=%d, small=%d, sleep=%d", nb_buffer, mr_small_bytes, mr_sleep_ms);
  396 +}
@@ -140,6 +140,11 @@ private: @@ -140,6 +140,11 @@ private:
140 SrsRtmpServer* rtmp; 140 SrsRtmpServer* rtmp;
141 // the msgs already got. 141 // the msgs already got.
142 int64_t _nb_msgs; 142 int64_t _nb_msgs;
  143 + // for mr(merged read),
  144 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  145 + int mr_fd;
  146 + int mr_small_bytes;
  147 + int mr_sleep_ms;
143 // the recv thread error code. 148 // the recv thread error code.
144 int recv_error_code; 149 int recv_error_code;
145 SrsRtmpConn* _conn; 150 SrsRtmpConn* _conn;
@@ -151,7 +156,7 @@ private: @@ -151,7 +156,7 @@ private:
151 // @see https://github.com/winlinvip/simple-rtmp-server/issues/244 156 // @see https://github.com/winlinvip/simple-rtmp-server/issues/244
152 st_cond_t error; 157 st_cond_t error;
153 public: 158 public:
154 - SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms, 159 + SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms,
155 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge); 160 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge);
156 virtual ~SrsPublishRecvThread(); 161 virtual ~SrsPublishRecvThread();
157 public: 162 public:
@@ -173,7 +178,8 @@ public: @@ -173,7 +178,8 @@ public:
173 virtual void on_recv_error(int ret); 178 virtual void on_recv_error(int ret);
174 // interface IMergeReadHandler 179 // interface IMergeReadHandler
175 public: 180 public:
176 - virtual void on_read(int nb_buffer, ssize_t nread); 181 + virtual void on_read(ssize_t nread);
  182 + virtual void on_buffer_change(int nb_buffer);
177 }; 183 };
178 184
179 #endif 185 #endif
@@ -659,7 +659,7 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) @@ -659,7 +659,7 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source)
659 659
660 // use isolate thread to recv, 660 // use isolate thread to recv,
661 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 661 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
662 - SrsPublishRecvThread trd(rtmp, 662 + SrsPublishRecvThread trd(rtmp, st_netfd_fileno(stfd),
663 SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000, 663 SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000,
664 this, source, true, vhost_is_edge); 664 this, source, true, vhost_is_edge);
665 665
@@ -695,7 +695,7 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) @@ -695,7 +695,7 @@ int SrsRtmpConn::flash_publishing(SrsSource* source)
695 695
696 // use isolate thread to recv, 696 // use isolate thread to recv,
697 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 697 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
698 - SrsPublishRecvThread trd(rtmp, 698 + SrsPublishRecvThread trd(rtmp, st_netfd_fileno(stfd),
699 SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000, 699 SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000,
700 this, source, false, vhost_is_edge); 700 this, source, false, vhost_is_edge);
701 701
@@ -112,7 +112,7 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -112,7 +112,7 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
112 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 112 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
113 */ 113 */
114 if (merged_read && _handler) { 114 if (merged_read && _handler) {
115 - _handler->on_read(nb_buffer, nread); 115 + _handler->on_read(nread);
116 } 116 }
117 117
118 srs_assert((int)nread > 0); 118 srs_assert((int)nread > 0);
@@ -122,10 +122,14 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -122,10 +122,14 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
122 return ret; 122 return ret;
123 } 123 }
124 124
125 -void SrsBuffer::set_merge_read(bool v, IMergeReadHandler* handler) 125 +void SrsBuffer::set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler)
126 { 126 {
127 merged_read = v; 127 merged_read = v;
128 _handler = handler; 128 _handler = handler;
  129 +
  130 + if (v && max_buffer != nb_buffer) {
  131 + reset_buffer(max_buffer);
  132 + }
129 } 133 }
130 134
131 void SrsBuffer::on_chunk_size(int32_t chunk_size) 135 void SrsBuffer::on_chunk_size(int32_t chunk_size)
@@ -134,10 +138,7 @@ void SrsBuffer::on_chunk_size(int32_t chunk_size) @@ -134,10 +138,7 @@ void SrsBuffer::on_chunk_size(int32_t chunk_size)
134 return; 138 return;
135 } 139 }
136 140
137 - srs_freep(buffer);  
138 -  
139 - nb_buffer = chunk_size;  
140 - buffer = new char[nb_buffer]; 141 + reset_buffer(chunk_size);
141 } 142 }
142 143
143 int SrsBuffer::buffer_size() 144 int SrsBuffer::buffer_size()
@@ -145,4 +146,14 @@ int SrsBuffer::buffer_size() @@ -145,4 +146,14 @@ int SrsBuffer::buffer_size()
145 return nb_buffer; 146 return nb_buffer;
146 } 147 }
147 148
  149 +void SrsBuffer::reset_buffer(int size)
  150 +{
  151 + srs_freep(buffer);
  152 +
  153 + nb_buffer = size;
  154 + buffer = new char[nb_buffer];
148 155
  156 + if (_handler) {
  157 + _handler->on_buffer_change(nb_buffer);
  158 + }
  159 +}
@@ -51,7 +51,12 @@ public: @@ -51,7 +51,12 @@ public:
51 * some small bytes. 51 * some small bytes.
52 * @remark, it only for server-side, client srs-librtmp just ignore. 52 * @remark, it only for server-side, client srs-librtmp just ignore.
53 */ 53 */
54 - virtual void on_read(int nb_buffer, ssize_t nread) = 0; 54 + virtual void on_read(ssize_t nread) = 0;
  55 + /**
  56 + * when buffer size changed.
  57 + * @param nb_buffer the new buffer size.
  58 + */
  59 + virtual void on_buffer_change(int nb_buffer) = 0;
55 }; 60 };
56 61
57 /** 62 /**
@@ -110,11 +115,11 @@ public: @@ -110,11 +115,11 @@ public:
110 * when it on and read small bytes, we sleep to wait more data., 115 * when it on and read small bytes, we sleep to wait more data.,
111 * that is, we merge some data to read together. 116 * that is, we merge some data to read together.
112 * @param v true to ename merged read. 117 * @param v true to ename merged read.
  118 + * @param max_buffer the max buffer size, the socket buffer.
113 * @param handler the handler when merge read is enabled. 119 * @param handler the handler when merge read is enabled.
114 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 120 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
115 */ 121 */
116 - virtual void set_merge_read(bool v, IMergeReadHandler* handler);  
117 -public: 122 + virtual void set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler);
118 /** 123 /**
119 * when chunk size changed, the buffer should change the buffer also. 124 * when chunk size changed, the buffer should change the buffer also.
120 * to keep the socket buffer size always greater than chunk size. 125 * to keep the socket buffer size always greater than chunk size.
@@ -125,6 +130,8 @@ public: @@ -125,6 +130,8 @@ public:
125 * get the size of socket buffer to read. 130 * get the size of socket buffer to read.
126 */ 131 */
127 virtual int buffer_size(); 132 virtual int buffer_size();
  133 +private:
  134 + virtual void reset_buffer(int size);
128 }; 135 };
129 136
130 #endif 137 #endif
@@ -745,9 +745,9 @@ void SrsRtmpServer::set_auto_response(bool v) @@ -745,9 +745,9 @@ void SrsRtmpServer::set_auto_response(bool v)
745 protocol->set_auto_response(v); 745 protocol->set_auto_response(v);
746 } 746 }
747 747
748 -void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler) 748 +void SrsRtmpServer::set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler)
749 { 749 {
750 - protocol->set_merge_read(v, handler); 750 + protocol->set_merge_read(v, max_buffer, handler);
751 } 751 }
752 752
753 void SrsRtmpServer::set_recv_timeout(int64_t timeout_us) 753 void SrsRtmpServer::set_recv_timeout(int64_t timeout_us)
@@ -348,10 +348,11 @@ public: @@ -348,10 +348,11 @@ public:
348 * when it on and read small bytes, we sleep to wait more data., 348 * when it on and read small bytes, we sleep to wait more data.,
349 * that is, we merge some data to read together. 349 * that is, we merge some data to read together.
350 * @param v true to ename merged read. 350 * @param v true to ename merged read.
  351 + * @param max_buffer the max buffer size, the socket buffer.
351 * @param handler the handler when merge read is enabled. 352 * @param handler the handler when merge read is enabled.
352 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 353 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
353 */ 354 */
354 - virtual void set_merge_read(bool v, IMergeReadHandler* handler); 355 + virtual void set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler);
355 /** 356 /**
356 * set/get the recv timeout in us. 357 * set/get the recv timeout in us.
357 * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT. 358 * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
@@ -479,9 +479,9 @@ int SrsProtocol::manual_response_flush() @@ -479,9 +479,9 @@ int SrsProtocol::manual_response_flush()
479 return ret; 479 return ret;
480 } 480 }
481 481
482 -void SrsProtocol::set_merge_read(bool v, IMergeReadHandler* handler) 482 +void SrsProtocol::set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler)
483 { 483 {
484 - in_buffer->set_merge_read(v, handler); 484 + in_buffer->set_merge_read(v, max_buffer, handler);
485 } 485 }
486 486
487 void SrsProtocol::set_recv_timeout(int64_t timeout_us) 487 void SrsProtocol::set_recv_timeout(int64_t timeout_us)
@@ -276,10 +276,11 @@ public: @@ -276,10 +276,11 @@ public:
276 * when it on and read small bytes, we sleep to wait more data., 276 * when it on and read small bytes, we sleep to wait more data.,
277 * that is, we merge some data to read together. 277 * that is, we merge some data to read together.
278 * @param v true to ename merged read. 278 * @param v true to ename merged read.
  279 + * @param max_buffer the max buffer size, the socket buffer.
279 * @param handler the handler when merge read is enabled. 280 * @param handler the handler when merge read is enabled.
280 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 281 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
281 */ 282 */
282 - virtual void set_merge_read(bool v, IMergeReadHandler* handler); 283 + virtual void set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler);
283 public: 284 public:
284 /** 285 /**
285 * set/get the recv timeout in us. 286 * set/get the recv timeout in us.