winlin

connect to kafka and send metadata request.

@@ -242,8 +242,11 @@ kafka { @@ -242,8 +242,11 @@ kafka {
242 enabled off; 242 enabled off;
243 # the broker list, broker is <ip:port> 243 # the broker list, broker is <ip:port>
244 # and use space to specify multple brokers. 244 # and use space to specify multple brokers.
245 - # for exampl, 127.0.0.1:9092 127.0.0.1:9093 245 + # for example, 127.0.0.1:9092 127.0.0.1:9093
246 brokers 127.0.0.1:9092; 246 brokers 127.0.0.1:9092;
  247 + # the kafka topic to use.
  248 + # default: srs
  249 + topic srs;
247 } 250 }
248 251
249 ############################################################################################# 252 #############################################################################################
@@ -4298,6 +4298,23 @@ SrsConfDirective* SrsConfig::get_kafka_brokers() @@ -4298,6 +4298,23 @@ SrsConfDirective* SrsConfig::get_kafka_brokers()
4298 return conf; 4298 return conf;
4299 } 4299 }
4300 4300
  4301 +string SrsConfig::get_kafka_topic()
  4302 +{
  4303 + static string DEFAULT = "srs";
  4304 +
  4305 + SrsConfDirective* conf = root->get("kafka");
  4306 + if (!conf) {
  4307 + return DEFAULT;
  4308 + }
  4309 +
  4310 + conf = conf->get("topic");
  4311 + if (!conf || conf->arg0().empty()) {
  4312 + return DEFAULT;
  4313 + }
  4314 +
  4315 + return conf->arg0();
  4316 +}
  4317 +
4301 SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) 4318 SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
4302 { 4319 {
4303 srs_assert(root); 4320 srs_assert(root);
@@ -638,6 +638,10 @@ public: @@ -638,6 +638,10 @@ public:
638 * get the broker list, each is format in <ip:port>. 638 * get the broker list, each is format in <ip:port>.
639 */ 639 */
640 virtual SrsConfDirective* get_kafka_brokers(); 640 virtual SrsConfDirective* get_kafka_brokers();
  641 + /**
  642 + * get the kafka topic to use for srs.
  643 + */
  644 + virtual std::string get_kafka_topic();
641 // vhost specified section 645 // vhost specified section
642 public: 646 public:
643 /** 647 /**
@@ -33,6 +33,7 @@ using namespace std; @@ -33,6 +33,7 @@ using namespace std;
33 #include <srs_app_utility.hpp> 33 #include <srs_app_utility.hpp>
34 #include <srs_kernel_utility.hpp> 34 #include <srs_kernel_utility.hpp>
35 #include <srs_kernel_balance.hpp> 35 #include <srs_kernel_balance.hpp>
  36 +#include <srs_kafka_stack.hpp>
36 37
37 #ifdef SRS_AUTO_KAFKA 38 #ifdef SRS_AUTO_KAFKA
38 39
@@ -40,12 +41,16 @@ SrsKafkaProducer::SrsKafkaProducer() @@ -40,12 +41,16 @@ SrsKafkaProducer::SrsKafkaProducer()
40 { 41 {
41 lb = new SrsLbRoundRobin(); 42 lb = new SrsLbRoundRobin();
42 worker = new SrsAsyncCallWorker(); 43 worker = new SrsAsyncCallWorker();
  44 + transport = new SrsTcpClient();
  45 + kafka = new SrsKafkaClient(transport);
43 } 46 }
44 47
45 SrsKafkaProducer::~SrsKafkaProducer() 48 SrsKafkaProducer::~SrsKafkaProducer()
46 { 49 {
47 srs_freep(lb); 50 srs_freep(lb);
48 srs_freep(worker); 51 srs_freep(worker);
  52 + srs_freep(kafka);
  53 + srs_freep(transport);
49 } 54 }
50 55
51 int SrsKafkaProducer::initialize() 56 int SrsKafkaProducer::initialize()
@@ -86,25 +91,46 @@ int SrsKafkaProducer::request_metadata() @@ -86,25 +91,46 @@ int SrsKafkaProducer::request_metadata()
86 { 91 {
87 int ret = ERROR_SUCCESS; 92 int ret = ERROR_SUCCESS;
88 93
  94 + // ignore when disabled.
89 bool enabled = _srs_config->get_kafka_enabled(); 95 bool enabled = _srs_config->get_kafka_enabled();
90 if (!enabled) { 96 if (!enabled) {
91 return ret; 97 return ret;
92 } 98 }
93 99
  100 + // select one broker to connect to.
94 SrsConfDirective* brokers = _srs_config->get_kafka_brokers(); 101 SrsConfDirective* brokers = _srs_config->get_kafka_brokers();
95 if (!brokers) { 102 if (!brokers) {
96 srs_warn("ignore for empty brokers."); 103 srs_warn("ignore for empty brokers.");
97 return ret; 104 return ret;
98 } 105 }
99 106
  107 + std::string server;
  108 + int port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
  109 + if (true) {
100 srs_assert(!brokers->args.empty()); 110 srs_assert(!brokers->args.empty());
101 std::string broker = lb->select(brokers->args); 111 std::string broker = lb->select(brokers->args);
  112 + srs_parse_endpoint(broker, server, port);
  113 + }
  114 +
  115 + // connect to kafka server.
  116 + if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) {
  117 + srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret);
  118 + return ret;
  119 + }
  120 +
  121 + // do fetch medata from broker.
  122 + std::string topic = _srs_config->get_kafka_topic();
  123 + if ((ret = kafka->fetch_metadata(topic)) != ERROR_SUCCESS) {
  124 + srs_error("kafka fetch metadata failed. ret=%d", ret);
  125 + return ret;
  126 + }
102 127
  128 + // log when completed.
103 if (true) { 129 if (true) {
104 std::string senabled = srs_bool2switch(enabled); 130 std::string senabled = srs_bool2switch(enabled);
105 std::string sbrokers = srs_join_vector_string(brokers->args, ","); 131 std::string sbrokers = srs_join_vector_string(brokers->args, ",");
106 - srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s",  
107 - senabled.c_str(), sbrokers.c_str(), lb->current(), broker.c_str()); 132 + srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s",
  133 + senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
108 } 134 }
109 135
110 return ret; 136 return ret;
@@ -31,6 +31,8 @@ @@ -31,6 +31,8 @@
31 31
32 class SrsLbRoundRobin; 32 class SrsLbRoundRobin;
33 class SrsAsyncCallWorker; 33 class SrsAsyncCallWorker;
  34 +class SrsTcpClient;
  35 +class SrsKafkaClient;
34 36
35 #ifdef SRS_AUTO_KAFKA 37 #ifdef SRS_AUTO_KAFKA
36 38
@@ -42,6 +44,8 @@ class SrsKafkaProducer @@ -42,6 +44,8 @@ class SrsKafkaProducer
42 private: 44 private:
43 SrsLbRoundRobin* lb; 45 SrsLbRoundRobin* lb;
44 SrsAsyncCallWorker* worker; 46 SrsAsyncCallWorker* worker;
  47 + SrsTcpClient* transport;
  48 + SrsKafkaClient* kafka;
45 public: 49 public:
46 SrsKafkaProducer(); 50 SrsKafkaProducer();
47 virtual ~SrsKafkaProducer(); 51 virtual ~SrsKafkaProducer();
@@ -398,5 +398,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -398,5 +398,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
398 #define SRS_CONSTS_RTSP_RTSPVersionNotSupported_str "RTSP Version Not Supported" 398 #define SRS_CONSTS_RTSP_RTSPVersionNotSupported_str "RTSP Version Not Supported"
399 #define SRS_CONSTS_RTSP_OptionNotSupported_str "Option not support" 399 #define SRS_CONSTS_RTSP_OptionNotSupported_str "Option not support"
400 400
  401 +///////////////////////////////////////////////////////////
  402 +// KAFKA consts values
  403 +///////////////////////////////////////////////////////////
  404 +#define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092
  405 +
  406 +// the common io timeout, for both recv and send.
  407 +#define SRS_CONSTS_KAFKA_TIMEOUT_US (int64_t)(30*1000*1000LL)
  408 +
401 #endif 409 #endif
402 410
@@ -23,8 +23,11 @@ @@ -23,8 +23,11 @@
23 23
24 #include <srs_kafka_stack.hpp> 24 #include <srs_kafka_stack.hpp>
25 25
  26 +#include <string>
26 using namespace std; 27 using namespace std;
27 28
  29 +#include <srs_kernel_error.hpp>
  30 +
28 #ifdef SRS_AUTO_KAFKA 31 #ifdef SRS_AUTO_KAFKA
29 32
30 SrsKafkaString::SrsKafkaString() 33 SrsKafkaString::SrsKafkaString()
@@ -196,5 +199,42 @@ SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest() @@ -196,5 +199,42 @@ SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest()
196 { 199 {
197 } 200 }
198 201
  202 +SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io)
  203 +{
  204 + skt = io;
  205 +}
  206 +
  207 +SrsKafkaProtocol::~SrsKafkaProtocol()
  208 +{
  209 +}
  210 +
  211 +int SrsKafkaProtocol::send_and_free_message(SrsKafkaMessage* msg)
  212 +{
  213 + int ret = ERROR_SUCCESS;
  214 +
  215 + // TODO: FIXME: implements it.
  216 +
  217 + return ret;
  218 +}
  219 +
  220 +SrsKafkaClient::SrsKafkaClient(ISrsProtocolReaderWriter* io)
  221 +{
  222 + protocol = new SrsKafkaProtocol(io);
  223 +}
  224 +
  225 +SrsKafkaClient::~SrsKafkaClient()
  226 +{
  227 + srs_freep(protocol);
  228 +}
  229 +
  230 +int SrsKafkaClient::fetch_metadata(string topic)
  231 +{
  232 + int ret = ERROR_SUCCESS;
  233 +
  234 + // TODO: FIXME: implements it.
  235 +
  236 + return ret;
  237 +}
  238 +
199 #endif 239 #endif
200 240
@@ -30,6 +30,9 @@ @@ -30,6 +30,9 @@
30 #include <srs_core.hpp> 30 #include <srs_core.hpp>
31 31
32 #include <vector> 32 #include <vector>
  33 +#include <string>
  34 +
  35 +class ISrsProtocolReaderWriter;
33 36
34 #ifdef SRS_AUTO_KAFKA 37 #ifdef SRS_AUTO_KAFKA
35 38
@@ -94,7 +97,7 @@ public: @@ -94,7 +97,7 @@ public:
94 * array of a structure foo as [foo]. 97 * array of a structure foo as [foo].
95 * 98 *
96 * Usage: 99 * Usage:
97 - * SrsKafkaArray<SrsKafkaBytes> body; 100 + * SrsKafkaArray<SrsKafkaBytes*> body;
98 * body.append(new SrsKafkaBytes()); 101 * body.append(new SrsKafkaBytes());
99 * 102 *
100 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests 103 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
@@ -104,8 +107,8 @@ class SrsKafkaArray @@ -104,8 +107,8 @@ class SrsKafkaArray
104 { 107 {
105 private: 108 private:
106 int length; 109 int length;
107 - std::vector<T*> elems;  
108 - typedef typename std::vector<T*>::iterator SrsIterator; 110 + std::vector<T> elems;
  111 + typedef typename std::vector<T>::iterator SrsIterator;
109 public: 112 public:
110 SrsKafkaArray() 113 SrsKafkaArray()
111 { 114 {
@@ -114,13 +117,13 @@ public: @@ -114,13 +117,13 @@ public:
114 virtual ~SrsKafkaArray() 117 virtual ~SrsKafkaArray()
115 { 118 {
116 for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { 119 for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
117 - T* elem = *it; 120 + T elem = *it;
118 srs_freep(elem); 121 srs_freep(elem);
119 } 122 }
120 elems.clear(); 123 elems.clear();
121 } 124 }
122 public: 125 public:
123 - virtual void append(T* elem) 126 + virtual void append(T elem)
124 { 127 {
125 length++; 128 length++;
126 elems.push_back(elem); 129 elems.push_back(elem);
@@ -307,12 +310,47 @@ class SrsKafkaTopicMetadataRequest @@ -307,12 +310,47 @@ class SrsKafkaTopicMetadataRequest
307 { 310 {
308 private: 311 private:
309 SrsKafkaRequestHeader header; 312 SrsKafkaRequestHeader header;
310 - SrsKafkaArray<SrsKafkaString> request; 313 + SrsKafkaArray<SrsKafkaString*> request;
311 public: 314 public:
312 SrsKafkaTopicMetadataRequest(); 315 SrsKafkaTopicMetadataRequest();
313 virtual ~SrsKafkaTopicMetadataRequest(); 316 virtual ~SrsKafkaTopicMetadataRequest();
314 }; 317 };
315 318
  319 +/**
  320 + * the kafka protocol stack, use to send and recv kakfa messages.
  321 + */
  322 +class SrsKafkaProtocol
  323 +{
  324 +private:
  325 + ISrsProtocolReaderWriter* skt;
  326 +public:
  327 + SrsKafkaProtocol(ISrsProtocolReaderWriter* io);
  328 + virtual ~SrsKafkaProtocol();
  329 +public:
  330 + /**
  331 + * write the message to kafka server.
  332 + * @param msg the msg to send. user must not free it again.
  333 + */
  334 + virtual int send_and_free_message(SrsKafkaMessage* msg);
  335 +};
  336 +
  337 +/**
  338 + * the kafka client, for producer or consumer.
  339 + */
  340 +class SrsKafkaClient
  341 +{
  342 +private:
  343 + SrsKafkaProtocol* protocol;
  344 +public:
  345 + SrsKafkaClient(ISrsProtocolReaderWriter* io);
  346 + virtual ~SrsKafkaClient();
  347 +public:
  348 + /**
  349 + * fetch the metadata from broker for topic.
  350 + */
  351 + virtual int fetch_metadata(std::string topic);
  352 +};
  353 +
316 #endif 354 #endif
317 355
318 #endif 356 #endif