winlin

for bug #251, add queue fast vector. 2.0.66

@@ -156,6 +156,93 @@ int SrsRtmpJitter::get_time() @@ -156,6 +156,93 @@ int SrsRtmpJitter::get_time()
156 return (int)last_pkt_correct_time; 156 return (int)last_pkt_correct_time;
157 } 157 }
158 158
  159 +#ifdef SRS_PERF_QUEUE_FAST_VECTOR
  160 +SrsFastVector::SrsFastVector()
  161 +{
  162 + count = 0;
  163 + nb_msgs = SRS_PERF_MW_MSGS * 2;
  164 + msgs = new SrsSharedPtrMessage*[nb_msgs];
  165 +}
  166 +
  167 +SrsFastVector::~SrsFastVector()
  168 +{
  169 + free();
  170 +}
  171 +
  172 +int SrsFastVector::size()
  173 +{
  174 + return count;
  175 +}
  176 +
  177 +int SrsFastVector::begin()
  178 +{
  179 + return 0;
  180 +}
  181 +
  182 +int SrsFastVector::end()
  183 +{
  184 + return count;
  185 +}
  186 +
  187 +SrsSharedPtrMessage** SrsFastVector::data()
  188 +{
  189 + return msgs;
  190 +}
  191 +
  192 +SrsSharedPtrMessage* SrsFastVector::at(int index)
  193 +{
  194 + srs_assert(index < count);
  195 + return msgs[index];
  196 +}
  197 +
  198 +void SrsFastVector::clear()
  199 +{
  200 + count = 0;
  201 +}
  202 +
  203 +void SrsFastVector::erase(int _begin, int _end)
  204 +{
  205 + srs_assert(_begin < _end);
  206 +
  207 + // move all erased to previous.
  208 + for (int i = 0; i < count - _end; i++) {
  209 + msgs[_begin + i] = msgs[_end + i];
  210 + }
  211 +
  212 + // update the count.
  213 + count -= _end - _begin;
  214 +}
  215 +
  216 +void SrsFastVector::push_back(SrsSharedPtrMessage* msg)
  217 +{
  218 + // increase vector.
  219 + if (count >= nb_msgs) {
  220 + int size = nb_msgs * 2;
  221 + SrsSharedPtrMessage** buf = new SrsSharedPtrMessage*[size];
  222 + for (int i = 0; i < nb_msgs; i++) {
  223 + buf[i] = msgs[i];
  224 + }
  225 + srs_warn("fast vector incrase %d=>%d", nb_msgs, size);
  226 +
  227 + // use new array.
  228 + srs_freep(msgs);
  229 + msgs = buf;
  230 + nb_msgs = size;
  231 + }
  232 +
  233 + msgs[count++] = msg;
  234 +}
  235 +
  236 +void SrsFastVector::free()
  237 +{
  238 + for (int i = 0; i < count; i++) {
  239 + SrsSharedPtrMessage* msg = msgs[i];
  240 + srs_freep(msg);
  241 + }
  242 + count = 0;
  243 +}
  244 +#endif
  245 +
159 SrsMessageQueue::SrsMessageQueue() 246 SrsMessageQueue::SrsMessageQueue()
160 { 247 {
161 queue_size_ms = 0; 248 queue_size_ms = 0;
@@ -251,7 +338,7 @@ void SrsMessageQueue::shrink() @@ -251,7 +338,7 @@ void SrsMessageQueue::shrink()
251 // for when we shrinked, the first is the iframe, 338 // for when we shrinked, the first is the iframe,
252 // we will directly remove the gop next time. 339 // we will directly remove the gop next time.
253 for (int i = 1; i < (int)msgs.size(); i++) { 340 for (int i = 1; i < (int)msgs.size(); i++) {
254 - SrsSharedPtrMessage* msg = msgs[i]; 341 + SrsSharedPtrMessage* msg = msgs.at(i);
255 342
256 if (msg->is_video()) { 343 if (msg->is_video()) {
257 if (SrsFlvCodec::video_is_keyframe(msg->payload, msg->size)) { 344 if (SrsFlvCodec::video_is_keyframe(msg->payload, msg->size)) {
@@ -281,7 +368,7 @@ void SrsMessageQueue::shrink() @@ -281,7 +368,7 @@ void SrsMessageQueue::shrink()
281 368
282 // remove the first gop from the front 369 // remove the first gop from the front
283 for (int i = 0; i < iframe_index; i++) { 370 for (int i = 0; i < iframe_index; i++) {
284 - SrsSharedPtrMessage* msg = msgs[i]; 371 + SrsSharedPtrMessage* msg = msgs.at(i);
285 srs_freep(msg); 372 srs_freep(msg);
286 } 373 }
287 msgs.erase(msgs.begin(), msgs.begin() + iframe_index); 374 msgs.erase(msgs.begin(), msgs.begin() + iframe_index);
@@ -289,12 +376,16 @@ void SrsMessageQueue::shrink() @@ -289,12 +376,16 @@ void SrsMessageQueue::shrink()
289 376
290 void SrsMessageQueue::clear() 377 void SrsMessageQueue::clear()
291 { 378 {
  379 +#ifndef SRS_PERF_QUEUE_FAST_VECTOR
292 std::vector<SrsSharedPtrMessage*>::iterator it; 380 std::vector<SrsSharedPtrMessage*>::iterator it;
293 381
294 for (it = msgs.begin(); it != msgs.end(); ++it) { 382 for (it = msgs.begin(); it != msgs.end(); ++it) {
295 SrsSharedPtrMessage* msg = *it; 383 SrsSharedPtrMessage* msg = *it;
296 srs_freep(msg); 384 srs_freep(msg);
297 } 385 }
  386 +#else
  387 + msgs.free();
  388 +#endif
298 389
299 msgs.clear(); 390 msgs.clear();
300 391
@@ -102,6 +102,34 @@ public: @@ -102,6 +102,34 @@ public:
102 virtual int get_time(); 102 virtual int get_time();
103 }; 103 };
104 104
  105 +#ifdef SRS_PERF_QUEUE_FAST_VECTOR
  106 +/**
  107 +* to alloc and increase fixed space,
  108 +* fast remove and insert for msgs sender.
  109 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  110 +*/
  111 +class SrsFastVector
  112 +{
  113 +private:
  114 + SrsSharedPtrMessage** msgs;
  115 + int nb_msgs;
  116 + int count;
  117 +public:
  118 + SrsFastVector();
  119 + virtual ~SrsFastVector();
  120 +public:
  121 + virtual int size();
  122 + virtual int begin();
  123 + virtual int end();
  124 + virtual SrsSharedPtrMessage** data();
  125 + virtual SrsSharedPtrMessage* at(int index);
  126 + virtual void clear();
  127 + virtual void erase(int _begin, int _end);
  128 + virtual void push_back(SrsSharedPtrMessage* msg);
  129 + virtual void free();
  130 +};
  131 +#endif
  132 +
105 /** 133 /**
106 * the message queue for the consumer(client), forwarder. 134 * the message queue for the consumer(client), forwarder.
107 * we limit the size in seconds, drop old messages(the whole gop) if full. 135 * we limit the size in seconds, drop old messages(the whole gop) if full.
@@ -112,7 +140,11 @@ private: @@ -112,7 +140,11 @@ private:
112 int64_t av_start_time; 140 int64_t av_start_time;
113 int64_t av_end_time; 141 int64_t av_end_time;
114 int queue_size_ms; 142 int queue_size_ms;
  143 +#ifdef SRS_PERF_QUEUE_FAST_VECTOR
  144 + SrsFastVector msgs;
  145 +#else
115 std::vector<SrsSharedPtrMessage*> msgs; 146 std::vector<SrsSharedPtrMessage*> msgs;
  147 +#endif
116 public: 148 public:
117 SrsMessageQueue(); 149 SrsMessageQueue();
118 virtual ~SrsMessageQueue(); 150 virtual ~SrsMessageQueue();
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 65 34 +#define VERSION_REVISION 66
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"
@@ -135,6 +135,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -135,6 +135,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
135 */ 135 */
136 #undef SRS_PERF_QUEUE_FAST_CACHE 136 #undef SRS_PERF_QUEUE_FAST_CACHE
137 /** 137 /**
  138 +* whether enable the fast vector for qeueue.
  139 +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
  140 +*/
  141 +#undef SRS_PERF_QUEUE_FAST_VECTOR
  142 +/**
138 * whether use cond wait to send messages. 143 * whether use cond wait to send messages.
139 * @remark this improve performance for large connectios. 144 * @remark this improve performance for large connectios.
140 * @see https://github.com/winlinvip/simple-rtmp-server/issues/251 145 * @see https://github.com/winlinvip/simple-rtmp-server/issues/251