winlin

for #320, support macro to disable the complex send algorithm and enable tcp no delay. 2.0.129

1 -#ifndef _srs_icpp_init_stub  
2 -#define _srs_icpp_init_stub 1 +#ifndef _srs_upp_icpp_init_stub
  2 +#define _srs_upp_icpp_init_stub
3 #endif 3 #endif
@@ -27,6 +27,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -27,6 +27,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 #include <sys/socket.h> 27 #include <sys/socket.h>
28 #include <netinet/in.h> 28 #include <netinet/in.h>
29 #include <arpa/inet.h> 29 #include <arpa/inet.h>
  30 +#include <netinet/tcp.h>
30 31
31 using namespace std; 32 using namespace std;
32 33
@@ -624,6 +625,9 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe @@ -624,6 +625,9 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
624 mw_enabled = true; 625 mw_enabled = true;
625 change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost)); 626 change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
626 627
  628 + // set the sock options.
  629 + play_set_sock_options();
  630 +
627 while (true) { 631 while (true) {
628 // collect elapse for pithy print. 632 // collect elapse for pithy print.
629 pprint->elapse(); 633 pprint->elapse();
@@ -1123,6 +1127,29 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) @@ -1123,6 +1127,29 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
1123 mw_sleep = sleep_ms; 1127 mw_sleep = sleep_ms;
1124 } 1128 }
1125 1129
  1130 +void SrsRtmpConn::play_set_sock_options()
  1131 +{
  1132 + int fd = st_netfd_fileno(stfd);
  1133 +
  1134 +#ifdef SRS_PERF_TCP_NODELAY
  1135 + if (true) {
  1136 + socklen_t nb_v = sizeof(int);
  1137 +
  1138 + int ov = 0;
  1139 + getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &ov, &nb_v);
  1140 +
  1141 + int v = 1;
  1142 + // set the socket send buffer when required larger buffer
  1143 + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, nb_v) < 0) {
  1144 + srs_warn("set sock TCP_NODELAY=%d failed.", v);
  1145 + }
  1146 + getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, &nb_v);
  1147 +
  1148 + srs_trace("set TCP_NODELAY %d=>%d", ov, v);
  1149 + }
  1150 +#endif
  1151 +}
  1152 +
1126 int SrsRtmpConn::check_edge_token_traverse_auth() 1153 int SrsRtmpConn::check_edge_token_traverse_auth()
1127 { 1154 {
1128 int ret = ERROR_SUCCESS; 1155 int ret = ERROR_SUCCESS;
@@ -111,6 +111,7 @@ private: @@ -111,6 +111,7 @@ private:
111 virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge); 111 virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge);
112 virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); 112 virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
113 virtual void change_mw_sleep(int sleep_ms); 113 virtual void change_mw_sleep(int sleep_ms);
  114 + virtual void play_set_sock_options();
114 private: 115 private:
115 virtual int check_edge_token_traverse_auth(); 116 virtual int check_edge_token_traverse_auth();
116 virtual int connect_server(int origin_index, st_netfd_t* pstsock); 117 virtual int connect_server(int origin_index, st_netfd_t* pstsock);
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 128 34 +#define VERSION_REVISION 129
35 35
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"
@@ -108,7 +108,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -108,7 +108,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
108 * whether set the socket send buffer size. 108 * whether set the socket send buffer size.
109 * @see https://github.com/winlinvip/simple-rtmp-server/issues/251 109 * @see https://github.com/winlinvip/simple-rtmp-server/issues/251
110 */ 110 */
111 -#undef SRS_PERF_MW_SO_SNDBUF 111 +#define SRS_PERF_MW_SO_SNDBUF
  112 +
112 /** 113 /**
113 * whether set the socket recv buffer size. 114 * whether set the socket recv buffer size.
114 * @see https://github.com/winlinvip/simple-rtmp-server/issues/251 115 * @see https://github.com/winlinvip/simple-rtmp-server/issues/251
@@ -154,5 +155,20 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -154,5 +155,20 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
154 // in seconds, the live queue length. 155 // in seconds, the live queue length.
155 #define SRS_PERF_PLAY_QUEUE 30 156 #define SRS_PERF_PLAY_QUEUE 30
156 157
  158 +/**
  159 +* whether always use complex send algorithm.
  160 +* for some network does not support the complex send,
  161 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/320
  162 +*/
  163 +//#undef SRS_PERF_COMPLEX_SEND
  164 +#define SRS_PERF_COMPLEX_SEND
  165 +/**
  166 +* whether enable the TCP_NODELAY
  167 +* user maybe need send small tcp packet for some network.
  168 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/320
  169 +*/
  170 +//#define SRS_PERF_TCP_NODELAY
  171 +#undef SRS_PERF_TCP_NODELAY
  172 +
157 #endif 173 #endif
158 174
@@ -762,8 +762,9 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -762,8 +762,9 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
762 { 762 {
763 int ret = ERROR_SUCCESS; 763 int ret = ERROR_SUCCESS;
764 764
  765 +#ifdef SRS_PERF_COMPLEX_SEND
765 int iov_index = 0; 766 int iov_index = 0;
766 - iovec* iov = out_iovs + iov_index; 767 + iovec* iovs = out_iovs + iov_index;
767 768
768 int c0c3_cache_index = 0; 769 int c0c3_cache_index = 0;
769 char* c0c3_cache = out_c0c3_caches + c0c3_cache_index; 770 char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;
@@ -796,13 +797,13 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -796,13 +797,13 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
796 srs_assert(nbh > 0); 797 srs_assert(nbh > 0);
797 798
798 // header iov 799 // header iov
799 - iov[0].iov_base = c0c3_cache;  
800 - iov[0].iov_len = nbh; 800 + iovs[0].iov_base = c0c3_cache;
  801 + iovs[0].iov_len = nbh;
801 802
802 // payload iov 803 // payload iov
803 int payload_size = srs_min(out_chunk_size, pend - p); 804 int payload_size = srs_min(out_chunk_size, pend - p);
804 - iov[1].iov_base = p;  
805 - iov[1].iov_len = payload_size; 805 + iovs[1].iov_base = p;
  806 + iovs[1].iov_len = payload_size;
806 807
807 // consume sendout bytes. 808 // consume sendout bytes.
808 p += payload_size; 809 p += payload_size;
@@ -822,7 +823,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -822,7 +823,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
822 823
823 // to next pair of iovs 824 // to next pair of iovs
824 iov_index += 2; 825 iov_index += 2;
825 - iov = out_iovs + iov_index; 826 + iovs = out_iovs + iov_index;
826 827
827 // to next c0c3 header cache 828 // to next c0c3 header cache
828 c0c3_cache_index += nbh; 829 c0c3_cache_index += nbh;
@@ -849,7 +850,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -849,7 +850,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
849 // reset caches, while these cache ensure 850 // reset caches, while these cache ensure
850 // atleast we can sendout a chunk. 851 // atleast we can sendout a chunk.
851 iov_index = 0; 852 iov_index = 0;
852 - iov = out_iovs + iov_index; 853 + iovs = out_iovs + iov_index;
853 854
854 c0c3_cache_index = 0; 855 c0c3_cache_index = 0;
855 c0c3_cache = out_c0c3_caches + c0c3_cache_index; 856 c0c3_cache = out_c0c3_caches + c0c3_cache_index;
@@ -866,6 +867,61 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) @@ -866,6 +867,61 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
866 nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs); 867 nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
867 868
868 return do_iovs_send(out_iovs, iov_index); 869 return do_iovs_send(out_iovs, iov_index);
  870 +#else
  871 + // try to send use the c0c3 header cache,
  872 + // if cache is consumed, try another loop.
  873 + for (int i = 0; i < nb_msgs; i++) {
  874 + SrsSharedPtrMessage* msg = msgs[i];
  875 +
  876 + if (!msg) {
  877 + continue;
  878 + }
  879 +
  880 + // ignore empty message.
  881 + if (!msg->payload || msg->size <= 0) {
  882 + srs_info("ignore empty message.");
  883 + continue;
  884 + }
  885 +
  886 + // p set to current write position,
  887 + // it's ok when payload is NULL and size is 0.
  888 + char* p = msg->payload;
  889 + char* pend = msg->payload + msg->size;
  890 +
  891 + // always write the header event payload is empty.
  892 + while (p < pend) {
  893 + // for simple send, send each chunk one by one
  894 + iovec* iovs = out_iovs;
  895 + char* c0c3_cache = out_c0c3_caches;
  896 + int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX;
  897 +
  898 + // always has header
  899 + int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);
  900 + srs_assert(nbh > 0);
  901 +
  902 + // header iov
  903 + iovs[0].iov_base = c0c3_cache;
  904 + iovs[0].iov_len = nbh;
  905 +
  906 + // payload iov
  907 + int payload_size = srs_min(out_chunk_size, pend - p);
  908 + iovs[1].iov_base = p;
  909 + iovs[1].iov_len = payload_size;
  910 +
  911 + // consume sendout bytes.
  912 + p += payload_size;
  913 +
  914 + if ((ret = skt->writev(iovs, 2, NULL)) != ERROR_SUCCESS) {
  915 + if (!srs_is_client_gracefully_close(ret)) {
  916 + srs_error("send packet with writev failed. ret=%d", ret);
  917 + }
  918 + return ret;
  919 + }
  920 + }
  921 + }
  922 +
  923 + return ret;
  924 +#endif
869 } 925 }
870 926
871 int SrsProtocol::do_iovs_send(iovec* iovs, int size) 927 int SrsProtocol::do_iovs_send(iovec* iovs, int size)