winlin

for kafka, support correlation id cache.

@@ -221,8 +221,8 @@ int SrsKafkaBytes::decode(SrsBuffer* buf) @@ -221,8 +221,8 @@ int SrsKafkaBytes::decode(SrsBuffer* buf)
221 SrsKafkaRequestHeader::SrsKafkaRequestHeader() 221 SrsKafkaRequestHeader::SrsKafkaRequestHeader()
222 { 222 {
223 _size = 0; 223 _size = 0;
224 - api_key = api_version = 0;  
225 - correlation_id = 0; 224 + _api_key = api_version = 0;
  225 + _correlation_id = 0;
226 client_id = new SrsKafkaString(); 226 client_id = new SrsKafkaString();
227 } 227 }
228 228
@@ -246,49 +246,69 @@ int SrsKafkaRequestHeader::total_size() @@ -246,49 +246,69 @@ int SrsKafkaRequestHeader::total_size()
246 return 4 + _size; 246 return 4 + _size;
247 } 247 }
248 248
  249 +void SrsKafkaRequestHeader::set_total_size(int s)
  250 +{
  251 + _size = s - 4;
  252 +}
  253 +
  254 +int32_t SrsKafkaRequestHeader::correlation_id()
  255 +{
  256 + return _correlation_id;
  257 +}
  258 +
  259 +void SrsKafkaRequestHeader::set_correlation_id(int32_t cid)
  260 +{
  261 + _correlation_id = cid;
  262 +}
  263 +
  264 +SrsKafkaApiKey SrsKafkaRequestHeader::api_key()
  265 +{
  266 + return (SrsKafkaApiKey)_api_key;
  267 +}
  268 +
  269 +void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key)
  270 +{
  271 + _api_key = (int16_t)key;
  272 +}
  273 +
249 bool SrsKafkaRequestHeader::is_producer_request() 274 bool SrsKafkaRequestHeader::is_producer_request()
250 { 275 {
251 - return api_key == SrsKafkaApiKeyProduceRequest; 276 + return _api_key == SrsKafkaApiKeyProduceRequest;
252 } 277 }
253 278
254 bool SrsKafkaRequestHeader::is_fetch_request() 279 bool SrsKafkaRequestHeader::is_fetch_request()
255 { 280 {
256 - return api_key == SrsKafkaApiKeyFetchRequest; 281 + return _api_key == SrsKafkaApiKeyFetchRequest;
257 } 282 }
258 283
259 bool SrsKafkaRequestHeader::is_offset_request() 284 bool SrsKafkaRequestHeader::is_offset_request()
260 { 285 {
261 - return api_key == SrsKafkaApiKeyOffsetRequest; 286 + return _api_key == SrsKafkaApiKeyOffsetRequest;
262 } 287 }
263 288
264 bool SrsKafkaRequestHeader::is_metadata_request() 289 bool SrsKafkaRequestHeader::is_metadata_request()
265 { 290 {
266 - return api_key == SrsKafkaApiKeyMetadataRequest; 291 + return _api_key == SrsKafkaApiKeyMetadataRequest;
267 } 292 }
268 293
269 bool SrsKafkaRequestHeader::is_offset_commit_request() 294 bool SrsKafkaRequestHeader::is_offset_commit_request()
270 { 295 {
271 - return api_key == SrsKafkaApiKeyOffsetCommitRequest; 296 + return _api_key == SrsKafkaApiKeyOffsetCommitRequest;
272 } 297 }
273 298
274 bool SrsKafkaRequestHeader::is_offset_fetch_request() 299 bool SrsKafkaRequestHeader::is_offset_fetch_request()
275 { 300 {
276 - return api_key == SrsKafkaApiKeyOffsetFetchRequest; 301 + return _api_key == SrsKafkaApiKeyOffsetFetchRequest;
277 } 302 }
278 303
279 bool SrsKafkaRequestHeader::is_consumer_metadata_request() 304 bool SrsKafkaRequestHeader::is_consumer_metadata_request()
280 { 305 {
281 - return api_key == SrsKafkaApiKeyConsumerMetadataRequest;  
282 -}  
283 -  
284 -void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key)  
285 -{  
286 - api_key = (int16_t)key; 306 + return _api_key == SrsKafkaApiKeyConsumerMetadataRequest;
287 } 307 }
288 308
289 int SrsKafkaRequestHeader::size() 309 int SrsKafkaRequestHeader::size()
290 { 310 {
291 - return 4 + _size; 311 + return 4 + header_size();
292 } 312 }
293 313
294 int SrsKafkaRequestHeader::encode(SrsBuffer* buf) 314 int SrsKafkaRequestHeader::encode(SrsBuffer* buf)
@@ -302,9 +322,9 @@ int SrsKafkaRequestHeader::encode(SrsBuffer* buf) @@ -302,9 +322,9 @@ int SrsKafkaRequestHeader::encode(SrsBuffer* buf)
302 } 322 }
303 323
304 buf->write_4bytes(_size); 324 buf->write_4bytes(_size);
305 - buf->write_2bytes(api_key); 325 + buf->write_2bytes(_api_key);
306 buf->write_2bytes(api_version); 326 buf->write_2bytes(api_version);
307 - buf->write_4bytes(correlation_id); 327 + buf->write_4bytes(_correlation_id);
308 328
309 if ((ret = client_id->encode(buf)) != ERROR_SUCCESS) { 329 if ((ret = client_id->encode(buf)) != ERROR_SUCCESS) {
310 srs_error("kafka encode request client_id failed. ret=%d", ret); 330 srs_error("kafka encode request client_id failed. ret=%d", ret);
@@ -335,9 +355,9 @@ int SrsKafkaRequestHeader::decode(SrsBuffer* buf) @@ -335,9 +355,9 @@ int SrsKafkaRequestHeader::decode(SrsBuffer* buf)
335 srs_error("kafka decode request message failed. ret=%d", ret); 355 srs_error("kafka decode request message failed. ret=%d", ret);
336 return ret; 356 return ret;
337 } 357 }
338 - api_key = buf->read_2bytes(); 358 + _api_key = buf->read_2bytes();
339 api_version = buf->read_2bytes(); 359 api_version = buf->read_2bytes();
340 - correlation_id = buf->read_4bytes(); 360 + _correlation_id = buf->read_4bytes();
341 361
342 if ((ret = client_id->decode(buf)) != ERROR_SUCCESS) { 362 if ((ret = client_id->decode(buf)) != ERROR_SUCCESS) {
343 srs_error("kafka decode request client_id failed. ret=%d", ret); 363 srs_error("kafka decode request client_id failed. ret=%d", ret);
@@ -372,9 +392,14 @@ int SrsKafkaResponseHeader::total_size() @@ -372,9 +392,14 @@ int SrsKafkaResponseHeader::total_size()
372 return 4 + _size; 392 return 4 + _size;
373 } 393 }
374 394
  395 +void SrsKafkaResponseHeader::set_total_size(int s)
  396 +{
  397 + _size = s - 4;
  398 +}
  399 +
375 int SrsKafkaResponseHeader::size() 400 int SrsKafkaResponseHeader::size()
376 { 401 {
377 - return 4 + _size; 402 + return 4 + header_size();
378 } 403 }
379 404
380 int SrsKafkaResponseHeader::encode(SrsBuffer* buf) 405 int SrsKafkaResponseHeader::encode(SrsBuffer* buf)
@@ -452,12 +477,28 @@ SrsKafkaMessageSet::~SrsKafkaMessageSet() @@ -452,12 +477,28 @@ SrsKafkaMessageSet::~SrsKafkaMessageSet()
452 477
453 SrsKafkaRequest::SrsKafkaRequest() 478 SrsKafkaRequest::SrsKafkaRequest()
454 { 479 {
  480 + header.set_correlation_id(SrsKafkaCorrelationPool::instance()->generate_correlation_id());
455 } 481 }
456 482
457 SrsKafkaRequest::~SrsKafkaRequest() 483 SrsKafkaRequest::~SrsKafkaRequest()
458 { 484 {
459 } 485 }
460 486
  487 +void SrsKafkaRequest::update_header(int s)
  488 +{
  489 + header.set_total_size(s);
  490 +}
  491 +
  492 +int32_t SrsKafkaRequest::correlation_id()
  493 +{
  494 + return header.correlation_id();
  495 +}
  496 +
  497 +SrsKafkaApiKey SrsKafkaRequest::api_key()
  498 +{
  499 + return header.api_key();
  500 +}
  501 +
461 int SrsKafkaRequest::size() 502 int SrsKafkaRequest::size()
462 { 503 {
463 return header.size(); 504 return header.size();
@@ -481,6 +522,11 @@ SrsKafkaResponse::~SrsKafkaResponse() @@ -481,6 +522,11 @@ SrsKafkaResponse::~SrsKafkaResponse()
481 { 522 {
482 } 523 }
483 524
  525 +void SrsKafkaResponse::update_header(int s)
  526 +{
  527 + header.set_total_size(s);
  528 +}
  529 +
484 int SrsKafkaResponse::size() 530 int SrsKafkaResponse::size()
485 { 531 {
486 return header.size(); 532 return header.size();
@@ -589,6 +635,50 @@ int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf) @@ -589,6 +635,50 @@ int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf)
589 return ret; 635 return ret;
590 } 636 }
591 637
  638 +SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::_instance = new SrsKafkaCorrelationPool();
  639 +
  640 +SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::instance()
  641 +{
  642 + return _instance;
  643 +}
  644 +
  645 +SrsKafkaCorrelationPool::SrsKafkaCorrelationPool()
  646 +{
  647 +}
  648 +
  649 +SrsKafkaCorrelationPool::~SrsKafkaCorrelationPool()
  650 +{
  651 + correlation_ids.clear();
  652 +}
  653 +
  654 +int32_t SrsKafkaCorrelationPool::generate_correlation_id()
  655 +{
  656 + static int32_t cid = 1;
  657 + return cid++;
  658 +}
  659 +
  660 +void SrsKafkaCorrelationPool::set(int32_t correlation_id, SrsKafkaApiKey request)
  661 +{
  662 + correlation_ids[correlation_id] = request;
  663 +}
  664 +
  665 +void SrsKafkaCorrelationPool::unset(int32_t correlation_id)
  666 +{
  667 + std::map<int32_t, SrsKafkaApiKey>::iterator it = correlation_ids.find(correlation_id);
  668 + if (it != correlation_ids.end()) {
  669 + correlation_ids.erase(it);
  670 + }
  671 +}
  672 +
  673 +SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id)
  674 +{
  675 + if (correlation_ids.find(correlation_id) == correlation_ids.end()) {
  676 + return SrsKafkaApiKeyUnknown;
  677 + }
  678 +
  679 + return correlation_ids[correlation_id];
  680 +}
  681 +
592 SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) 682 SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io)
593 { 683 {
594 skt = io; 684 skt = io;
@@ -610,6 +700,13 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg) @@ -610,6 +700,13 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg)
610 return ret; 700 return ret;
611 } 701 }
612 702
  703 + // update the header of message.
  704 + msg->update_header(size);
  705 +
  706 + // cache the request correlation id to discovery response message.
  707 + SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance();
  708 + pool->set(msg->correlation_id(), msg->api_key());
  709 +
613 // TODO: FIXME: refine for performance issue. 710 // TODO: FIXME: refine for performance issue.
614 char* bytes = new char[size]; 711 char* bytes = new char[size];
615 SrsAutoFree(char, bytes); 712 SrsAutoFree(char, bytes);
@@ -31,6 +31,7 @@ @@ -31,6 +31,7 @@
31 31
32 #include <vector> 32 #include <vector>
33 #include <string> 33 #include <string>
  34 +#include <map>
34 35
35 #include <srs_kernel_buffer.hpp> 36 #include <srs_kernel_buffer.hpp>
36 #include <srs_kernel_error.hpp> 37 #include <srs_kernel_error.hpp>
@@ -45,6 +46,8 @@ class ISrsProtocolReaderWriter; @@ -45,6 +46,8 @@ class ISrsProtocolReaderWriter;
45 */ 46 */
46 enum SrsKafkaApiKey 47 enum SrsKafkaApiKey
47 { 48 {
  49 + SrsKafkaApiKeyUnknown = -1,
  50 +
48 SrsKafkaApiKeyProduceRequest = 0, 51 SrsKafkaApiKeyProduceRequest = 0,
49 SrsKafkaApiKeyFetchRequest = 1, 52 SrsKafkaApiKeyFetchRequest = 1,
50 SrsKafkaApiKeyOffsetRequest = 2, 53 SrsKafkaApiKeyOffsetRequest = 2,
@@ -203,7 +206,7 @@ private: @@ -203,7 +206,7 @@ private:
203 * a metadata request, a produce request, a fetch request, etc). 206 * a metadata request, a produce request, a fetch request, etc).
204 * @remark MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest 207 * @remark MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
205 */ 208 */
206 - int16_t api_key; 209 + int16_t _api_key;
207 /** 210 /**
208 * This is a numeric version number for this api. We version each API and 211 * This is a numeric version number for this api. We version each API and
209 * this version number allows the server to properly interpret the request 212 * this version number allows the server to properly interpret the request
@@ -216,7 +219,7 @@ private: @@ -216,7 +219,7 @@ private:
216 * the response by the server, unmodified. It is useful for matching 219 * the response by the server, unmodified. It is useful for matching
217 * request and response between the client and server. 220 * request and response between the client and server.
218 */ 221 */
219 - int32_t correlation_id; 222 + int32_t _correlation_id;
220 /** 223 /**
221 * This is a user supplied identifier for the client application. 224 * This is a user supplied identifier for the client application.
222 * The user can use any identifier they like and it will be used 225 * The user can use any identifier they like and it will be used
@@ -254,6 +257,28 @@ private: @@ -254,6 +257,28 @@ private:
254 virtual int total_size(); 257 virtual int total_size();
255 public: 258 public:
256 /** 259 /**
  260 + * when got the whole message size, update the header.
  261 + * @param s the whole message, including the 4 bytes size size.
  262 + */
  263 + virtual void set_total_size(int s);
  264 + /**
  265 + * get the correlation id for message.
  266 + */
  267 + virtual int32_t correlation_id();
  268 + /**
  269 + * set the correlation id for message.
  270 + */
  271 + virtual void set_correlation_id(int32_t cid);
  272 + /**
  273 + * get the api key of header for message.
  274 + */
  275 + virtual SrsKafkaApiKey api_key();
  276 + /**
  277 + * set the api key of header for message.
  278 + */
  279 + virtual void set_api_key(SrsKafkaApiKey key);
  280 +public:
  281 + /**
257 * the api key enumeration. 282 * the api key enumeration.
258 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys 283 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
259 */ 284 */
@@ -264,8 +289,6 @@ public: @@ -264,8 +289,6 @@ public:
264 virtual bool is_offset_commit_request(); 289 virtual bool is_offset_commit_request();
265 virtual bool is_offset_fetch_request(); 290 virtual bool is_offset_fetch_request();
266 virtual bool is_consumer_metadata_request(); 291 virtual bool is_consumer_metadata_request();
267 - // set the api key.  
268 - virtual void set_api_key(SrsKafkaApiKey key);  
269 // interface ISrsCodec 292 // interface ISrsCodec
270 public: 293 public:
271 virtual int size(); 294 virtual int size();
@@ -321,6 +344,12 @@ private: @@ -321,6 +344,12 @@ private:
321 * the total size of the request, includes the 4B size. 344 * the total size of the request, includes the 4B size.
322 */ 345 */
323 virtual int total_size(); 346 virtual int total_size();
  347 +public:
  348 + /**
  349 + * when got the whole message size, update the header.
  350 + * @param s the whole message, including the 4 bytes size size.
  351 + */
  352 + virtual void set_total_size(int s);
324 // interface ISrsCodec 353 // interface ISrsCodec
325 public: 354 public:
326 virtual int size(); 355 virtual int size();
@@ -403,6 +432,20 @@ protected: @@ -403,6 +432,20 @@ protected:
403 public: 432 public:
404 SrsKafkaRequest(); 433 SrsKafkaRequest();
405 virtual ~SrsKafkaRequest(); 434 virtual ~SrsKafkaRequest();
  435 +public:
  436 + /**
  437 + * update the size in header.
  438 + * @param s an int value specifies the size of message in header.
  439 + */
  440 + virtual void update_header(int s);
  441 + /**
  442 + * get the correlation id of header for message.
  443 + */
  444 + virtual int32_t correlation_id();
  445 + /**
  446 + * get the api key of request.
  447 + */
  448 + virtual SrsKafkaApiKey api_key();
406 // interface ISrsCodec 449 // interface ISrsCodec
407 public: 450 public:
408 virtual int size(); 451 virtual int size();
@@ -420,6 +463,12 @@ protected: @@ -420,6 +463,12 @@ protected:
420 public: 463 public:
421 SrsKafkaResponse(); 464 SrsKafkaResponse();
422 virtual ~SrsKafkaResponse(); 465 virtual ~SrsKafkaResponse();
  466 +public:
  467 + /**
  468 + * update the size in header.
  469 + * @param s an int value specifies the size of message in header.
  470 + */
  471 + virtual void update_header(int s);
423 // interface ISrsCodec 472 // interface ISrsCodec
424 public: 473 public:
425 virtual int size(); 474 virtual int size();
@@ -478,6 +527,32 @@ public: @@ -478,6 +527,32 @@ public:
478 }; 527 };
479 528
480 /** 529 /**
  530 + * the poll to discovery reponse.
  531 + * @param CorrelationId This is a user-supplied integer. It will be passed back
  532 + * in the response by the server, unmodified. It is useful for matching
  533 + * request and response between the client and server.
  534 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
  535 + */
  536 +class SrsKafkaCorrelationPool
  537 +{
  538 +private:
  539 + static SrsKafkaCorrelationPool* _instance;
  540 +public:
  541 + static SrsKafkaCorrelationPool* instance();
  542 +private:
  543 + std::map<int32_t, SrsKafkaApiKey> correlation_ids;
  544 +private:
  545 + SrsKafkaCorrelationPool();
  546 +public:
  547 + virtual ~SrsKafkaCorrelationPool();
  548 +public:
  549 + virtual int32_t generate_correlation_id();
  550 + virtual void set(int32_t correlation_id, SrsKafkaApiKey request);
  551 + virtual void unset(int32_t correlation_id);
  552 + virtual SrsKafkaApiKey get(int32_t correlation_id);
  553 +};
  554 +
  555 +/**
481 * the kafka protocol stack, use to send and recv kakfa messages. 556 * the kafka protocol stack, use to send and recv kakfa messages.
482 */ 557 */
483 class SrsKafkaProtocol 558 class SrsKafkaProtocol