winlin

kafka use temp transport to fetch metadata.

... ... @@ -275,8 +275,6 @@ SrsKafkaProducer::SrsKafkaProducer()
cache = new SrsKafkaCache();
lb = new SrsLbRoundRobin();
transport = new SrsTcpClient();
kafka = new SrsKafkaClient(transport);
}
SrsKafkaProducer::~SrsKafkaProducer()
... ... @@ -284,8 +282,6 @@ SrsKafkaProducer::~SrsKafkaProducer()
clear_metadata();
srs_freep(lb);
srs_freep(kafka);
srs_freep(transport);
srs_freep(worker);
srs_freep(pthread);
... ... @@ -443,6 +439,12 @@ int SrsKafkaProducer::request_metadata()
return ret;
}
SrsTcpClient* transport = new SrsTcpClient();
SrsAutoFree(SrsTcpClient, transport);
SrsKafkaClient* kafka = new SrsKafkaClient(transport);
SrsAutoFree(SrsKafkaClient, kafka);
std::string server;
int port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
if (true) {
... ... @@ -460,7 +462,6 @@ int SrsKafkaProducer::request_metadata()
}
// reconnect to kafka server.
transport->close();
if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) {
srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret);
return ret;
... ...
... ... @@ -164,8 +164,6 @@ public:
private:
SrsLbRoundRobin* lb;
SrsAsyncCallWorker* worker;
SrsTcpClient* transport;
SrsKafkaClient* kafka;
public:
SrsKafkaProducer();
virtual ~SrsKafkaProducer();
... ...