winlin

kafka refine code

@@ -281,28 +281,32 @@ int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitio @@ -281,28 +281,32 @@ int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitio
281 // ensure the key exists. 281 // ensure the key exists.
282 srs_assert (cache.find(key) != cache.end()); 282 srs_assert (cache.find(key) != cache.end());
283 283
  284 + // the cache is vector, which is continous store.
  285 + // we remember the messages we have written and clear it when completed.
  286 + int nb_msgs = (int)pc->size();
  287 + if (pc->empty()) {
  288 + return ret;
  289 + }
  290 +
284 // connect transport. 291 // connect transport.
285 if ((ret = partition->connect()) != ERROR_SUCCESS) { 292 if ((ret = partition->connect()) != ERROR_SUCCESS) {
286 srs_error("connect to partition failed. ret=%d", ret); 293 srs_error("connect to partition failed. ret=%d", ret);
287 return ret; 294 return ret;
288 } 295 }
289 296
290 - // copy the messages to a temp cache.  
291 - SrsKafkaPartitionCache tpc(*pc);  
292 -  
293 // TODO: FIXME: implements it. 297 // TODO: FIXME: implements it.
294 298
295 // free all wrote messages. 299 // free all wrote messages.
296 - for (vector<SrsJsonObject*>::iterator it = tpc.begin(); it != tpc.end(); ++it) { 300 + for (vector<SrsJsonObject*>::iterator it = pc->begin(); it != pc->end(); ++it) {
297 SrsJsonObject* obj = *it; 301 SrsJsonObject* obj = *it;
298 srs_freep(obj); 302 srs_freep(obj);
299 } 303 }
300 304
301 // remove the messages from cache. 305 // remove the messages from cache.
302 - if (pc->size() == tpc.size()) { 306 + if (pc->size() == nb_msgs) {
303 pc->clear(); 307 pc->clear();
304 } else { 308 } else {
305 - pc->erase(pc->begin(), pc->begin() + tpc.size()); 309 + pc->erase(pc->begin(), pc->begin() + nb_msgs);
306 } 310 }
307 311
308 return ret; 312 return ret;