winlin

convert metadata to partitions

@@ -41,6 +41,83 @@ using namespace std; @@ -41,6 +41,83 @@ using namespace std;
41 41
42 #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000 42 #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000
43 43
  44 +std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata)
  45 +{
  46 + vector<string> bs;
  47 + for (int i = 0; i < metadata->brokers.size(); i++) {
  48 + SrsKafkaBroker* broker = metadata->brokers.at(i);
  49 +
  50 + string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str();
  51 + if (broker->port > 0) {
  52 + hostport += ":" + srs_int2str(broker->port);
  53 + }
  54 +
  55 + bs.push_back(hostport);
  56 + }
  57 +
  58 + vector<string> ps;
  59 + for (int i = 0; i < metadata->metadatas.size(); i++) {
  60 + SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
  61 +
  62 + for (int j = 0; j < topic->metadatas.size(); j++) {
  63 + string desc = "topic=" + topic->name.to_str();
  64 +
  65 + SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
  66 +
  67 + desc += "?partition=" + srs_int2str(partition->partition_id);
  68 + desc += "&leader=" + srs_int2str(partition->leader);
  69 +
  70 + vector<string> replicas = srs_kafka_array2vector(&partition->replicas);
  71 + desc += "&replicas=" + srs_join_vector_string(replicas, ",");
  72 +
  73 + ps.push_back(desc);
  74 + }
  75 + }
  76 +
  77 + std::stringstream ss;
  78 + ss << "brokers=" << srs_join_vector_string(bs, ",");
  79 + ss << ", " << srs_join_vector_string(ps, ", ");
  80 +
  81 + return ss.str();
  82 +}
  83 +
  84 +std::string srs_kafka_summary_partitions(const vector<SrsKafkaPartition*>& partitions)
  85 +{
  86 + vector<string> ret;
  87 +
  88 + vector<SrsKafkaPartition*>::const_iterator it;
  89 + for (it = partitions.begin(); it != partitions.end(); ++it) {
  90 + SrsKafkaPartition* partition = *it;
  91 +
  92 + string desc = "tcp://";
  93 + desc += partition->host + ":" + srs_int2str(partition->port);
  94 + desc += "?broker=" + srs_int2str(partition->broker);
  95 + desc += "&partition=" + srs_int2str(partition->id);
  96 + ret.push_back(desc);
  97 + }
  98 +
  99 + return srs_join_vector_string(ret, ", ");
  100 +}
  101 +
  102 +SrsKafkaPartition::SrsKafkaPartition()
  103 +{
  104 + id = broker = 0;
  105 + port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
  106 +}
  107 +
  108 +SrsKafkaPartition::~SrsKafkaPartition()
  109 +{
  110 +}
  111 +
  112 +string SrsKafkaPartition::hostport()
  113 +{
  114 + if (ep.empty()) {
  115 + ep = host + ":" + srs_int2str(port);
  116 + }
  117 +
  118 + return ep;
  119 +}
  120 +
44 SrsKafkaProducer::SrsKafkaProducer() 121 SrsKafkaProducer::SrsKafkaProducer()
45 { 122 {
46 meatadata_ok = false; 123 meatadata_ok = false;
@@ -57,6 +134,13 @@ SrsKafkaProducer::SrsKafkaProducer() @@ -57,6 +134,13 @@ SrsKafkaProducer::SrsKafkaProducer()
57 134
58 SrsKafkaProducer::~SrsKafkaProducer() 135 SrsKafkaProducer::~SrsKafkaProducer()
59 { 136 {
  137 + vector<SrsKafkaPartition*>::iterator it;
  138 + for (it = partitions.begin(); it != partitions.end(); ++it) {
  139 + SrsKafkaPartition* partition = *it;
  140 + srs_freep(partition);
  141 + }
  142 + partitions.clear();
  143 +
60 srs_freep(lb); 144 srs_freep(lb);
61 srs_freep(kafka); 145 srs_freep(kafka);
62 srs_freep(transport); 146 srs_freep(transport);
@@ -203,46 +287,33 @@ int SrsKafkaProducer::request_metadata() @@ -203,46 +287,33 @@ int SrsKafkaProducer::request_metadata()
203 SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata); 287 SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
204 288
205 // show kafka metadata. 289 // show kafka metadata.
206 - string summary;  
207 - if (true) {  
208 - vector<string> bs;  
209 - for (int i = 0; i < metadata->brokers.size(); i++) {  
210 - SrsKafkaBroker* broker = metadata->brokers.at(i);  
211 -  
212 - string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str();  
213 - if (broker->port > 0) {  
214 - hostport += ":" + srs_int2str(broker->port);  
215 - }  
216 -  
217 - bs.push_back(hostport);  
218 - } 290 + string summary = srs_kafka_metadata_summary(metadata);
  291 + srs_trace("kafka metadata: %s", summary.c_str());
  292 +
  293 + // generate the partition info.
  294 + for (int i = 0; i < metadata->metadatas.size(); i++) {
  295 + SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
219 296
220 - vector<string> ps;  
221 - for (int i = 0; i < metadata->metadatas.size(); i++) {  
222 - SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i); 297 + for (int j = 0; j < topic->metadatas.size(); j++) {
  298 + SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
223 299
224 - string desc = "topic=" + topic->name.to_str(); 300 + SrsKafkaPartition* p = new SrsKafkaPartition();
  301 + p->id = partition->partition_id;
  302 + p->broker = partition->leader;
225 303
226 - for (int j = 0; j < topic->metadatas.size(); j++) {  
227 - SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);  
228 -  
229 - desc += ", partition" + srs_int2str(partition->partition_id) +"=";  
230 - desc += srs_int2str(partition->leader) + "/";  
231 -  
232 - vector<string> replicas = srs_kafka_array2vector(&partition->replicas);  
233 - desc += srs_join_vector_string(replicas, ","); 304 + for (int i = 0; i < metadata->brokers.size(); i++) {
  305 + SrsKafkaBroker* broker = metadata->brokers.at(i);
  306 + if (broker->node_id == p->broker) {
  307 + p->host = broker->host.to_str();
  308 + p->port = broker->port;
  309 + break;
  310 + }
234 } 311 }
235 312
236 - ps.push_back(desc); 313 + partitions.push_back(p);
237 } 314 }
238 -  
239 - std::stringstream ss;  
240 - ss << "brokers=" << srs_join_vector_string(bs, ",");  
241 - ss << ", " << srs_join_vector_string(ps, ",");  
242 -  
243 - summary = ss.str();  
244 } 315 }
245 - srs_trace("kafka metadata: %s", summary.c_str()); 316 + srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str());
246 317
247 meatadata_ok = true; 318 meatadata_ok = true;
248 319
@@ -29,6 +29,8 @@ @@ -29,6 +29,8 @@
29 */ 29 */
30 #include <srs_core.hpp> 30 #include <srs_core.hpp>
31 31
  32 +#include <vector>
  33 +
32 class SrsLbRoundRobin; 34 class SrsLbRoundRobin;
33 class SrsAsyncCallWorker; 35 class SrsAsyncCallWorker;
34 class SrsTcpClient; 36 class SrsTcpClient;
@@ -39,6 +41,26 @@ class SrsKafkaClient; @@ -39,6 +41,26 @@ class SrsKafkaClient;
39 #ifdef SRS_AUTO_KAFKA 41 #ifdef SRS_AUTO_KAFKA
40 42
41 /** 43 /**
  44 + * the kafka partition info.
  45 + */
  46 +struct SrsKafkaPartition
  47 +{
  48 +private:
  49 + std::string ep;
  50 +public:
  51 + int id;
  52 + // leader.
  53 + int broker;
  54 + std::string host;
  55 + int port;
  56 +public:
  57 + SrsKafkaPartition();
  58 + virtual ~SrsKafkaPartition();
  59 +public:
  60 + virtual std::string hostport();
  61 +};
  62 +
  63 +/**
42 * the kafka producer used to save log to kafka cluster. 64 * the kafka producer used to save log to kafka cluster.
43 */ 65 */
44 class SrsKafkaProducer : public ISrsReusableThreadHandler 66 class SrsKafkaProducer : public ISrsReusableThreadHandler
@@ -49,6 +71,8 @@ private: @@ -49,6 +71,8 @@ private:
49 private: 71 private:
50 bool meatadata_ok; 72 bool meatadata_ok;
51 st_cond_t metadata_expired; 73 st_cond_t metadata_expired;
  74 +public:
  75 + std::vector<SrsKafkaPartition*> partitions;
52 private: 76 private:
53 SrsLbRoundRobin* lb; 77 SrsLbRoundRobin* lb;
54 SrsAsyncCallWorker* worker; 78 SrsAsyncCallWorker* worker;