winlin

kafka: refine comments.

@@ -99,6 +99,32 @@ std::string srs_kafka_summary_partitions(const vector<SrsKafkaPartition*>& parti
     return srs_join_vector_string(ret, ", ");
 }
 
+void srs_kafka_metadata2connector(SrsKafkaTopicMetadataResponse* metadata, vector<SrsKafkaPartition*>& partitions)
+{
+    for (int i = 0; i < metadata->metadatas.size(); i++) {
+        SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
+        
+        for (int j = 0; j < topic->metadatas.size(); j++) {
+            SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
+            
+            SrsKafkaPartition* p = new SrsKafkaPartition();
+            p->id = partition->partition_id;
+            p->broker = partition->leader;
+            
+            for (int i = 0; i < metadata->brokers.size(); i++) {
+                SrsKafkaBroker* broker = metadata->brokers.at(i);
+                if (broker->node_id == p->broker) {
+                    p->host = broker->host.to_str();
+                    p->port = broker->port;
+                    break;
+                }
+            }
+            
+            partitions.push_back(p);
+        }
+    }
+}
+
 SrsKafkaPartition::SrsKafkaPartition()
 {
     id = broker = 0;
@@ -120,7 +146,7 @@ string SrsKafkaPartition::hostport()
 
 SrsKafkaProducer::SrsKafkaProducer()
 {
-    meatadata_ok = false;
+    metadata_ok = false;
     metadata_expired = st_cond_new();
     
     lock = st_mutex_new();
@@ -156,7 +182,6 @@ int SrsKafkaProducer::initialize()
 {
     int ret = ERROR_SUCCESS;
     
-    meatadata_ok = false;
     srs_info("initialize kafka producer ok.");
     
     return ret;
@@ -175,9 +200,7 @@ int SrsKafkaProducer::start()
         srs_error("start kafka thread failed. ret=%d", ret);
     }
     
-    meatadata_ok = false;
-    st_cond_signal(metadata_expired);
-    srs_trace("kafka work in background");
+    refresh_metadata();
     
     return ret;
 }
@@ -203,7 +226,7 @@ int SrsKafkaProducer::on_before_cycle()
 {
     // wait for the metadata expired.
     // when metadata is ok, wait for it expired.
-    if (meatadata_ok) {
+    if (metadata_ok) {
         st_cond_wait(metadata_expired);
     }
     
@@ -291,34 +314,20 @@ int SrsKafkaProducer::request_metadata()
     srs_trace("kafka metadata: %s", summary.c_str());
     
     // generate the partition info.
-    for (int i = 0; i < metadata->metadatas.size(); i++) {
-        SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
-        
-        for (int j = 0; j < topic->metadatas.size(); j++) {
-            SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
-            
-            SrsKafkaPartition* p = new SrsKafkaPartition();
-            p->id = partition->partition_id;
-            p->broker = partition->leader;
-            
-            for (int i = 0; i < metadata->brokers.size(); i++) {
-                SrsKafkaBroker* broker = metadata->brokers.at(i);
-                if (broker->node_id == p->broker) {
-                    p->host = broker->host.to_str();
-                    p->port = broker->port;
-                    break;
-                }
-            }
-            
-            partitions.push_back(p);
-        }
-    }
+    srs_kafka_metadata2connector(metadata, partitions);
     srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str());
     
-    meatadata_ok = true;
+    metadata_ok = true;
     
     return ret;
 }
 
+void SrsKafkaProducer::refresh_metadata()
+{
+    metadata_ok = false;
+    st_cond_signal(metadata_expired);
+    srs_trace("kafka async refresh metadata in background");
+}
+
 #endif
 
@@ -69,7 +69,7 @@ private:
     st_mutex_t lock;
     SrsReusableThread* pthread;
 private:
-    bool meatadata_ok;
+    bool metadata_ok;
     st_cond_t metadata_expired;
 public:
     std::vector<SrsKafkaPartition*> partitions;
@@ -93,6 +93,8 @@ public:
 private:
     virtual int do_cycle();
     virtual int request_metadata();
+    // set the metadata to invalid and refresh it.
+    virtual void refresh_metadata();
 };
 
 #endif
@@ -1010,7 +1010,7 @@ int SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg)
     // fetch cached api key.
     SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance();
     SrsKafkaApiKey key = pool->unset(header.correlation_id());
-    srs_trace("kafka got %d bytes response, key=%d", header.total_size(), header.correlation_id());
+    srs_info("kafka got %d bytes response, key=%d", header.total_size(), header.correlation_id());
     
     // create message by cached api key.
     SrsKafkaResponse* res = NULL;