winlin

add kafka config

@@ -231,6 +231,18 @@ stream_caster { @@ -231,6 +231,18 @@ stream_caster {
231 } 231 }
232 232
233 ############################################################################################# 233 #############################################################################################
  234 +# Kafka sections
  235 +#############################################################################################
  236 +# Apache Kafka is a high-throughput distributed messaging system.
  237 +# SRS is a Kafka producer to send message to kafka.
  238 +# @see https://kafka.apache.org/documentation.html#introduction
  239 +kafka {
  240 + # whether enabled kafka.
  241 + # default: off
  242 + enabled off;
  243 +}
  244 +
  245 +#############################################################################################
234 # RTMP/HTTP VHOST sections 246 # RTMP/HTTP VHOST sections
235 ############################################################################################# 247 #############################################################################################
236 # vhost list, the __defaultVhost__ is the default vhost 248 # vhost list, the __defaultVhost__ is the default vhost
@@ -3488,7 +3488,7 @@ int SrsConfig::check_config() @@ -3488,7 +3488,7 @@ int SrsConfig::check_config()
3488 && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file" 3488 && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file"
3489 && n != "max_connections" && n != "daemon" && n != "heartbeat" 3489 && n != "max_connections" && n != "daemon" && n != "heartbeat"
3490 && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" 3490 && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms"
3491 - && n != "http_server" && n != "stream_caster" 3491 + && n != "http_server" && n != "stream_caster" && n != "kafka"
3492 && n != "utc_time" 3492 && n != "utc_time"
3493 ) { 3493 ) {
3494 ret = ERROR_SYSTEM_CONFIG_INVALID; 3494 ret = ERROR_SYSTEM_CONFIG_INVALID;
@@ -3531,6 +3531,17 @@ int SrsConfig::check_config() @@ -3531,6 +3531,17 @@ int SrsConfig::check_config()
3531 } 3531 }
3532 } 3532 }
3533 if (true) { 3533 if (true) {
  3534 + SrsConfDirective* conf = root->get("kafka");
  3535 + for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
  3536 + string n = conf->at(i)->name;
  3537 + if (n != "enabled") {
  3538 + ret = ERROR_SYSTEM_CONFIG_INVALID;
  3539 + srs_error("unsupported kafka directive %s, ret=%d", n.c_str(), ret);
  3540 + return ret;
  3541 + }
  3542 + }
  3543 + }
  3544 + if (true) {
3534 SrsConfDirective* conf = get_heartbeart(); 3545 SrsConfDirective* conf = get_heartbeart();
3535 for (int i = 0; conf && i < (int)conf->directives.size(); i++) { 3546 for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
3536 string n = conf->at(i)->name; 3547 string n = conf->at(i)->name;
@@ -4243,6 +4254,23 @@ int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* conf) @@ -4243,6 +4254,23 @@ int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* conf)
4243 return ::atoi(conf->arg0().c_str()); 4254 return ::atoi(conf->arg0().c_str());
4244 } 4255 }
4245 4256
  4257 +bool SrsConfig::get_kafka_enabled()
  4258 +{
  4259 + static bool DEFAULT = false;
  4260 +
  4261 + SrsConfDirective* conf = root->get("kafka");
  4262 + if (!conf) {
  4263 + return DEFAULT;
  4264 + }
  4265 +
  4266 + conf = conf->get("enabled");
  4267 + if (!conf || conf->arg0().empty()) {
  4268 + return DEFAULT;
  4269 + }
  4270 +
  4271 + return SRS_CONF_PERFER_FALSE(conf->arg0());
  4272 +}
  4273 +
4246 SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) 4274 SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
4247 { 4275 {
4248 srs_assert(root); 4276 srs_assert(root);
@@ -628,6 +628,12 @@ public: @@ -628,6 +628,12 @@ public:
628 * get the max udp port for rtp of stream caster rtsp. 628 * get the max udp port for rtp of stream caster rtsp.
629 */ 629 */
630 virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf); 630 virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf);
  631 +// kafka section.
  632 +public:
  633 + /**
  634 + * whether the kafka enabled.
  635 + */
  636 + virtual bool get_kafka_enabled();
631 // vhost specified section 637 // vhost specified section
632 public: 638 public:
633 /** 639 /**