winlin

add brokers to config

@@ -240,6 +240,10 @@ kafka { @@ -240,6 +240,10 @@ kafka {
240 # whether enabled kafka. 240 # whether enabled kafka.
241 # default: off 241 # default: off
242 enabled off; 242 enabled off;
  243 + # the broker list, broker is <ip:port>
  244 + # and use space to specify multple brokers.
  245 + # for exampl, 127.0.0.1:9092 127.0.0.1:9093
  246 + brokers 127.0.0.1:9092;
243 } 247 }
244 248
245 ############################################################################################# 249 #############################################################################################
@@ -2120,6 +2120,17 @@ int SrsConfig::global_to_json(SrsJsonObject* obj) @@ -2120,6 +2120,17 @@ int SrsConfig::global_to_json(SrsJsonObject* obj)
2120 } 2120 }
2121 } 2121 }
2122 obj->set(dir->name, sobj); 2122 obj->set(dir->name, sobj);
  2123 + } else if (dir->name == "kafka") {
  2124 + SrsJsonObject* sobj = SrsJsonAny::object();
  2125 + for (int j = 0; j < (int)dir->directives.size(); j++) {
  2126 + SrsConfDirective* sdir = dir->directives.at(j);
  2127 + if (sdir->name == "enabled") {
  2128 + sobj->set(sdir->name, sdir->dumps_arg0_to_boolean());
  2129 + } else if (sdir->name == "brokers") {
  2130 + sobj->set(sdir->name, sdir->dumps_args());
  2131 + }
  2132 + }
  2133 + obj->set(dir->name, sobj);
2123 } else if (dir->name == "stream_caster") { 2134 } else if (dir->name == "stream_caster") {
2124 SrsJsonObject* sobj = SrsJsonAny::object(); 2135 SrsJsonObject* sobj = SrsJsonAny::object();
2125 for (int j = 0; j < (int)dir->directives.size(); j++) { 2136 for (int j = 0; j < (int)dir->directives.size(); j++) {
@@ -3535,7 +3546,7 @@ int SrsConfig::check_config() @@ -3535,7 +3546,7 @@ int SrsConfig::check_config()
3535 SrsConfDirective* conf = root->get("kafka"); 3546 SrsConfDirective* conf = root->get("kafka");
3536 for (int i = 0; conf && i < (int)conf->directives.size(); i++) { 3547 for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
3537 string n = conf->at(i)->name; 3548 string n = conf->at(i)->name;
3538 - if (n != "enabled") { 3549 + if (n != "enabled" && n != "brokers") {
3539 ret = ERROR_SYSTEM_CONFIG_INVALID; 3550 ret = ERROR_SYSTEM_CONFIG_INVALID;
3540 srs_error("unsupported kafka directive %s, ret=%d", n.c_str(), ret); 3551 srs_error("unsupported kafka directive %s, ret=%d", n.c_str(), ret);
3541 return ret; 3552 return ret;
@@ -4272,6 +4283,21 @@ bool SrsConfig::get_kafka_enabled() @@ -4272,6 +4283,21 @@ bool SrsConfig::get_kafka_enabled()
4272 return SRS_CONF_PERFER_FALSE(conf->arg0()); 4283 return SRS_CONF_PERFER_FALSE(conf->arg0());
4273 } 4284 }
4274 4285
  4286 +SrsConfDirective* SrsConfig::get_kafka_brokers()
  4287 +{
  4288 + SrsConfDirective* conf = root->get("kafka");
  4289 + if (!conf) {
  4290 + return NULL;
  4291 + }
  4292 +
  4293 + conf->get("brokers");
  4294 + if (!conf || conf->args.empty()) {
  4295 + return NULL;
  4296 + }
  4297 +
  4298 + return conf;
  4299 +}
  4300 +
4275 SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) 4301 SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
4276 { 4302 {
4277 srs_assert(root); 4303 srs_assert(root);
@@ -634,6 +634,10 @@ public: @@ -634,6 +634,10 @@ public:
634 * whether the kafka enabled. 634 * whether the kafka enabled.
635 */ 635 */
636 virtual bool get_kafka_enabled(); 636 virtual bool get_kafka_enabled();
  637 + /**
  638 + * get the broker list, each is format in <ip:port>.
  639 + */
  640 + virtual SrsConfDirective* get_kafka_brokers();
637 // vhost specified section 641 // vhost specified section
638 public: 642 public:
639 /** 643 /**
@@ -27,6 +27,7 @@ @@ -27,6 +27,7 @@
27 #include <srs_kernel_log.hpp> 27 #include <srs_kernel_log.hpp>
28 #include <srs_app_config.hpp> 28 #include <srs_app_config.hpp>
29 #include <srs_app_async_call.hpp> 29 #include <srs_app_async_call.hpp>
  30 +#include <srs_app_utility.hpp>
30 31
31 #ifdef SRS_AUTO_KAFKA 32 #ifdef SRS_AUTO_KAFKA
32 33
@@ -64,7 +65,8 @@ int SrsKafkaProducer::start() @@ -64,7 +65,8 @@ int SrsKafkaProducer::start()
64 return ret; 65 return ret;
65 } 66 }
66 67
67 - srs_trace("kafka worker ok, enabled:%d", _srs_config->get_kafka_enabled()); 68 + std::string enabled = srs_bool2switch(_srs_config->get_kafka_enabled());
  69 + srs_trace("kafka worker ok, enabled:%s", enabled.c_str());
68 70
69 return ret; 71 return ret;
70 } 72 }