winlin

kafka recv and decode message.

@@ -371,7 +371,7 @@ int SrsKafkaRequestHeader::decode(SrsBuffer* buf) @@ -371,7 +371,7 @@ int SrsKafkaRequestHeader::decode(SrsBuffer* buf)
371 SrsKafkaResponseHeader::SrsKafkaResponseHeader() 371 SrsKafkaResponseHeader::SrsKafkaResponseHeader()
372 { 372 {
373 _size = 0; 373 _size = 0;
374 - correlation_id = 0; 374 + _correlation_id = 0;
375 } 375 }
376 376
377 SrsKafkaResponseHeader::~SrsKafkaResponseHeader() 377 SrsKafkaResponseHeader::~SrsKafkaResponseHeader()
@@ -398,6 +398,11 @@ void SrsKafkaResponseHeader::set_total_size(int s) @@ -398,6 +398,11 @@ void SrsKafkaResponseHeader::set_total_size(int s)
398 _size = s - 4; 398 _size = s - 4;
399 } 399 }
400 400
  401 +int32_t SrsKafkaResponseHeader::correlation_id()
  402 +{
  403 + return _correlation_id;
  404 +}
  405 +
401 int SrsKafkaResponseHeader::size() 406 int SrsKafkaResponseHeader::size()
402 { 407 {
403 return 4 + header_size(); 408 return 4 + header_size();
@@ -414,7 +419,7 @@ int SrsKafkaResponseHeader::encode(SrsBuffer* buf) @@ -414,7 +419,7 @@ int SrsKafkaResponseHeader::encode(SrsBuffer* buf)
414 } 419 }
415 420
416 buf->write_4bytes(_size); 421 buf->write_4bytes(_size);
417 - buf->write_4bytes(correlation_id); 422 + buf->write_4bytes(_correlation_id);
418 423
419 return ret; 424 return ret;
420 } 425 }
@@ -440,7 +445,7 @@ int SrsKafkaResponseHeader::decode(SrsBuffer* buf) @@ -440,7 +445,7 @@ int SrsKafkaResponseHeader::decode(SrsBuffer* buf)
440 srs_error("kafka decode response message failed. ret=%d", ret); 445 srs_error("kafka decode response message failed. ret=%d", ret);
441 return ret; 446 return ret;
442 } 447 }
443 - correlation_id = buf->read_4bytes(); 448 + _correlation_id = buf->read_4bytes();
444 449
445 return ret; 450 return ret;
446 } 451 }
@@ -658,17 +663,31 @@ int32_t SrsKafkaCorrelationPool::generate_correlation_id() @@ -658,17 +663,31 @@ int32_t SrsKafkaCorrelationPool::generate_correlation_id()
658 return cid++; 663 return cid++;
659 } 664 }
660 665
661 -void SrsKafkaCorrelationPool::set(int32_t correlation_id, SrsKafkaApiKey request) 666 +SrsKafkaApiKey SrsKafkaCorrelationPool::set(int32_t correlation_id, SrsKafkaApiKey request)
662 { 667 {
  668 + SrsKafkaApiKey previous = SrsKafkaApiKeyUnknown;
  669 +
  670 + std::map<int32_t, SrsKafkaApiKey>::iterator it = correlation_ids.find(correlation_id);
  671 + if (it != correlation_ids.end()) {
  672 + previous = it->second;
  673 + }
  674 +
663 correlation_ids[correlation_id] = request; 675 correlation_ids[correlation_id] = request;
  676 +
  677 + return previous;
664 } 678 }
665 679
666 -void SrsKafkaCorrelationPool::unset(int32_t correlation_id) 680 +SrsKafkaApiKey SrsKafkaCorrelationPool::unset(int32_t correlation_id)
667 { 681 {
668 std::map<int32_t, SrsKafkaApiKey>::iterator it = correlation_ids.find(correlation_id); 682 std::map<int32_t, SrsKafkaApiKey>::iterator it = correlation_ids.find(correlation_id);
  683 +
669 if (it != correlation_ids.end()) { 684 if (it != correlation_ids.end()) {
  685 + SrsKafkaApiKey key = it->second;
670 correlation_ids.erase(it); 686 correlation_ids.erase(it);
  687 + return key;
671 } 688 }
  689 +
  690 + return SrsKafkaApiKeyUnknown;
672 } 691 }
673 692
674 SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id) 693 SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id)
@@ -742,27 +761,72 @@ int SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg) @@ -742,27 +761,72 @@ int SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg)
742 761
743 int ret = ERROR_SUCCESS; 762 int ret = ERROR_SUCCESS;
744 763
745 - SrsKafkaResponseHeader header;  
746 - while (reader->size() < header.size()) { 764 + while (true) {
  765 + SrsKafkaResponseHeader header;
  766 +
  767 + // ensure enough bytes for response header.
747 if ((ret = reader->grow(skt, header.size())) != ERROR_SUCCESS) { 768 if ((ret = reader->grow(skt, header.size())) != ERROR_SUCCESS) {
748 srs_error("kafka recv message failed. ret=%d", ret); 769 srs_error("kafka recv message failed. ret=%d", ret);
749 return ret; 770 return ret;
750 } 771 }
  772 +
  773 + // decode response header.
  774 + SrsBuffer buffer;
  775 + if ((ret = buffer.initialize(reader->bytes(), reader->size())) != ERROR_SUCCESS) {
  776 + return ret;
  777 + }
  778 +
  779 + SrsBuffer* buf = &buffer;
  780 + if ((ret = header.decode(buf)) != ERROR_SUCCESS) {
  781 + srs_error("kafka decode response header failed. ret=%d", ret);
  782 + return ret;
  783 + }
  784 +
  785 + // skip the used buffer for header.
  786 + buf->skip(-1 * buf->pos());
  787 +
  788 + // fetch cached api key.
  789 + SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance();
  790 + SrsKafkaApiKey key = pool->unset(header.correlation_id());
  791 + srs_trace("kafka got %d bytes response, key=%d", header.total_size(), header.correlation_id());
  792 +
  793 + // create message by cached api key.
  794 + SrsKafkaResponse* res = NULL;
  795 + switch (key) {
  796 + case SrsKafkaApiKeyMetadataRequest:
  797 + srs_info("kafka got metadata response");
  798 + res = new SrsKafkaTopicMetadataResponse();
  799 + break;
  800 + case SrsKafkaApiKeyUnknown:
  801 + default:
  802 + break;
  803 + }
  804 +
  805 + // ensure enough bytes to decode message.
  806 + if ((ret = reader->grow(skt, header.total_size())) != ERROR_SUCCESS) {
  807 + srs_freep(res);
  808 + srs_error("kafka recv message body failed. ret=%d", ret);
  809 + return ret;
  810 + }
  811 +
  812 + // dropped message, fetch next.
  813 + if (!res) {
  814 + reader->skip(header.total_size());
  815 + srs_warn("kafka ignore unknown message, size=%d.", header.total_size());
  816 + continue;
  817 + }
  818 +
  819 + // parse the whole message.
  820 + if ((ret = res->decode(buf)) != ERROR_SUCCESS) {
  821 + srs_freep(res);
  822 + srs_error("kafka decode message failed. ret=%d", ret);
  823 + return ret;
  824 + }
  825 +
  826 + *pmsg = res;
  827 + break;
751 } 828 }
752 829
753 - SrsBuffer buffer;  
754 - if ((ret = buffer.initialize(reader->bytes(), reader->size())) != ERROR_SUCCESS) {  
755 - return ret;  
756 - }  
757 -  
758 - SrsBuffer* buf = &buffer;  
759 - if ((ret = header.decode(buf)) != ERROR_SUCCESS) {  
760 - srs_error("kafka decode response header failed. ret=%d", ret);  
761 - return ret;  
762 - }  
763 -  
764 - // TODO: FIXME: decode message.  
765 -  
766 return ret; 830 return ret;
767 } 831 }
768 832
@@ -339,7 +339,7 @@ private: @@ -339,7 +339,7 @@ private:
339 * the response by the server, unmodified. It is useful for matching 339 * the response by the server, unmodified. It is useful for matching
340 * request and response between the client and server. 340 * request and response between the client and server.
341 */ 341 */
342 - int32_t correlation_id; 342 + int32_t _correlation_id;
343 public: 343 public:
344 SrsKafkaResponseHeader(); 344 SrsKafkaResponseHeader();
345 virtual ~SrsKafkaResponseHeader(); 345 virtual ~SrsKafkaResponseHeader();
@@ -361,8 +361,9 @@ private: @@ -361,8 +361,9 @@ private:
361 * the size of message, the bytes left after the header. 361 * the size of message, the bytes left after the header.
362 */ 362 */
363 virtual int message_size(); 363 virtual int message_size();
  364 +public:
364 /** 365 /**
365 - * the total size of the request, includes the 4B size. 366 + * the total size of the request, includes the 4B size and message body.
366 */ 367 */
367 virtual int total_size(); 368 virtual int total_size();
368 public: 369 public:
@@ -371,6 +372,10 @@ public: @@ -371,6 +372,10 @@ public:
371 * @param s the whole message, including the 4 bytes size size. 372 * @param s the whole message, including the 4 bytes size size.
372 */ 373 */
373 virtual void set_total_size(int s); 374 virtual void set_total_size(int s);
  375 + /**
  376 + * get the correlation id of response message.
  377 + */
  378 + virtual int32_t correlation_id();
374 // interface ISrsCodec 379 // interface ISrsCodec
375 public: 380 public:
376 virtual int size(); 381 virtual int size();
@@ -567,9 +572,23 @@ private: @@ -567,9 +572,23 @@ private:
567 public: 572 public:
568 virtual ~SrsKafkaCorrelationPool(); 573 virtual ~SrsKafkaCorrelationPool();
569 public: 574 public:
  575 + /**
  576 + * generate a global correlation id.
  577 + */
570 virtual int32_t generate_correlation_id(); 578 virtual int32_t generate_correlation_id();
571 - virtual void set(int32_t correlation_id, SrsKafkaApiKey request);  
572 - virtual void unset(int32_t correlation_id); 579 + /**
  580 + * set the correlation id to specified request key.
  581 + */
  582 + virtual SrsKafkaApiKey set(int32_t correlation_id, SrsKafkaApiKey request);
  583 + /**
  584 + * unset the correlation id.
  585 + * @return the previous api key; unknown if not set.
  586 + */
  587 + virtual SrsKafkaApiKey unset(int32_t correlation_id);
  588 + /**
  589 + * get the key by specified correlation id.
  590 + * @return the specified api key; unknown if no correlation id.
  591 + */
573 virtual SrsKafkaApiKey get(int32_t correlation_id); 592 virtual SrsKafkaApiKey get(int32_t correlation_id);
574 }; 593 };
575 594