winlin

refine code for kakfa request/response, string/bytes.

@@ -27,6 +27,7 @@ @@ -27,6 +27,7 @@
27 using namespace std; 27 using namespace std;
28 28
29 #include <srs_kernel_error.hpp> 29 #include <srs_kernel_error.hpp>
  30 +#include <srs_core_autofree.hpp>
30 31
31 #ifdef SRS_AUTO_KAFKA 32 #ifdef SRS_AUTO_KAFKA
32 33
@@ -36,6 +37,15 @@ SrsKafkaString::SrsKafkaString() @@ -36,6 +37,15 @@ SrsKafkaString::SrsKafkaString()
36 data = NULL; 37 data = NULL;
37 } 38 }
38 39
  40 +SrsKafkaString::SrsKafkaString(string v)
  41 +{
  42 + size = (int16_t)v.length();
  43 +
  44 + srs_assert(size > 0);
  45 + data = new char[size];
  46 + memcpy(data, v.data(), size);
  47 +}
  48 +
39 SrsKafkaString::~SrsKafkaString() 49 SrsKafkaString::~SrsKafkaString()
40 { 50 {
41 srs_freep(data); 51 srs_freep(data);
@@ -62,6 +72,15 @@ SrsKafkaBytes::SrsKafkaBytes() @@ -62,6 +72,15 @@ SrsKafkaBytes::SrsKafkaBytes()
62 data = NULL; 72 data = NULL;
63 } 73 }
64 74
  75 +SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v)
  76 +{
  77 + size = (int16_t)nb_v;
  78 +
  79 + srs_assert(size > 0);
  80 + data = new char[size];
  81 + memcpy(data, v, size);
  82 +}
  83 +
65 SrsKafkaBytes::~SrsKafkaBytes() 84 SrsKafkaBytes::~SrsKafkaBytes()
66 { 85 {
67 srs_freep(data); 86 srs_freep(data);
@@ -175,7 +194,7 @@ int SrsKafkaResponseHeader::total_size() @@ -175,7 +194,7 @@ int SrsKafkaResponseHeader::total_size()
175 return 4 + size; 194 return 4 + size;
176 } 195 }
177 196
178 -SrsKafkaMessage::SrsKafkaMessage() 197 +SrsKafkaRawMessage::SrsKafkaRawMessage()
179 { 198 {
180 offset = 0; 199 offset = 0;
181 message_size = 0; 200 message_size = 0;
@@ -186,7 +205,7 @@ SrsKafkaMessage::SrsKafkaMessage() @@ -186,7 +205,7 @@ SrsKafkaMessage::SrsKafkaMessage()
186 value = new SrsKafkaBytes(); 205 value = new SrsKafkaBytes();
187 } 206 }
188 207
189 -SrsKafkaMessage::~SrsKafkaMessage() 208 +SrsKafkaRawMessage::~SrsKafkaRawMessage()
190 { 209 {
191 srs_freep(key); 210 srs_freep(key);
192 srs_freep(value); 211 srs_freep(value);
@@ -198,14 +217,30 @@ SrsKafkaMessageSet::SrsKafkaMessageSet() @@ -198,14 +217,30 @@ SrsKafkaMessageSet::SrsKafkaMessageSet()
198 217
199 SrsKafkaMessageSet::~SrsKafkaMessageSet() 218 SrsKafkaMessageSet::~SrsKafkaMessageSet()
200 { 219 {
201 - vector<SrsKafkaMessage*>::iterator it; 220 + vector<SrsKafkaRawMessage*>::iterator it;
202 for (it = messages.begin(); it != messages.end(); ++it) { 221 for (it = messages.begin(); it != messages.end(); ++it) {
203 - SrsKafkaMessage* message = *it; 222 + SrsKafkaRawMessage* message = *it;
204 srs_freep(message); 223 srs_freep(message);
205 } 224 }
206 messages.clear(); 225 messages.clear();
207 } 226 }
208 227
  228 +SrsKafkaRequest::SrsKafkaRequest()
  229 +{
  230 +}
  231 +
  232 +SrsKafkaRequest::~SrsKafkaRequest()
  233 +{
  234 +}
  235 +
  236 +SrsKafkaResponse::SrsKafkaResponse()
  237 +{
  238 +}
  239 +
  240 +SrsKafkaResponse::~SrsKafkaResponse()
  241 +{
  242 +}
  243 +
209 SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest() 244 SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest()
210 { 245 {
211 header.set_api_key(SrsKafkaApiKeyMetadataRequest); 246 header.set_api_key(SrsKafkaApiKeyMetadataRequest);
@@ -215,6 +250,19 @@ SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest() @@ -215,6 +250,19 @@ SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest()
215 { 250 {
216 } 251 }
217 252
  253 +void SrsKafkaTopicMetadataRequest::add_topic(string topic)
  254 +{
  255 + topics.append(new SrsKafkaString(topic));
  256 +}
  257 +
  258 +SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse()
  259 +{
  260 +}
  261 +
  262 +SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse()
  263 +{
  264 +}
  265 +
218 SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) 266 SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io)
219 { 267 {
220 skt = io; 268 skt = io;
@@ -224,7 +272,7 @@ SrsKafkaProtocol::~SrsKafkaProtocol() @@ -224,7 +272,7 @@ SrsKafkaProtocol::~SrsKafkaProtocol()
224 { 272 {
225 } 273 }
226 274
227 -int SrsKafkaProtocol::send_and_free_message(SrsKafkaMessage* msg) 275 +int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg)
228 { 276 {
229 int ret = ERROR_SUCCESS; 277 int ret = ERROR_SUCCESS;
230 278
@@ -247,6 +295,11 @@ int SrsKafkaClient::fetch_metadata(string topic) @@ -247,6 +295,11 @@ int SrsKafkaClient::fetch_metadata(string topic)
247 { 295 {
248 int ret = ERROR_SUCCESS; 296 int ret = ERROR_SUCCESS;
249 297
  298 + SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest();
  299 + SrsAutoFree(SrsKafkaTopicMetadataRequest, req);
  300 +
  301 + req->add_topic(topic);
  302 +
250 // TODO: FIXME: implements it. 303 // TODO: FIXME: implements it.
251 304
252 return ret; 305 return ret;
@@ -64,6 +64,7 @@ private: @@ -64,6 +64,7 @@ private:
64 char* data; 64 char* data;
65 public: 65 public:
66 SrsKafkaString(); 66 SrsKafkaString();
  67 + SrsKafkaString(std::string v);
67 virtual ~SrsKafkaString(); 68 virtual ~SrsKafkaString();
68 public: 69 public:
69 virtual bool null(); 70 virtual bool null();
@@ -83,6 +84,7 @@ private: @@ -83,6 +84,7 @@ private:
83 char* data; 84 char* data;
84 public: 85 public:
85 SrsKafkaBytes(); 86 SrsKafkaBytes();
  87 + SrsKafkaBytes(const char* v, int nb_v);
86 virtual ~SrsKafkaBytes(); 88 virtual ~SrsKafkaBytes();
87 public: 89 public:
88 virtual bool null(); 90 virtual bool null();
@@ -269,7 +271,7 @@ public: @@ -269,7 +271,7 @@ public:
269 * the kafka message in message set. 271 * the kafka message in message set.
270 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets 272 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
271 */ 273 */
272 -struct SrsKafkaMessage 274 +struct SrsKafkaRawMessage
273 { 275 {
274 // metadata. 276 // metadata.
275 public: 277 public:
@@ -313,8 +315,8 @@ public: @@ -313,8 +315,8 @@ public:
313 */ 315 */
314 SrsKafkaBytes* value; 316 SrsKafkaBytes* value;
315 public: 317 public:
316 - SrsKafkaMessage();  
317 - virtual ~SrsKafkaMessage(); 318 + SrsKafkaRawMessage();
  319 + virtual ~SrsKafkaRawMessage();
318 }; 320 };
319 321
320 /** 322 /**
@@ -324,13 +326,33 @@ public: @@ -324,13 +326,33 @@ public:
324 class SrsKafkaMessageSet 326 class SrsKafkaMessageSet
325 { 327 {
326 private: 328 private:
327 - std::vector<SrsKafkaMessage*> messages; 329 + std::vector<SrsKafkaRawMessage*> messages;
328 public: 330 public:
329 SrsKafkaMessageSet(); 331 SrsKafkaMessageSet();
330 virtual ~SrsKafkaMessageSet(); 332 virtual ~SrsKafkaMessageSet();
331 }; 333 };
332 334
333 /** 335 /**
  336 + * the kafka request message, for protocol to send.
  337 + */
  338 +class SrsKafkaRequest
  339 +{
  340 +public:
  341 + SrsKafkaRequest();
  342 + virtual ~SrsKafkaRequest();
  343 +};
  344 +
  345 +/**
  346 + * the kafka response message, for protocol to recv.
  347 + */
  348 +class SrsKafkaResponse
  349 +{
  350 +public:
  351 + SrsKafkaResponse();
  352 + virtual ~SrsKafkaResponse();
  353 +};
  354 +
  355 +/**
334 * request the metadata from broker. 356 * request the metadata from broker.
335 * This API answers the following questions: 357 * This API answers the following questions:
336 * What topics exist? 358 * What topics exist?
@@ -344,20 +366,30 @@ public: @@ -344,20 +366,30 @@ public:
344 * 366 *
345 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI 367 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
346 */ 368 */
347 -class SrsKafkaTopicMetadataRequest 369 +class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest
348 { 370 {
349 private: 371 private:
350 SrsKafkaRequestHeader header; 372 SrsKafkaRequestHeader header;
351 - SrsKafkaArray<SrsKafkaString*> request; 373 + SrsKafkaArray<SrsKafkaString*> topics;
352 public: 374 public:
353 SrsKafkaTopicMetadataRequest(); 375 SrsKafkaTopicMetadataRequest();
354 virtual ~SrsKafkaTopicMetadataRequest(); 376 virtual ~SrsKafkaTopicMetadataRequest();
  377 +public:
  378 + virtual void add_topic(std::string topic);
355 }; 379 };
356 380
357 -class SrsKafkaTopicMetadataResponse 381 +/**
  382 + * response for the metadata request from broker.
  383 + * The response contains metadata for each partition,
  384 + * with partitions grouped together by topic. This
  385 + * metadata refers to brokers by their broker id.
  386 + * The brokers each have a host and port.
  387 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
  388 + */
  389 +class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse
358 { 390 {
359 private: 391 private:
360 - SrsKafkaRequestHeader header; 392 + SrsKafkaResponseHeader header;
361 public: 393 public:
362 SrsKafkaTopicMetadataResponse(); 394 SrsKafkaTopicMetadataResponse();
363 virtual ~SrsKafkaTopicMetadataResponse(); 395 virtual ~SrsKafkaTopicMetadataResponse();
@@ -378,7 +410,7 @@ public: @@ -378,7 +410,7 @@ public:
378 * write the message to kafka server. 410 * write the message to kafka server.
379 * @param msg the msg to send. user must not free it again. 411 * @param msg the msg to send. user must not free it again.
380 */ 412 */
381 - virtual int send_and_free_message(SrsKafkaMessage* msg); 413 + virtual int send_and_free_message(SrsKafkaRequest* msg);
382 }; 414 };
383 415
384 /** 416 /**