winlin

refine the recv buffer for mr.

@@ -300,8 +300,14 @@ void SrsPublishRecvThread::on_thread_start() @@ -300,8 +300,14 @@ void SrsPublishRecvThread::on_thread_start()
300 // for the main thread never send message. 300 // for the main thread never send message.
301 301
302 #ifdef SRS_PERF_MERGED_READ 302 #ifdef SRS_PERF_MERGED_READ
303 - // for mr.  
304 - update_buffer(mr, mr_sleep); 303 + if (mr) {
  304 + // set underlayer buffer size
  305 + set_socket_buffer(mr_sleep);
  306 +
  307 + // disable the merge read
  308 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  309 + rtmp->set_merge_read(true, this);
  310 + }
305 #endif 311 #endif
306 } 312 }
307 313
@@ -315,9 +321,11 @@ void SrsPublishRecvThread::on_thread_stop() @@ -315,9 +321,11 @@ void SrsPublishRecvThread::on_thread_stop()
315 st_cond_signal(error); 321 st_cond_signal(error);
316 322
317 #ifdef SRS_PERF_MERGED_READ 323 #ifdef SRS_PERF_MERGED_READ
318 - // disable the merge read  
319 - // @see https://github.com/winlinvip/simple-rtmp-server/issues/241  
320 - rtmp->set_merge_read(false, NULL); 324 + if (mr) {
  325 + // disable the merge read
  326 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  327 + rtmp->set_merge_read(false, NULL);
  328 + }
321 #endif 329 #endif
322 } 330 }
323 331
@@ -383,41 +391,24 @@ int SrsPublishRecvThread::on_reload_vhost_mr(string vhost) @@ -383,41 +391,24 @@ int SrsPublishRecvThread::on_reload_vhost_mr(string vhost)
383 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 391 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
384 bool mr_enabled = _srs_config->get_mr_enabled(req->vhost); 392 bool mr_enabled = _srs_config->get_mr_enabled(req->vhost);
385 int sleep_ms = _srs_config->get_mr_sleep_ms(req->vhost); 393 int sleep_ms = _srs_config->get_mr_sleep_ms(req->vhost);
386 - update_buffer(mr_enabled, sleep_ms);  
387 -  
388 - return ret;  
389 -}  
390 394
391 -void SrsPublishRecvThread::update_buffer(bool mr_enabled, int sleep_ms)  
392 -{  
393 - // TODO: FIXME: refine it.  
394 - 395 + // update buffer when sleep ms changed.
  396 + if (mr_sleep != sleep_ms) {
  397 + set_socket_buffer(sleep_ms);
  398 + }
  399 +
395 #ifdef SRS_PERF_MERGED_READ 400 #ifdef SRS_PERF_MERGED_READ
396 - // update the buffer.  
397 - if (true) {  
398 - // the underlayer api will set to SRS_MR_SOCKET_BUFFER bytes.  
399 - // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,  
400 - // 128KB=131072, 256KB=262144, 512KB=524288  
401 - // the buffer should set to SRS_MR_MAX_SLEEP_MS*kbps/8,  
402 - // for example, your system delivery stream in 1000kbps,  
403 - // sleep 800ms for small bytes, the buffer should set to:  
404 - // 800*1000/8=100000B(about 128KB).  
405 - // 2000*3000/8=750000B(about 732KB).  
406 - int kbps = 3000;  
407 - int socket_buffer_size = mr_sleep * kbps / 8;  
408 -  
409 - // socket recv buffer, system will double it.  
410 - int nb_rbuf = socket_buffer_size / 2;  
411 - socklen_t sock_buf_size = sizeof(int);  
412 - if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {  
413 - srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);  
414 - }  
415 - getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);  
416 -  
417 - srs_trace("merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d",  
418 - socket_buffer_size, nb_rbuf, mr_sleep, SRS_MR_SMALL_BYTES);  
419 -  
420 - rtmp->set_recv_buffer(nb_rbuf); 401 + // mr enabled=>disabled
  402 + if (mr && !mr_enabled) {
  403 + // disable the merge read
  404 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  405 + rtmp->set_merge_read(false, NULL);
  406 + }
  407 + // mr disabled=>enabled
  408 + if (!mr && mr_enabled) {
  409 + // enable the merge read
  410 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  411 + rtmp->set_merge_read(true, this);
421 } 412 }
422 #endif 413 #endif
423 414
@@ -425,17 +416,33 @@ void SrsPublishRecvThread::update_buffer(bool mr_enabled, int sleep_ms) @@ -425,17 +416,33 @@ void SrsPublishRecvThread::update_buffer(bool mr_enabled, int sleep_ms)
425 mr = mr_enabled; 416 mr = mr_enabled;
426 mr_sleep = sleep_ms; 417 mr_sleep = sleep_ms;
427 418
428 -#ifdef SRS_PERF_MERGED_READ  
429 - // apply new state.  
430 - if (mr) {  
431 - // enable the merge read  
432 - // @see https://github.com/winlinvip/simple-rtmp-server/issues/241  
433 - rtmp->set_merge_read(true, this);  
434 - } else {  
435 - // disable the merge read  
436 - // @see https://github.com/winlinvip/simple-rtmp-server/issues/241  
437 - rtmp->set_merge_read(false, NULL); 419 + return ret;
  420 +}
  421 +
  422 +void SrsPublishRecvThread::set_socket_buffer(int sleep_ms)
  423 +{
  424 + // the underlayer api will set to SRS_MR_SOCKET_BUFFER bytes.
  425 + // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
  426 + // 128KB=131072, 256KB=262144, 512KB=524288
  427 + // the buffer should set to SRS_MR_MAX_SLEEP_MS*kbps/8,
  428 + // for example, your system delivery stream in 1000kbps,
  429 + // sleep 800ms for small bytes, the buffer should set to:
  430 + // 800*1000/8=100000B(about 128KB).
  431 + // 2000*3000/8=750000B(about 732KB).
  432 + int kbps = 3000;
  433 + int socket_buffer_size = sleep_ms * kbps / 8;
  434 +
  435 + // socket recv buffer, system will double it.
  436 + int nb_rbuf = socket_buffer_size / 2;
  437 + socklen_t sock_buf_size = sizeof(int);
  438 + if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {
  439 + srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);
438 } 440 }
439 -#endif 441 + getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
  442 +
  443 + srs_trace("merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d",
  444 + socket_buffer_size, nb_rbuf, sleep_ms, SRS_MR_SMALL_BYTES);
  445 +
  446 + rtmp->set_recv_buffer(nb_rbuf);
440 } 447 }
441 448
@@ -194,7 +194,7 @@ public: @@ -194,7 +194,7 @@ public:
194 public: 194 public:
195 virtual int on_reload_vhost_mr(std::string vhost); 195 virtual int on_reload_vhost_mr(std::string vhost);
196 private: 196 private:
197 - virtual void update_buffer(bool mr_enabled, int sleep_ms); 197 + virtual void set_socket_buffer(int sleep_ms);
198 }; 198 };
199 199
200 #endif 200 #endif