winlin

complete kafka

@@ -352,7 +352,8 @@ int SrsKafkaProducer::initialize() @@ -352,7 +352,8 @@ int SrsKafkaProducer::initialize()
352 { 352 {
353 int ret = ERROR_SUCCESS; 353 int ret = ERROR_SUCCESS;
354 354
355 - srs_info("initialize kafka producer ok."); 355 + enabled = _srs_config->get_kafka_enabled();
  356 + srs_info("initialize kafka ok, enabled=%d.", enabled);
356 357
357 return ret; 358 return ret;
358 } 359 }
@@ -415,7 +416,6 @@ int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) @@ -415,7 +416,6 @@ int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
415 { 416 {
416 int ret = ERROR_SUCCESS; 417 int ret = ERROR_SUCCESS;
417 418
418 - bool enabled = _srs_config->get_kafka_enabled();  
419 if (!enabled) { 419 if (!enabled) {
420 return ret; 420 return ret;
421 } 421 }
@@ -433,7 +433,6 @@ int SrsKafkaProducer::on_close(int key) @@ -433,7 +433,6 @@ int SrsKafkaProducer::on_close(int key)
433 { 433 {
434 int ret = ERROR_SUCCESS; 434 int ret = ERROR_SUCCESS;
435 435
436 - bool enabled = _srs_config->get_kafka_enabled();  
437 if (!enabled) { 436 if (!enabled) {
438 return ret; 437 return ret;
439 } 438 }
@@ -494,7 +493,6 @@ int SrsKafkaProducer::do_cycle() @@ -494,7 +493,6 @@ int SrsKafkaProducer::do_cycle()
494 int ret = ERROR_SUCCESS; 493 int ret = ERROR_SUCCESS;
495 494
496 // ignore when disabled. 495 // ignore when disabled.
497 - bool enabled = _srs_config->get_kafka_enabled();  
498 if (!enabled) { 496 if (!enabled) {
499 return ret; 497 return ret;
500 } 498 }
@@ -513,7 +511,6 @@ int SrsKafkaProducer::request_metadata() @@ -513,7 +511,6 @@ int SrsKafkaProducer::request_metadata()
513 int ret = ERROR_SUCCESS; 511 int ret = ERROR_SUCCESS;
514 512
515 // ignore when disabled. 513 // ignore when disabled.
516 - bool enabled = _srs_config->get_kafka_enabled();  
517 if (!enabled) { 514 if (!enabled) {
518 return ret; 515 return ret;
519 } 516 }
@@ -155,6 +155,8 @@ public: @@ -155,6 +155,8 @@ public:
155 class SrsKafkaProducer : virtual public ISrsReusableThreadHandler, virtual public ISrsKafkaCluster 155 class SrsKafkaProducer : virtual public ISrsReusableThreadHandler, virtual public ISrsKafkaCluster
156 { 156 {
157 private: 157 private:
  158 + // TODO: FIXME: support reload.
  159 + bool enabled;
158 st_mutex_t lock; 160 st_mutex_t lock;
159 SrsReusableThread* pthread; 161 SrsReusableThread* pthread;
160 private: 162 private: