winlin

for bug #241, support mr(merged-read) config and reload. 2.0.52.

@@ -485,6 +485,7 @@ Supported operating systems and hardware: @@ -485,6 +485,7 @@ Supported operating systems and hardware:
485 * 2013-10-17, Created.<br/> 485 * 2013-10-17, Created.<br/>
486 486
487 ## History 487 ## History
  488 +* v2.0, 2014-12-04, for [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), support mr(merged-read) config and reload. 2.0.52.
488 * v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50 489 * v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50
489 * v2.0, 2014-12-04, fix [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), improve about 15% performance for fast buffer. 2.0.49 490 * v2.0, 2014-12-04, fix [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), improve about 15% performance for fast buffer. 2.0.49
490 * v2.0, 2014-12-03, fix [#244](https://github.com/winlinvip/simple-rtmp-server/issues/244), conn thread use cond to wait for recv thread error. 2.0.47. 491 * v2.0, 2014-12-03, fix [#244](https://github.com/winlinvip/simple-rtmp-server/issues/244), conn thread use cond to wait for recv thread error. 2.0.47.
@@ -142,6 +142,26 @@ http_stream { @@ -142,6 +142,26 @@ http_stream {
142 vhost __defaultVhost__ { 142 vhost __defaultVhost__ {
143 } 143 }
144 144
  145 +# the MR(merged-read) setting for publisher.
  146 +vhost mr.srs.com {
  147 + # about MR, read https://github.com/winlinvip/simple-rtmp-server/issues/241
  148 + mr {
  149 + # whether enable the MR(merged-read)
  150 + # default: off
  151 + enabled on;
  152 + # the latency in ms for MR(merged-read),
  153 + # the performance+ when latency+, and memory+,
  154 + # memory(buffer) = latency * kbps / 8
  155 + # for example, latency=500ms, kbps=3000kbps, each publish connection will consume
  156 + # memory = 500 * 3000 / 8 = 187500B = 183KB
  157 + # when there are 2500 publisher, the total memory of SRS atleast:
  158 + # 183KB * 2500 = 446MB
  159 + # the value recomment is [300, 2000]
  160 + # default: 500
  161 + latency 500;
  162 + }
  163 +}
  164 +
145 # vhost for edge, edge and origin is the same vhost 165 # vhost for edge, edge and origin is the same vhost
146 vhost same.edge.srs.com { 166 vhost same.edge.srs.com {
147 # the mode of vhost, local or remote. 167 # the mode of vhost, local or remote.
@@ -816,7 +816,18 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) @@ -816,7 +816,18 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
816 return ret; 816 return ret;
817 } 817 }
818 } 818 }
819 - srs_trace("vhost %s reload hls success.", vhost.c_str()); 819 + srs_trace("vhost %s reload hlsdvrsuccess.", vhost.c_str());
  820 + }
  821 + // mr, only one per vhost
  822 + if (!srs_directive_equals(new_vhost->get("mr"), old_vhost->get("mr"))) {
  823 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  824 + ISrsReloadHandler* subscribe = *it;
  825 + if ((ret = subscribe->on_reload_vhost_mr(vhost)) != ERROR_SUCCESS) {
  826 + srs_error("vhost %s notify subscribes mr failed. ret=%d", vhost.c_str(), ret);
  827 + return ret;
  828 + }
  829 + }
  830 + srs_trace("vhost %s reload mr success.", vhost.c_str());
820 } 831 }
821 // http, only one per vhost. 832 // http, only one per vhost.
822 if (!srs_directive_equals(new_vhost->get("http"), old_vhost->get("http"))) { 833 if (!srs_directive_equals(new_vhost->get("http"), old_vhost->get("http"))) {
@@ -1316,6 +1327,7 @@ int SrsConfig::check_config() @@ -1316,6 +1327,7 @@ int SrsConfig::check_config()
1316 && n != "time_jitter" 1327 && n != "time_jitter"
1317 && n != "atc" && n != "atc_auto" 1328 && n != "atc" && n != "atc_auto"
1318 && n != "debug_srs_upnode" 1329 && n != "debug_srs_upnode"
  1330 + && n != "mr"
1319 ) { 1331 ) {
1320 ret = ERROR_SYSTEM_CONFIG_INVALID; 1332 ret = ERROR_SYSTEM_CONFIG_INVALID;
1321 srs_error("unsupported vhost directive %s, ret=%d", n.c_str(), ret); 1333 srs_error("unsupported vhost directive %s, ret=%d", n.c_str(), ret);
@@ -1333,6 +1345,16 @@ int SrsConfig::check_config() @@ -1333,6 +1345,16 @@ int SrsConfig::check_config()
1333 return ret; 1345 return ret;
1334 } 1346 }
1335 } 1347 }
  1348 + } else if (n == "mr") {
  1349 + for (int j = 0; j < (int)conf->directives.size(); j++) {
  1350 + string m = conf->at(j)->name.c_str();
  1351 + if (m != "enabled" && m != "latency"
  1352 + ) {
  1353 + ret = ERROR_SYSTEM_CONFIG_INVALID;
  1354 + srs_error("unsupported vhost mr directive %s, ret=%d", m.c_str(), ret);
  1355 + return ret;
  1356 + }
  1357 + }
1336 } else if (n == "ingest") { 1358 } else if (n == "ingest") {
1337 for (int j = 0; j < (int)conf->directives.size(); j++) { 1359 for (int j = 0; j < (int)conf->directives.size(); j++) {
1338 string m = conf->at(j)->name.c_str(); 1360 string m = conf->at(j)->name.c_str();
@@ -2078,6 +2100,50 @@ int SrsConfig::get_chunk_size(string vhost) @@ -2078,6 +2100,50 @@ int SrsConfig::get_chunk_size(string vhost)
2078 return ::atoi(conf->arg0().c_str()); 2100 return ::atoi(conf->arg0().c_str());
2079 } 2101 }
2080 2102
  2103 +bool SrsConfig::get_mr_enabled(string vhost)
  2104 +{
  2105 +
  2106 + SrsConfDirective* conf = get_vhost(vhost);
  2107 +
  2108 + if (!conf) {
  2109 + return SRS_CONSTS_RTMP_MR;
  2110 + }
  2111 +
  2112 + conf = conf->get("mr");
  2113 + if (!conf) {
  2114 + return SRS_CONSTS_RTMP_MR;
  2115 + }
  2116 +
  2117 + conf = conf->get("enabled");
  2118 + if (!conf || conf->arg0() != "on") {
  2119 + return SRS_CONSTS_RTMP_MR;
  2120 + }
  2121 +
  2122 + return true;
  2123 +}
  2124 +
  2125 +int SrsConfig::get_mr_sleep_ms(string vhost)
  2126 +{
  2127 +
  2128 + SrsConfDirective* conf = get_vhost(vhost);
  2129 +
  2130 + if (!conf) {
  2131 + return SRS_CONSTS_RTMP_MR_SLEEP;
  2132 + }
  2133 +
  2134 + conf = conf->get("mr");
  2135 + if (!conf) {
  2136 + return SRS_CONSTS_RTMP_MR_SLEEP;
  2137 + }
  2138 +
  2139 + conf = conf->get("latency");
  2140 + if (!conf || conf->arg0().empty()) {
  2141 + return SRS_CONSTS_RTMP_MR_SLEEP;
  2142 + }
  2143 +
  2144 + return ::atoi(conf->arg0().c_str());
  2145 +}
  2146 +
2081 int SrsConfig::get_global_chunk_size() 2147 int SrsConfig::get_global_chunk_size()
2082 { 2148 {
2083 SrsConfDirective* conf = root->get("chunk_size"); 2149 SrsConfDirective* conf = root->get("chunk_size");
@@ -530,6 +530,16 @@ public: @@ -530,6 +530,16 @@ public:
530 * @remark, default 60000. 530 * @remark, default 60000.
531 */ 531 */
532 virtual int get_chunk_size(std::string vhost); 532 virtual int get_chunk_size(std::string vhost);
  533 + /**
  534 + * whether mr is enabled for vhost.
  535 + * @param vhost, the vhost to get the mr.
  536 + */
  537 + virtual bool get_mr_enabled(std::string vhost);
  538 + /**
  539 + * get the mr sleep time in ms for vhost.
  540 + * @param vhost, the vhost to get the mr sleep time.
  541 + */
  542 + virtual int get_mr_sleep_ms(std::string vhost);
533 private: 543 private:
534 /** 544 /**
535 * get the global chunk size. 545 * get the global chunk size.
@@ -29,6 +29,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -29,6 +29,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 #include <srs_protocol_buffer.hpp> 29 #include <srs_protocol_buffer.hpp>
30 #include <srs_kernel_utility.hpp> 30 #include <srs_kernel_utility.hpp>
31 #include <srs_core_performance.hpp> 31 #include <srs_core_performance.hpp>
  32 +#include <srs_app_config.hpp>
  33 +
  34 +using namespace std;
32 35
33 ISrsMessageHandler::ISrsMessageHandler() 36 ISrsMessageHandler::ISrsMessageHandler()
34 { 37 {
@@ -221,11 +224,13 @@ void SrsQueueRecvThread::on_thread_stop() @@ -221,11 +224,13 @@ void SrsQueueRecvThread::on_thread_stop()
221 } 224 }
222 225
223 SrsPublishRecvThread::SrsPublishRecvThread( 226 SrsPublishRecvThread::SrsPublishRecvThread(
224 - SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms, 227 + SrsRtmpServer* rtmp_sdk,
  228 + SrsRequest* _req, int mr_sock_fd, int timeout_ms,
225 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge 229 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
226 ): trd(this, rtmp_sdk, timeout_ms) 230 ): trd(this, rtmp_sdk, timeout_ms)
227 { 231 {
228 rtmp = rtmp_sdk; 232 rtmp = rtmp_sdk;
  233 +
229 _conn = conn; 234 _conn = conn;
230 _source = source; 235 _source = source;
231 _is_fmle = is_fmle; 236 _is_fmle = is_fmle;
@@ -234,12 +239,22 @@ SrsPublishRecvThread::SrsPublishRecvThread( @@ -234,12 +239,22 @@ SrsPublishRecvThread::SrsPublishRecvThread(
234 recv_error_code = ERROR_SUCCESS; 239 recv_error_code = ERROR_SUCCESS;
235 _nb_msgs = 0; 240 _nb_msgs = 0;
236 error = st_cond_new(); 241 error = st_cond_new();
  242 +
  243 + req = _req;
  244 + mr_fd = mr_sock_fd;
237 245
238 - mr_fd = fd; 246 + // the mr settings,
  247 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  248 + mr = _srs_config->get_mr_enabled(req->vhost);
  249 + mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
  250 +
  251 + _srs_config->subscribe(this);
239 } 252 }
240 253
241 SrsPublishRecvThread::~SrsPublishRecvThread() 254 SrsPublishRecvThread::~SrsPublishRecvThread()
242 { 255 {
  256 + _srs_config->unsubscribe(this);
  257 +
243 trd.stop(); 258 trd.stop();
244 st_cond_destroy(error); 259 st_cond_destroy(error);
245 } 260 }
@@ -282,20 +297,8 @@ void SrsPublishRecvThread::on_thread_start() @@ -282,20 +297,8 @@ void SrsPublishRecvThread::on_thread_start()
282 // for the main thread never send message. 297 // for the main thread never send message.
283 298
284 #ifdef SRS_PERF_MERGED_READ 299 #ifdef SRS_PERF_MERGED_READ
285 - // socket recv buffer, system will double it.  
286 - int nb_rbuf = SRS_MR_SOCKET_BUFFER / 2;  
287 - socklen_t sock_buf_size = sizeof(int);  
288 - if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {  
289 - srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);  
290 - }  
291 - getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);  
292 -  
293 - srs_trace("merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d",  
294 - SRS_MR_SOCKET_BUFFER, nb_rbuf, SRS_MR_MAX_SLEEP_MS, SRS_MR_SMALL_BYTES);  
295 -  
296 - // enable the merge read  
297 - // @see https://github.com/winlinvip/simple-rtmp-server/issues/241  
298 - rtmp->set_merge_read(true, this); 300 + // for mr.
  301 + update_buffer(mr, mr_sleep);
299 #endif 302 #endif
300 } 303 }
301 304
@@ -349,7 +352,11 @@ void SrsPublishRecvThread::on_recv_error(int ret) @@ -349,7 +352,11 @@ void SrsPublishRecvThread::on_recv_error(int ret)
349 #ifdef SRS_PERF_MERGED_READ 352 #ifdef SRS_PERF_MERGED_READ
350 void SrsPublishRecvThread::on_read(ssize_t nread) 353 void SrsPublishRecvThread::on_read(ssize_t nread)
351 { 354 {
352 - if (nread < 0 || SRS_MR_MAX_SLEEP_MS <= 0) { 355 + if (!mr) {
  356 + return;
  357 + }
  358 +
  359 + if (nread < 0 || mr_sleep <= 0) {
353 return; 360 return;
354 } 361 }
355 362
@@ -360,7 +367,72 @@ void SrsPublishRecvThread::on_read(ssize_t nread) @@ -360,7 +367,72 @@ void SrsPublishRecvThread::on_read(ssize_t nread)
360 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 367 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
361 */ 368 */
362 if (nread < SRS_MR_SMALL_BYTES) { 369 if (nread < SRS_MR_SMALL_BYTES) {
363 - st_usleep(SRS_MR_MAX_SLEEP_MS * 1000); 370 + st_usleep(mr_sleep * 1000);
364 } 371 }
365 } 372 }
366 #endif 373 #endif
  374 +
  375 +int SrsPublishRecvThread::on_reload_vhost_mr(string vhost)
  376 +{
  377 + int ret = ERROR_SUCCESS;
  378 +
  379 + // the mr settings,
  380 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  381 + bool mr_enabled = _srs_config->get_mr_enabled(req->vhost);
  382 + int sleep_ms = _srs_config->get_mr_sleep_ms(req->vhost);
  383 + update_buffer(mr_enabled, sleep_ms);
  384 +
  385 + return ret;
  386 +}
  387 +
  388 +void SrsPublishRecvThread::update_buffer(bool mr_enabled, int sleep_ms)
  389 +{
  390 + // TODO: FIXME: refine it.
  391 +
  392 +#ifdef SRS_PERF_MERGED_READ
  393 + // previous enabled mr, update the buffer.
  394 + if (mr && mr_sleep != sleep_ms) {
  395 + // the underlayer api will set to SRS_MR_SOCKET_BUFFER bytes.
  396 + // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
  397 + // 128KB=131072, 256KB=262144, 512KB=524288
  398 + // the buffer should set to SRS_MR_MAX_SLEEP_MS*kbps/8,
  399 + // for example, your system delivery stream in 1000kbps,
  400 + // sleep 800ms for small bytes, the buffer should set to:
  401 + // 800*1000/8=100000B(about 128KB).
  402 + // 2000*3000/8=750000B(about 732KB).
  403 + int kbps = 3000;
  404 + int socket_buffer_size = mr_sleep * kbps / 8;
  405 +
  406 + // socket recv buffer, system will double it.
  407 + int nb_rbuf = socket_buffer_size / 2;
  408 + socklen_t sock_buf_size = sizeof(int);
  409 + if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {
  410 + srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);
  411 + }
  412 + getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
  413 +
  414 + srs_trace("merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d",
  415 + socket_buffer_size, nb_rbuf, mr_sleep, SRS_MR_SMALL_BYTES);
  416 +
  417 + rtmp->set_recv_buffer(nb_rbuf);
  418 + }
  419 +#endif
  420 +
  421 + // update to new state
  422 + mr = mr_enabled;
  423 + mr_sleep = sleep_ms;
  424 +
  425 +#ifdef SRS_PERF_MERGED_READ
  426 + // apply new state.
  427 + if (mr) {
  428 + // enable the merge read
  429 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  430 + rtmp->set_merge_read(true, this);
  431 + } else {
  432 + // disable the merge read
  433 + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  434 + rtmp->set_merge_read(false, NULL);
  435 + }
  436 +#endif
  437 +}
  438 +
@@ -35,11 +35,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -35,11 +35,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 #include <srs_app_thread.hpp> 35 #include <srs_app_thread.hpp>
36 #include <srs_protocol_buffer.hpp> 36 #include <srs_protocol_buffer.hpp>
37 #include <srs_core_performance.hpp> 37 #include <srs_core_performance.hpp>
  38 +#include <srs_app_reload.hpp>
38 39
39 class SrsRtmpServer; 40 class SrsRtmpServer;
40 class SrsMessage; 41 class SrsMessage;
41 class SrsRtmpConn; 42 class SrsRtmpConn;
42 class SrsSource; 43 class SrsSource;
  44 +class SrsRequest;
43 45
44 /** 46 /**
45 * for the recv thread to handle the message. 47 * for the recv thread to handle the message.
@@ -138,15 +140,19 @@ class SrsPublishRecvThread : virtual public ISrsMessageHandler @@ -138,15 +140,19 @@ class SrsPublishRecvThread : virtual public ISrsMessageHandler
138 #ifdef SRS_PERF_MERGED_READ 140 #ifdef SRS_PERF_MERGED_READ
139 , virtual public IMergeReadHandler 141 , virtual public IMergeReadHandler
140 #endif 142 #endif
  143 + , virtual public ISrsReloadHandler
141 { 144 {
142 private: 145 private:
143 SrsRecvThread trd; 146 SrsRecvThread trd;
144 SrsRtmpServer* rtmp; 147 SrsRtmpServer* rtmp;
  148 + SrsRequest* req;
145 // the msgs already got. 149 // the msgs already got.
146 int64_t _nb_msgs; 150 int64_t _nb_msgs;
147 // for mr(merged read), 151 // for mr(merged read),
148 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 152 // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  153 + bool mr;
149 int mr_fd; 154 int mr_fd;
  155 + int mr_sleep;
150 // the recv thread error code. 156 // the recv thread error code.
151 int recv_error_code; 157 int recv_error_code;
152 SrsRtmpConn* _conn; 158 SrsRtmpConn* _conn;
@@ -158,7 +164,8 @@ private: @@ -158,7 +164,8 @@ private:
158 // @see https://github.com/winlinvip/simple-rtmp-server/issues/244 164 // @see https://github.com/winlinvip/simple-rtmp-server/issues/244
159 st_cond_t error; 165 st_cond_t error;
160 public: 166 public:
161 - SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms, 167 + SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk,
  168 + SrsRequest* _req, int mr_sock_fd, int timeout_ms,
162 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge); 169 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge);
163 virtual ~SrsPublishRecvThread(); 170 virtual ~SrsPublishRecvThread();
164 public: 171 public:
@@ -183,6 +190,11 @@ public: @@ -183,6 +190,11 @@ public:
183 #ifdef SRS_PERF_MERGED_READ 190 #ifdef SRS_PERF_MERGED_READ
184 virtual void on_read(ssize_t nread); 191 virtual void on_read(ssize_t nread);
185 #endif 192 #endif
  193 +// interface ISrsReloadHandler
  194 +public:
  195 + virtual int on_reload_vhost_mr(std::string vhost);
  196 +private:
  197 + virtual void update_buffer(bool mr_enabled, int sleep_ms);
186 }; 198 };
187 199
188 #endif 200 #endif
@@ -140,6 +140,11 @@ int ISrsReloadHandler::on_reload_vhost_dvr(string /*vhost*/) @@ -140,6 +140,11 @@ int ISrsReloadHandler::on_reload_vhost_dvr(string /*vhost*/)
140 return ERROR_SUCCESS; 140 return ERROR_SUCCESS;
141 } 141 }
142 142
  143 +int ISrsReloadHandler::on_reload_vhost_mr(string /*vhost*/)
  144 +{
  145 + return ERROR_SUCCESS;
  146 +}
  147 +
143 int ISrsReloadHandler::on_reload_vhost_transcode(string /*vhost*/) 148 int ISrsReloadHandler::on_reload_vhost_transcode(string /*vhost*/)
144 { 149 {
145 return ERROR_SUCCESS; 150 return ERROR_SUCCESS;
@@ -65,6 +65,7 @@ public: @@ -65,6 +65,7 @@ public:
65 virtual int on_reload_vhost_forward(std::string vhost); 65 virtual int on_reload_vhost_forward(std::string vhost);
66 virtual int on_reload_vhost_hls(std::string vhost); 66 virtual int on_reload_vhost_hls(std::string vhost);
67 virtual int on_reload_vhost_dvr(std::string vhost); 67 virtual int on_reload_vhost_dvr(std::string vhost);
  68 + virtual int on_reload_vhost_mr(std::string vhost);
68 virtual int on_reload_vhost_transcode(std::string vhost); 69 virtual int on_reload_vhost_transcode(std::string vhost);
69 virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); 70 virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
70 virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id); 71 virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id);
@@ -660,9 +660,8 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) @@ -660,9 +660,8 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source)
660 660
661 // use isolate thread to recv, 661 // use isolate thread to recv,
662 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 662 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
663 - SrsPublishRecvThread trd(rtmp, st_netfd_fileno(stfd),  
664 - SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000,  
665 - this, source, true, vhost_is_edge); 663 + SrsPublishRecvThread trd(rtmp, req,
  664 + st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge);
666 665
667 srs_info("start to publish stream %s success", req->stream.c_str()); 666 srs_info("start to publish stream %s success", req->stream.c_str());
668 ret = do_publishing(source, &trd); 667 ret = do_publishing(source, &trd);
@@ -696,9 +695,8 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) @@ -696,9 +695,8 @@ int SrsRtmpConn::flash_publishing(SrsSource* source)
696 695
697 // use isolate thread to recv, 696 // use isolate thread to recv,
698 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 697 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
699 - SrsPublishRecvThread trd(rtmp, st_netfd_fileno(stfd),  
700 - SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000,  
701 - this, source, false, vhost_is_edge); 698 + SrsPublishRecvThread trd(rtmp, req,
  699 + st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge);
702 700
703 srs_info("start to publish stream %s success", req->stream.c_str()); 701 srs_info("start to publish stream %s success", req->stream.c_str());
704 ret = do_publishing(source, &trd); 702 ret = do_publishing(source, &trd);
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 51 34 +#define VERSION_REVISION 52
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -42,22 +42,23 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -42,22 +42,23 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
42 * @example, for the default settings, this algorithm will use: 42 * @example, for the default settings, this algorithm will use:
43 * that is, when got nread bytes smaller than 4KB, sleep(780ms). 43 * that is, when got nread bytes smaller than 4KB, sleep(780ms).
44 */ 44 */
45 -#if 1  
46 - // to enable merged read.  
47 - #define SRS_PERF_MERGED_READ  
48 - // the max sleep time in ms  
49 - #define SRS_MR_MAX_SLEEP_MS 780  
50 - // the max small bytes to group  
51 - #define SRS_MR_SMALL_BYTES 4096  
52 - // the underlayer api will set to SRS_MR_SOCKET_BUFFER bytes.  
53 - // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,  
54 - // 128KB=131072, 256KB=262144, 512KB=524288  
55 - // the buffer should set to SRS_MR_MAX_SLEEP_MS*kbps/8,  
56 - // for example, your system delivery stream in 1000kbps,  
57 - // sleep 800ms for small bytes, the buffer should set to:  
58 - // 800*1000/8=100000B(about 128KB).  
59 - #define SRS_MR_SOCKET_BUFFER 65536  
60 -#endif 45 +/**
  46 +* https://github.com/winlinvip/simple-rtmp-server/issues/241#issuecomment-65554690
  47 +* The merged read algorithm is ok and can be simplified for:
  48 +* 1. Suppose the client network is ok. All algorithm go wrong when netowrk is not ok.
  49 +* 2. Suppose the client send each packet one by one. Although send some together, it's same.
  50 +* 3. SRS MR algorithm will read all data then sleep.
  51 +* So, the MR algorithm is:
  52 +* while true:
  53 +* read all data from socket.
  54 +* sleep a while
  55 +* For example, sleep 120ms. Then there is, and always 120ms data in buffer.
  56 +* That is, the latency is 120ms(the sleep time).
  57 +*/
  58 +// to enable merged read.
  59 +#undef SRS_PERF_MERGED_READ
  60 +// the max sleep time in ms
  61 +#define SRS_MR_MAX_SLEEP_MS 800
61 62
62 /** 63 /**
63 * the send cache time in ms. 64 * the send cache time in ms.
@@ -50,6 +50,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -50,6 +50,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
50 // 6. Chunking, RTMP protocol default chunk size. 50 // 6. Chunking, RTMP protocol default chunk size.
51 #define SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE 128 51 #define SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE 128
52 52
  53 +// the default setting of mr.
  54 +#define SRS_CONSTS_RTMP_MR false
  55 +#define SRS_CONSTS_RTMP_MR_SLEEP 500
  56 +
53 /** 57 /**
54 * 6. Chunking 58 * 6. Chunking
55 * The chunk size is configurable. It can be set using a control 59 * The chunk size is configurable. It can be set using a control
@@ -28,6 +28,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -28,6 +28,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28 #include <srs_kernel_utility.hpp> 28 #include <srs_kernel_utility.hpp>
29 #include <srs_core_performance.hpp> 29 #include <srs_core_performance.hpp>
30 30
  31 +// the max small bytes to group
  32 +#define SRS_MR_SMALL_BYTES 4096
  33 +// the default recv buffer size
  34 +#define SRS_DEFAULT_RECV_BUFFER_SIZE 8192
  35 +
31 // the max header size, 36 // the max header size,
32 // @see SrsProtocol::read_message_header(). 37 // @see SrsProtocol::read_message_header().
33 #define SRS_RTMP_MAX_MESSAGE_HEADER 11 38 #define SRS_RTMP_MAX_MESSAGE_HEADER 11
@@ -90,11 +95,30 @@ SrsFastBuffer::SrsFastBuffer() @@ -90,11 +95,30 @@ SrsFastBuffer::SrsFastBuffer()
90 _handler = NULL; 95 _handler = NULL;
91 #endif 96 #endif
92 97
93 - nb_buffer = SRS_MR_SOCKET_BUFFER; 98 + nb_buffer = SRS_DEFAULT_RECV_BUFFER_SIZE;
94 buffer = new char[nb_buffer]; 99 buffer = new char[nb_buffer];
95 p = end = buffer; 100 p = end = buffer;
96 } 101 }
97 102
  103 +void SrsFastBuffer::set_buffer(int buffer_size)
  104 +{
  105 + // only realloc when buffer changed bigger
  106 + if (buffer_size <= nb_buffer) {
  107 + return;
  108 + }
  109 +
  110 + int start = p - buffer;
  111 + int cap = end - p;
  112 +
  113 + char* buf = new char[buffer_size];
  114 + memcpy(buf, buffer, nb_buffer);
  115 + srs_freep(buffer);
  116 +
  117 + buffer = buf;
  118 + p = buffer + start;
  119 + end = p + cap;
  120 +}
  121 +
98 SrsFastBuffer::~SrsFastBuffer() 122 SrsFastBuffer::~SrsFastBuffer()
99 { 123 {
100 srs_freep(buffer); 124 srs_freep(buffer);
@@ -124,6 +124,15 @@ public: @@ -124,6 +124,15 @@ public:
124 virtual ~SrsFastBuffer(); 124 virtual ~SrsFastBuffer();
125 public: 125 public:
126 /** 126 /**
  127 + * create buffer with specifeid size.
  128 + * @param buffer the size of buffer.
  129 + * @remark when MR(SRS_PERF_MERGED_READ) disabled, always set to 8K.
  130 + * @remark when buffer changed, the previous ptr maybe invalid.
  131 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  132 + */
  133 + virtual void set_buffer(int buffer_size);
  134 +public:
  135 + /**
127 * read 1byte from buffer, move to next bytes. 136 * read 1byte from buffer, move to next bytes.
128 * @remark assert buffer already grow(1). 137 * @remark assert buffer already grow(1).
129 */ 138 */
@@ -750,6 +750,11 @@ void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler) @@ -750,6 +750,11 @@ void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler)
750 { 750 {
751 protocol->set_merge_read(v, handler); 751 protocol->set_merge_read(v, handler);
752 } 752 }
  753 +
  754 +void SrsRtmpServer::set_recv_buffer(int buffer_size)
  755 +{
  756 + protocol->set_recv_buffer(buffer_size);
  757 +}
753 #endif 758 #endif
754 759
755 void SrsRtmpServer::set_recv_timeout(int64_t timeout_us) 760 void SrsRtmpServer::set_recv_timeout(int64_t timeout_us)
@@ -354,6 +354,14 @@ public: @@ -354,6 +354,14 @@ public:
354 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 354 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
355 */ 355 */
356 virtual void set_merge_read(bool v, IMergeReadHandler* handler); 356 virtual void set_merge_read(bool v, IMergeReadHandler* handler);
  357 + /**
  358 + * create buffer with specifeid size.
  359 + * @param buffer the size of buffer.
  360 + * @remark when MR(SRS_PERF_MERGED_READ) disabled, always set to 8K.
  361 + * @remark when buffer changed, the previous ptr maybe invalid.
  362 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  363 + */
  364 + virtual void set_recv_buffer(int buffer_size);
357 #endif 365 #endif
358 /** 366 /**
359 * set/get the recv timeout in us. 367 * set/get the recv timeout in us.
@@ -504,6 +504,11 @@ void SrsProtocol::set_merge_read(bool v, IMergeReadHandler* handler) @@ -504,6 +504,11 @@ void SrsProtocol::set_merge_read(bool v, IMergeReadHandler* handler)
504 { 504 {
505 in_buffer->set_merge_read(v, handler); 505 in_buffer->set_merge_read(v, handler);
506 } 506 }
  507 +
  508 +void SrsProtocol::set_recv_buffer(int buffer_size)
  509 +{
  510 + in_buffer->set_buffer(buffer_size);
  511 +}
507 #endif 512 #endif
508 513
509 void SrsProtocol::set_recv_timeout(int64_t timeout_us) 514 void SrsProtocol::set_recv_timeout(int64_t timeout_us)
@@ -257,10 +257,6 @@ private: @@ -257,10 +257,6 @@ private:
257 */ 257 */
258 int32_t out_chunk_size; 258 int32_t out_chunk_size;
259 public: 259 public:
260 - /**  
261 - * use io to create the protocol stack,  
262 - * @param io, provides io interfaces, user must free it.  
263 - */  
264 SrsProtocol(ISrsProtocolReaderWriter* io); 260 SrsProtocol(ISrsProtocolReaderWriter* io);
265 virtual ~SrsProtocol(); 261 virtual ~SrsProtocol();
266 public: 262 public:
@@ -288,6 +284,14 @@ public: @@ -288,6 +284,14 @@ public:
288 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 284 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
289 */ 285 */
290 virtual void set_merge_read(bool v, IMergeReadHandler* handler); 286 virtual void set_merge_read(bool v, IMergeReadHandler* handler);
  287 + /**
  288 + * create buffer with specifeid size.
  289 + * @param buffer the size of buffer.
  290 + * @remark when MR(SRS_PERF_MERGED_READ) disabled, always set to 8K.
  291 + * @remark when buffer changed, the previous ptr maybe invalid.
  292 + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
  293 + */
  294 + virtual void set_recv_buffer(int buffer_size);
291 #endif 295 #endif
292 public: 296 public:
293 /** 297 /**