winlin

refine the fast buffer. 2.0.144

@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 143 34 +#define VERSION_REVISION 144
35 35
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"
@@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 23
24 #include <srs_rtmp_buffer.hpp> 24 #include <srs_rtmp_buffer.hpp>
25 25
  26 +#include <stdlib.h>
  27 +
26 #include <srs_kernel_error.hpp> 28 #include <srs_kernel_error.hpp>
27 #include <srs_kernel_log.hpp> 29 #include <srs_kernel_log.hpp>
28 #include <srs_kernel_utility.hpp> 30 #include <srs_kernel_utility.hpp>
@@ -58,10 +60,16 @@ SrsFastBuffer::SrsFastBuffer() @@ -58,10 +60,16 @@ SrsFastBuffer::SrsFastBuffer()
58 #endif 60 #endif
59 61
60 nb_buffer = SRS_DEFAULT_RECV_BUFFER_SIZE; 62 nb_buffer = SRS_DEFAULT_RECV_BUFFER_SIZE;
61 - buffer = new char[nb_buffer]; 63 + buffer = (char*)malloc(nb_buffer);
62 p = end = buffer; 64 p = end = buffer;
63 } 65 }
64 66
  67 +SrsFastBuffer::~SrsFastBuffer()
  68 +{
  69 + free(buffer);
  70 + buffer = NULL;
  71 +}
  72 +
65 int SrsFastBuffer::size() 73 int SrsFastBuffer::size()
66 { 74 {
67 return end - p; 75 return end - p;
@@ -75,33 +83,23 @@ char* SrsFastBuffer::bytes() @@ -75,33 +83,23 @@ char* SrsFastBuffer::bytes()
75 void SrsFastBuffer::set_buffer(int buffer_size) 83 void SrsFastBuffer::set_buffer(int buffer_size)
76 { 84 {
77 // the user-space buffer size limit to a max value. 85 // the user-space buffer size limit to a max value.
78 - int nb_max_buf = srs_min(buffer_size, SRS_MAX_SOCKET_BUFFER);  
79 - if (nb_max_buf < buffer_size) {  
80 - srs_warn("limit the user-space buffer from %d to %d", buffer_size, nb_max_buf); 86 + int nb_resize_buf = srs_min(buffer_size, SRS_MAX_SOCKET_BUFFER);
  87 + if (buffer_size > SRS_MAX_SOCKET_BUFFER) {
  88 + srs_warn("limit the user-space buffer from %d to %d", buffer_size, SRS_MAX_SOCKET_BUFFER);
81 } 89 }
82 90
83 // only realloc when buffer changed bigger 91 // only realloc when buffer changed bigger
84 - if (nb_max_buf <= nb_buffer) { 92 + if (nb_resize_buf <= nb_buffer) {
85 return; 93 return;
86 } 94 }
87 95
88 int start = p - buffer; 96 int start = p - buffer;
89 - int cap = end - p;  
90 -  
91 - char* buf = new char[nb_max_buf];  
92 - if (cap > 0) {  
93 - memcpy(buf, buffer, nb_buffer);  
94 - }  
95 - srs_freep(buffer); 97 + int nb_bytes = end - p;
96 98
97 - buffer = buf; 99 + buffer = (char*)realloc(buffer, nb_resize_buf);
  100 + nb_buffer = nb_resize_buf;
98 p = buffer + start; 101 p = buffer + start;
99 - end = p + cap;  
100 -}  
101 -  
102 -SrsFastBuffer::~SrsFastBuffer()  
103 -{  
104 - srs_freep(buffer); 102 + end = p + nb_bytes;
105 } 103 }
106 104
107 char SrsFastBuffer::read_1byte() 105 char SrsFastBuffer::read_1byte()
@@ -117,12 +115,6 @@ char* SrsFastBuffer::read_slice(int size) @@ -117,12 +115,6 @@ char* SrsFastBuffer::read_slice(int size)
117 115
118 char* ptr = p; 116 char* ptr = p;
119 p += size; 117 p += size;
120 -  
121 - // reset when consumed all.  
122 - if (p == end) {  
123 - p = end = buffer;  
124 - srs_verbose("all consumed, reset fast buffer");  
125 - }  
126 118
127 return ptr; 119 return ptr;
128 } 120 }
@@ -138,7 +130,7 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -138,7 +130,7 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
138 { 130 {
139 int ret = ERROR_SUCCESS; 131 int ret = ERROR_SUCCESS;
140 132
141 - // generally the required size is ok. 133 + // already got required size of bytes.
142 if (end - p >= required_size) { 134 if (end - p >= required_size) {
143 return ret; 135 return ret;
144 } 136 }
@@ -146,31 +138,42 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -146,31 +138,42 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
146 // must be positive. 138 // must be positive.
147 srs_assert(required_size > 0); 139 srs_assert(required_size > 0);
148 140
149 - // when read payload or there is no space to read,  
150 - // reset the buffer with exists bytes.  
151 - int max_to_read = buffer + nb_buffer - end;  
152 - if (required_size > SRS_RTMP_MAX_MESSAGE_HEADER || max_to_read < required_size) {  
153 - int nb_cap = end - p;  
154 - srs_verbose("move fast buffer %d bytes", nb_cap);  
155 - if (nb_cap < nb_buffer) {  
156 - buffer = (char*)memmove(buffer, p, nb_cap); 141 + // the free space of buffer,
  142 + // buffer = consumed_bytes + exists_bytes + free_space.
  143 + int nb_free_space = buffer + nb_buffer - end;
  144 + // resize the space when no left space.
  145 + if (nb_free_space < required_size) {
  146 + // the bytes already in buffer
  147 + int nb_exists_bytes = end - p;
  148 + srs_assert(nb_exists_bytes >= 0);
  149 + srs_verbose("move fast buffer %d bytes", nb_exists_bytes);
  150 +
  151 + if (!nb_exists_bytes) {
  152 + // reset when buffer is empty.
  153 + p = end = buffer;
  154 + srs_verbose("all consumed, reset fast buffer");
  155 + } else {
  156 + // move the left bytes to start of buffer.
  157 + srs_assert(nb_exists_bytes < nb_buffer);
  158 + buffer = (char*)memmove(buffer, p, nb_exists_bytes);
157 p = buffer; 159 p = buffer;
158 - end = p + nb_cap; 160 + end = p + nb_exists_bytes;
159 } 161 }
160 } 162 }
161 163
162 - // directly check the available bytes to read in buffer.  
163 - max_to_read = buffer + nb_buffer - end;  
164 - if (max_to_read < required_size) { 164 + // check whether enough free space in buffer.
  165 + nb_free_space = buffer + nb_buffer - end;
  166 + if (nb_free_space < required_size) {
165 ret = ERROR_READER_BUFFER_OVERFLOW; 167 ret = ERROR_READER_BUFFER_OVERFLOW;
166 - srs_error("buffer overflow, required=%d, max=%d, ret=%d", required_size, nb_buffer, ret); 168 + srs_error("buffer overflow, required=%d, max=%d, left=%d, ret=%d",
  169 + required_size, nb_buffer, nb_free_space, ret);
167 return ret; 170 return ret;
168 } 171 }
169 172
170 // buffer is ok, read required size of bytes. 173 // buffer is ok, read required size of bytes.
171 while (end - p < required_size) { 174 while (end - p < required_size) {
172 ssize_t nread; 175 ssize_t nread;
173 - if ((ret = reader->read(end, max_to_read, &nread)) != ERROR_SUCCESS) { 176 + if ((ret = reader->read(end, nb_free_space, &nread)) != ERROR_SUCCESS) {
174 return ret; 177 return ret;
175 } 178 }
176 179
@@ -189,7 +192,7 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -189,7 +192,7 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
189 // we just move the ptr to next. 192 // we just move the ptr to next.
190 srs_assert((int)nread > 0); 193 srs_assert((int)nread > 0);
191 end += nread; 194 end += nread;
192 - max_to_read -= nread; 195 + nb_free_space -= nread;
193 } 196 }
194 197
195 return ret; 198 return ret;
@@ -59,6 +59,12 @@ public: @@ -59,6 +59,12 @@ public:
59 /** 59 /**
60 * the buffer provices bytes cache for protocol. generally, 60 * the buffer provices bytes cache for protocol. generally,
61 * protocol recv data from socket, put into buffer, decode to RTMP message. 61 * protocol recv data from socket, put into buffer, decode to RTMP message.
  62 +* Usage:
  63 +* ISrsBufferReader* r = ......;
  64 +* SrsFastBuffer* fb = ......;
  65 +* fb->grow(r, 1024);
  66 +* char* header = fb->read_slice(100);
  67 +* char* payload = fb->read_payload(924);
62 */ 68 */
63 // TODO: FIXME: add utest for it. 69 // TODO: FIXME: add utest for it.
64 class SrsFastBuffer 70 class SrsFastBuffer
@@ -79,7 +85,7 @@ private: @@ -79,7 +85,7 @@ private:
79 // ptr to the buffer. 85 // ptr to the buffer.
80 // buffer <= p <= end <= buffer+nb_buffer 86 // buffer <= p <= end <= buffer+nb_buffer
81 char* buffer; 87 char* buffer;
82 - // the max size of buffer. 88 + // the size of buffer.
83 int nb_buffer; 89 int nb_buffer;
84 public: 90 public:
85 SrsFastBuffer(); 91 SrsFastBuffer();
@@ -97,7 +103,7 @@ public: @@ -97,7 +103,7 @@ public:
97 virtual char* bytes(); 103 virtual char* bytes();
98 /** 104 /**
99 * create buffer with specifeid size. 105 * create buffer with specifeid size.
100 - * @param buffer the size of buffer. 106 + * @param buffer the size of buffer. ignore when smaller than SRS_MAX_SOCKET_BUFFER.
101 * @remark when MR(SRS_PERF_MERGED_READ) disabled, always set to 8K. 107 * @remark when MR(SRS_PERF_MERGED_READ) disabled, always set to 8K.
102 * @remark when buffer changed, the previous ptr maybe invalid. 108 * @remark when buffer changed, the previous ptr maybe invalid.
103 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 109 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
@@ -112,8 +118,8 @@ public: @@ -112,8 +118,8 @@ public:
112 /** 118 /**
113 * read a slice in size bytes, move to next bytes. 119 * read a slice in size bytes, move to next bytes.
114 * user can use this char* ptr directly, and should never free it. 120 * user can use this char* ptr directly, and should never free it.
115 - * @remark assert buffer already grow(size).  
116 - * @remark the ptr returned maybe invalid after grow(x). 121 + * @remark user can use the returned ptr util grow(size),
  122 + * for the ptr returned maybe invalid after grow(x).
117 */ 123 */
118 virtual char* read_slice(int size); 124 virtual char* read_slice(int size);
119 /** 125 /**