winlin

retry when metadata empty

@@ -459,7 +459,8 @@ int SrsKafkaProducer::request_metadata() @@ -459,7 +459,8 @@ int SrsKafkaProducer::request_metadata()
459 senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); 459 senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
460 } 460 }
461 461
462 - // connect to kafka server. 462 + // reconnect to kafka server.
  463 + transport->close();
463 if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) { 464 if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) {
464 srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret); 465 srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret);
465 return ret; 466 return ret;
@@ -473,6 +474,16 @@ int SrsKafkaProducer::request_metadata() @@ -473,6 +474,16 @@ int SrsKafkaProducer::request_metadata()
473 } 474 }
474 SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata); 475 SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
475 476
  477 + // we may need to request multiple times.
  478 + // for example, the first time to create a none-exists topic, then query metadata.
  479 + if (!metadata->metadatas.empty()) {
  480 + SrsKafkaTopicMetadata* topic = metadata->metadatas.at(0);
  481 + if (topic->metadatas.empty()) {
  482 + srs_warn("topic %s metadata empty, retry.", topic->name.to_str().c_str());
  483 + return ret;
  484 + }
  485 + }
  486 +
476 // show kafka metadata. 487 // show kafka metadata.
477 string summary = srs_kafka_metadata_summary(metadata); 488 string summary = srs_kafka_metadata_summary(metadata);
478 srs_trace("kafka metadata: %s", summary.c_str()); 489 srs_trace("kafka metadata: %s", summary.c_str());
@@ -152,6 +152,10 @@ public: @@ -152,6 +152,10 @@ public:
152 { 152 {
153 return length; 153 return length;
154 } 154 }
  155 + virtual bool empty()
  156 + {
  157 + return elems.empty();
  158 + }
155 virtual T* at(int index) 159 virtual T* at(int index)
156 { 160 {
157 return elems.at(index); 161 return elems.at(index);
@@ -241,6 +245,10 @@ public: @@ -241,6 +245,10 @@ public:
241 { 245 {
242 return length; 246 return length;
243 } 247 }
  248 + virtual bool empty()
  249 + {
  250 + return elems.empty();
  251 + }
244 virtual int32_t at(int index) 252 virtual int32_t at(int index)
245 { 253 {
246 return elems.at(index); 254 return elems.at(index);