winlin

support publish_1stpkt_timeout and publish_normal_timeout

@@ -883,6 +883,12 @@ vhost stream.control.com { @@ -883,6 +883,12 @@ vhost stream.control.com {
883 # while the sequence header is not changed yet. 883 # while the sequence header is not changed yet.
884 # default: off 884 # default: off
885 reduce_sequence_header on; 885 reduce_sequence_header on;
  886 + # the 1st packet timeout in ms for encoder.
  887 + # default: 20000
  888 + publish_1stpkt_timeout 20000;
  889 + # the normal packet timeout in ms for encoder.
  890 + # default: 5000
  891 + publish_normal_timeout 7000;
886 } 892 }
887 893
888 # the vhost for antisuck. 894 # the vhost for antisuck.
@@ -768,6 +768,28 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) @@ -768,6 +768,28 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
768 } 768 }
769 srs_trace("vhost %s reload smi success.", vhost.c_str()); 769 srs_trace("vhost %s reload smi success.", vhost.c_str());
770 } 770 }
  771 + // publish_1stpkt_timeout, only one per vhost
  772 + if (!srs_directive_equals(new_vhost->get("publish_1stpkt_timeout"), old_vhost->get("publish_1stpkt_timeout"))) {
  773 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  774 + ISrsReloadHandler* subscribe = *it;
  775 + if ((ret = subscribe->on_reload_vhost_p1stpt(vhost)) != ERROR_SUCCESS) {
  776 + srs_error("vhost %s notify subscribes p1stpt failed. ret=%d", vhost.c_str(), ret);
  777 + return ret;
  778 + }
  779 + }
  780 + srs_trace("vhost %s reload p1stpt success.", vhost.c_str());
  781 + }
  782 + // publish_normal_timeout, only one per vhost
  783 + if (!srs_directive_equals(new_vhost->get("publish_normal_timeout"), old_vhost->get("publish_normal_timeout"))) {
  784 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  785 + ISrsReloadHandler* subscribe = *it;
  786 + if ((ret = subscribe->on_reload_vhost_pnt(vhost)) != ERROR_SUCCESS) {
  787 + srs_error("vhost %s notify subscribes pnt failed. ret=%d", vhost.c_str(), ret);
  788 + return ret;
  789 + }
  790 + }
  791 + srs_trace("vhost %s reload pnt success.", vhost.c_str());
  792 + }
771 // min_latency, only one per vhost 793 // min_latency, only one per vhost
772 if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) { 794 if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) {
773 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 795 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
@@ -1763,6 +1785,7 @@ int SrsConfig::check_config() @@ -1763,6 +1785,7 @@ int SrsConfig::check_config()
1763 && n != "debug_srs_upnode" 1785 && n != "debug_srs_upnode"
1764 && n != "mr" && n != "mw_latency" && n != "min_latency" 1786 && n != "mr" && n != "mw_latency" && n != "min_latency"
1765 && n != "tcp_nodelay" && n != "send_min_interval" && n != "reduce_sequence_header" 1787 && n != "tcp_nodelay" && n != "send_min_interval" && n != "reduce_sequence_header"
  1788 + && n != "publish_1stpkt_timeout" && n != "publish_normal_timeout"
1766 && n != "security" && n != "http_remux" 1789 && n != "security" && n != "http_remux"
1767 && n != "http" && n != "http_static" 1790 && n != "http" && n != "http_static"
1768 && n != "hds" 1791 && n != "hds"
@@ -2552,6 +2575,44 @@ bool SrsConfig::get_reduce_sequence_header(string vhost) @@ -2552,6 +2575,44 @@ bool SrsConfig::get_reduce_sequence_header(string vhost)
2552 return SRS_CONF_PERFER_FALSE(conf->arg0()); 2575 return SRS_CONF_PERFER_FALSE(conf->arg0());
2553 } 2576 }
2554 2577
  2578 +int SrsConfig::get_publish_1stpkt_timeout(string vhost)
  2579 +{
  2580 + // when no msg recevied for publisher, use larger timeout.
  2581 + static int DEFAULT = 20000;
  2582 +
  2583 + SrsConfDirective* conf = get_vhost(vhost);
  2584 + if (!conf) {
  2585 + return DEFAULT;
  2586 + }
  2587 +
  2588 + conf = conf->get("publish_1stpkt_timeout");
  2589 + if (!conf || conf->arg0().empty()) {
  2590 + return DEFAULT;
  2591 + }
  2592 +
  2593 + return ::atoi(conf->arg0().c_str());
  2594 +}
  2595 +
  2596 +int SrsConfig::get_publish_normal_timeout(string vhost)
  2597 +{
  2598 + // the timeout for publish recv.
  2599 + // we must use more smaller timeout, for the recv never know the status
  2600 + // of underlayer socket.
  2601 + static int DEFAULT = 7000;
  2602 +
  2603 + SrsConfDirective* conf = get_vhost(vhost);
  2604 + if (!conf) {
  2605 + return DEFAULT;
  2606 + }
  2607 +
  2608 + conf = conf->get("get_publish_normal_timeout");
  2609 + if (!conf || conf->arg0().empty()) {
  2610 + return DEFAULT;
  2611 + }
  2612 +
  2613 + return ::atoi(conf->arg0().c_str());
  2614 +}
  2615 +
2555 int SrsConfig::get_global_chunk_size() 2616 int SrsConfig::get_global_chunk_size()
2556 { 2617 {
2557 SrsConfDirective* conf = root->get("chunk_size"); 2618 SrsConfDirective* conf = root->get("chunk_size");
@@ -534,6 +534,14 @@ public: @@ -534,6 +534,14 @@ public:
534 * whether reduce the sequence header. 534 * whether reduce the sequence header.
535 */ 535 */
536 virtual bool get_reduce_sequence_header(std::string vhost); 536 virtual bool get_reduce_sequence_header(std::string vhost);
  537 + /**
  538 + * the 1st packet timeout in ms for encoder.
  539 + */
  540 + virtual int get_publish_1stpkt_timeout(std::string vhost);
  541 + /**
  542 + * the normal packet timeout in ms for encoder.
  543 + */
  544 + virtual int get_publish_normal_timeout(std::string vhost);
537 private: 545 private:
538 /** 546 /**
539 * get the global chunk size. 547 * get the global chunk size.
@@ -180,6 +180,16 @@ int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/) @@ -180,6 +180,16 @@ int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/)
180 return ERROR_SUCCESS; 180 return ERROR_SUCCESS;
181 } 181 }
182 182
  183 +int ISrsReloadHandler::on_reload_vhost_p1stpt(string /*vhost*/)
  184 +{
  185 + return ERROR_SUCCESS;
  186 +}
  187 +
  188 +int ISrsReloadHandler::on_reload_vhost_pnt(string /*vhost*/)
  189 +{
  190 + return ERROR_SUCCESS;
  191 +}
  192 +
183 int ISrsReloadHandler::on_reload_vhost_chunk_size(string /*vhost*/) 193 int ISrsReloadHandler::on_reload_vhost_chunk_size(string /*vhost*/)
184 { 194 {
185 return ERROR_SUCCESS; 195 return ERROR_SUCCESS;
@@ -75,6 +75,8 @@ public: @@ -75,6 +75,8 @@ public:
75 virtual int on_reload_vhost_mw(std::string vhost); 75 virtual int on_reload_vhost_mw(std::string vhost);
76 virtual int on_reload_vhost_smi(std::string vhost); 76 virtual int on_reload_vhost_smi(std::string vhost);
77 virtual int on_reload_vhost_realtime(std::string vhost); 77 virtual int on_reload_vhost_realtime(std::string vhost);
  78 + virtual int on_reload_vhost_p1stpt(std::string vhost);
  79 + virtual int on_reload_vhost_pnt(std::string vhost);
78 virtual int on_reload_vhost_chunk_size(std::string vhost); 80 virtual int on_reload_vhost_chunk_size(std::string vhost);
79 virtual int on_reload_vhost_transcode(std::string vhost); 81 virtual int on_reload_vhost_transcode(std::string vhost);
80 virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); 82 virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
@@ -274,9 +274,45 @@ int SrsRtmpConn::on_reload_vhost_realtime(string vhost) @@ -274,9 +274,45 @@ int SrsRtmpConn::on_reload_vhost_realtime(string vhost)
274 } 274 }
275 275
276 bool realtime_enabled = _srs_config->get_realtime_enabled(req->vhost); 276 bool realtime_enabled = _srs_config->get_realtime_enabled(req->vhost);
277 - srs_trace("realtime changed %d=>%d", realtime, realtime_enabled);  
278 - realtime = realtime_enabled; 277 + if (realtime_enabled != realtime) {
  278 + srs_trace("realtime changed %d=>%d", realtime, realtime_enabled);
  279 + realtime = realtime_enabled;
  280 + }
  281 +
  282 + return ret;
  283 +}
  284 +
  285 +int SrsRtmpConn::on_reload_vhost_p1stpt(string vhost)
  286 +{
  287 + int ret = ERROR_SUCCESS;
  288 +
  289 + if (req->vhost != vhost) {
  290 + return ret;
  291 + }
  292 +
  293 + int p1stpt = _srs_config->get_publish_1stpkt_timeout(req->vhost);
  294 + if (p1stpt != publish_1stpkt_timeout) {
  295 + srs_trace("p1stpt changed %d=>%d", publish_1stpkt_timeout, p1stpt);
  296 + publish_1stpkt_timeout = p1stpt;
  297 + }
  298 +
  299 + return ret;
  300 +}
279 301
  302 +int SrsRtmpConn::on_reload_vhost_pnt(string vhost)
  303 +{
  304 + int ret = ERROR_SUCCESS;
  305 +
  306 + if (req->vhost != vhost) {
  307 + return ret;
  308 + }
  309 +
  310 + int pnt = _srs_config->get_publish_normal_timeout(req->vhost);
  311 + if (pnt != publish_normal_timeout) {
  312 + srs_trace("p1stpt changed %d=>%d", publish_normal_timeout, pnt);
  313 + publish_normal_timeout = pnt;
  314 + }
  315 +
280 return ret; 316 return ret;
281 } 317 }
282 318
@@ -803,6 +839,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -803,6 +839,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
803 839
804 // set the sock options. 840 // set the sock options.
805 set_sock_options(); 841 set_sock_options();
  842 +
  843 + if (true) {
  844 + bool mr = _srs_config->get_mr_enabled(req->vhost);
  845 + int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
  846 + publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
  847 + publish_normal_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
  848 + srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d", mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout);
  849 + }
806 850
807 int64_t nb_msgs = 0; 851 int64_t nb_msgs = 0;
808 while (!disposed) { 852 while (!disposed) {
@@ -819,9 +863,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -819,9 +863,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
819 if (nb_msgs == 0) { 863 if (nb_msgs == 0) {
820 // when not got msgs, wait for a larger timeout. 864 // when not got msgs, wait for a larger timeout.
821 // @see https://github.com/simple-rtmp-server/srs/issues/441 865 // @see https://github.com/simple-rtmp-server/srs/issues/441
822 - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US / 1000); 866 + trd->wait(publish_1stpkt_timeout);
823 } else { 867 } else {
824 - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000); 868 + trd->wait(publish_normal_timeout);
825 } 869 }
826 870
827 // check the thread error code. 871 // check the thread error code.
@@ -847,10 +891,10 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -847,10 +891,10 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
847 bool mr = _srs_config->get_mr_enabled(req->vhost); 891 bool mr = _srs_config->get_mr_enabled(req->vhost);
848 int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); 892 int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
849 srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH 893 srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH
850 - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d", pprint->age(), 894 + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d", pprint->age(),
851 kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), 895 kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
852 kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), 896 kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
853 - mr, mr_sleep 897 + mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout
854 ); 898 );
855 } 899 }
856 } 900 }
@@ -87,6 +87,10 @@ private: @@ -87,6 +87,10 @@ private:
87 bool realtime; 87 bool realtime;
88 // the minimal interval in ms for delivery stream. 88 // the minimal interval in ms for delivery stream.
89 double send_min_interval; 89 double send_min_interval;
  90 + // publish 1st packet timeout in ms
  91 + int publish_1stpkt_timeout;
  92 + // publish normal packet timeout in ms
  93 + int publish_normal_timeout;
90 public: 94 public:
91 SrsRtmpConn(SrsServer* svr, st_netfd_t c); 95 SrsRtmpConn(SrsServer* svr, st_netfd_t c);
92 virtual ~SrsRtmpConn(); 96 virtual ~SrsRtmpConn();
@@ -100,6 +104,8 @@ public: @@ -100,6 +104,8 @@ public:
100 virtual int on_reload_vhost_mw(std::string vhost); 104 virtual int on_reload_vhost_mw(std::string vhost);
101 virtual int on_reload_vhost_smi(std::string vhost); 105 virtual int on_reload_vhost_smi(std::string vhost);
102 virtual int on_reload_vhost_realtime(std::string vhost); 106 virtual int on_reload_vhost_realtime(std::string vhost);
  107 + virtual int on_reload_vhost_p1stpt(std::string vhost);
  108 + virtual int on_reload_vhost_pnt(std::string vhost);
103 // interface IKbpsDelta 109 // interface IKbpsDelta
104 public: 110 public:
105 virtual void resample(); 111 virtual void resample();
@@ -74,12 +74,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -74,12 +74,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
74 // the timeout to wait client data, 74 // the timeout to wait client data,
75 // if timeout, close the connection. 75 // if timeout, close the connection.
76 #define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL) 76 #define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL)
77 -// the timeout for publish recv.  
78 -// we must use more smaller timeout, for the recv never know the status  
79 -// of underlayer socket.  
80 -#define SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US (int64_t)(3*1000*1000LL)  
81 -// when no msg recevied for publisher, use larger timeout.  
82 -#define SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US 10*SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US  
83 77
84 // the timeout to wait for client control message, 78 // the timeout to wait for client control message,
85 // if timeout, we generally ignore and send the data to client, 79 // if timeout, we generally ignore and send the data to client,