winlin

kakfa erase messages when wrote.

@@ -41,7 +41,8 @@ using namespace std; @@ -41,7 +41,8 @@ using namespace std;
41 #ifdef SRS_AUTO_KAFKA 41 #ifdef SRS_AUTO_KAFKA
42 42
43 #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000 43 #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000
44 -#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 10 44 +#define SRS_KAFKA_PRODUCER_TIMEOUT 30000
  45 +#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1
45 46
46 std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata) 47 std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata)
47 { 48 {
@@ -131,10 +132,15 @@ SrsKafkaPartition::SrsKafkaPartition() @@ -131,10 +132,15 @@ SrsKafkaPartition::SrsKafkaPartition()
131 { 132 {
132 id = broker = 0; 133 id = broker = 0;
133 port = SRS_CONSTS_KAFKA_DEFAULT_PORT; 134 port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
  135 +
  136 + transport = new SrsTcpClient();
  137 + kafka = new SrsKafkaClient(transport);
134 } 138 }
135 139
136 SrsKafkaPartition::~SrsKafkaPartition() 140 SrsKafkaPartition::~SrsKafkaPartition()
137 { 141 {
  142 + srs_freep(kafka);
  143 + srs_freep(transport);
138 } 144 }
139 145
140 string SrsKafkaPartition::hostport() 146 string SrsKafkaPartition::hostport()
@@ -146,6 +152,25 @@ string SrsKafkaPartition::hostport() @@ -146,6 +152,25 @@ string SrsKafkaPartition::hostport()
146 return ep; 152 return ep;
147 } 153 }
148 154
  155 +int SrsKafkaPartition::connect()
  156 +{
  157 + int ret = ERROR_SUCCESS;
  158 +
  159 + if (transport->connected()) {
  160 + return ret;
  161 + }
  162 +
  163 + int64_t timeout = SRS_KAFKA_PRODUCER_TIMEOUT * 1000;
  164 + if ((ret = transport->connect(host, port, timeout)) != ERROR_SUCCESS) {
  165 + srs_error("connect to %s partition=%d failed, timeout=%"PRId64". ret=%d", hostport().c_str(), id, timeout, ret);
  166 + return ret;
  167 + }
  168 +
  169 + srs_trace("connect at %s, partition=%d, broker=%d", hostport().c_str(), id, broker);
  170 +
  171 + return ret;
  172 +}
  173 +
149 SrsKafkaMessage::SrsKafkaMessage(int k) 174 SrsKafkaMessage::SrsKafkaMessage(int k)
150 { 175 {
151 key = k; 176 key = k;
@@ -252,7 +277,34 @@ bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc) @@ -252,7 +277,34 @@ bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc)
252 int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc) 277 int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc)
253 { 278 {
254 int ret = ERROR_SUCCESS; 279 int ret = ERROR_SUCCESS;
  280 +
  281 + // ensure the key exists.
  282 + srs_assert (cache.find(key) != cache.end());
  283 +
  284 + // connect transport.
  285 + if ((ret = partition->connect()) != ERROR_SUCCESS) {
  286 + srs_error("connect to partition failed. ret=%d", ret);
  287 + return ret;
  288 + }
  289 +
  290 + // copy the messages to a temp cache.
  291 + SrsKafkaPartitionCache tpc(*pc);
  292 +
255 // TODO: FIXME: implements it. 293 // TODO: FIXME: implements it.
  294 +
  295 + // free all wrote messages.
  296 + for (vector<SrsJsonObject*>::iterator it = tpc.begin(); it != tpc.end(); ++it) {
  297 + SrsJsonObject* obj = *it;
  298 + srs_freep(obj);
  299 + }
  300 +
  301 + // remove the messages from cache.
  302 + if (pc->size() == tpc.size()) {
  303 + pc->clear();
  304 + } else {
  305 + pc->erase(pc->begin(), pc->begin() + tpc.size());
  306 + }
  307 +
256 return ret; 308 return ret;
257 } 309 }
258 310
@@ -516,7 +568,7 @@ int SrsKafkaProducer::flush() @@ -516,7 +568,7 @@ int SrsKafkaProducer::flush()
516 568
517 // flush all available partition caches. 569 // flush all available partition caches.
518 while (true) { 570 while (true) {
519 - int key = 0; 571 + int key = -1;
520 SrsKafkaPartitionCache* pc = NULL; 572 SrsKafkaPartitionCache* pc = NULL;
521 573
522 // all flushed, or no kafka partition to write to. 574 // all flushed, or no kafka partition to write to.
@@ -525,7 +577,7 @@ int SrsKafkaProducer::flush() @@ -525,7 +577,7 @@ int SrsKafkaProducer::flush()
525 } 577 }
526 578
527 // flush specified partition. 579 // flush specified partition.
528 - srs_assert(key && pc); 580 + srs_assert(key >= 0 && pc);
529 SrsKafkaPartition* partition = partitions.at(key % partitions.size()); 581 SrsKafkaPartition* partition = partitions.at(key % partitions.size());
530 if ((ret = cache->flush(partition, key, pc)) != ERROR_SUCCESS) { 582 if ((ret = cache->flush(partition, key, pc)) != ERROR_SUCCESS) {
531 srs_error("flush partition failed. ret=%d", ret); 583 srs_error("flush partition failed. ret=%d", ret);
@@ -52,6 +52,8 @@ struct SrsKafkaPartition @@ -52,6 +52,8 @@ struct SrsKafkaPartition
52 { 52 {
53 private: 53 private:
54 std::string ep; 54 std::string ep;
  55 + SrsTcpClient* transport;
  56 + SrsKafkaClient* kafka;
55 public: 57 public:
56 int id; 58 int id;
57 // leader. 59 // leader.
@@ -63,6 +65,7 @@ public: @@ -63,6 +65,7 @@ public:
63 virtual ~SrsKafkaPartition(); 65 virtual ~SrsKafkaPartition();
64 public: 66 public:
65 virtual std::string hostport(); 67 virtual std::string hostport();
  68 + virtual int connect();
66 }; 69 };
67 70
68 /** 71 /**