winlin

refs #1670: support decode the metadata response.

@@ -273,6 +273,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -273,6 +273,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
273 #define ERROR_KAFKA_CODEC_REQUEST 4032 273 #define ERROR_KAFKA_CODEC_REQUEST 4032
274 #define ERROR_KAFKA_CODEC_RESPONSE 4033 274 #define ERROR_KAFKA_CODEC_RESPONSE 4033
275 #define ERROR_KAFKA_CODEC_ARRAY 4034 275 #define ERROR_KAFKA_CODEC_ARRAY 4034
  276 +#define ERROR_KAFKA_CODEC_METADATA 4035
276 277
277 /////////////////////////////////////////////////////// 278 ///////////////////////////////////////////////////////
278 // HTTP API error. 279 // HTTP API error.
@@ -601,6 +601,200 @@ int SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf) @@ -601,6 +601,200 @@ int SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf)
601 return ret; 601 return ret;
602 } 602 }
603 603
  604 +SrsKafkaBroker::SrsKafkaBroker()
  605 +{
  606 + node_id = port = 0;
  607 +}
  608 +
  609 +SrsKafkaBroker::~SrsKafkaBroker()
  610 +{
  611 +}
  612 +
  613 +int SrsKafkaBroker::size()
  614 +{
  615 + return 4 + host.size() + 4;
  616 +}
  617 +
  618 +int SrsKafkaBroker::encode(SrsBuffer* buf)
  619 +{
  620 + int ret = ERROR_SUCCESS;
  621 +
  622 + if (!buf->require(4)) {
  623 + ret = ERROR_KAFKA_CODEC_METADATA;
  624 + srs_error("kafka encode broker node_id failed. ret=%d", ret);
  625 + return ret;
  626 + }
  627 + buf->write_4bytes(node_id);
  628 +
  629 + if ((ret = host.encode(buf)) != ERROR_SUCCESS) {
  630 + srs_error("kafka encode broker host failed. ret=%d", ret);
  631 + return ret;
  632 + }
  633 +
  634 + if (!buf->require(4)) {
  635 + ret = ERROR_KAFKA_CODEC_METADATA;
  636 + srs_error("kafka encode broker port failed. ret=%d", ret);
  637 + return ret;
  638 + }
  639 + buf->write_4bytes(port);
  640 +
  641 + return ret;
  642 +}
  643 +
  644 +int SrsKafkaBroker::decode(SrsBuffer* buf)
  645 +{
  646 + int ret = ERROR_SUCCESS;
  647 +
  648 + if (!buf->require(4)) {
  649 + ret = ERROR_KAFKA_CODEC_METADATA;
  650 + srs_error("kafka decode broker node_id failed. ret=%d", ret);
  651 + return ret;
  652 + }
  653 + node_id = buf->read_4bytes();
  654 +
  655 + if ((ret = host.decode(buf)) != ERROR_SUCCESS) {
  656 + srs_error("kafka decode broker host failed. ret=%d", ret);
  657 + return ret;
  658 + }
  659 +
  660 + if (!buf->require(4)) {
  661 + ret = ERROR_KAFKA_CODEC_METADATA;
  662 + srs_error("kafka decode broker port failed. ret=%d", ret);
  663 + return ret;
  664 + }
  665 + port = buf->read_4bytes();
  666 +
  667 + return ret;
  668 +}
  669 +
  670 +SrsKafkaPartitionMetadata::SrsKafkaPartitionMetadata()
  671 +{
  672 + error_code = 0;
  673 + partition_id = 0;
  674 + leader = 0;
  675 +}
  676 +
  677 +SrsKafkaPartitionMetadata::~SrsKafkaPartitionMetadata()
  678 +{
  679 +}
  680 +
  681 +int SrsKafkaPartitionMetadata::size()
  682 +{
  683 + return 2 + 4 + 4 + replicas.size() + isr.size();
  684 +}
  685 +
  686 +int SrsKafkaPartitionMetadata::encode(SrsBuffer* buf)
  687 +{
  688 + int ret = ERROR_SUCCESS;
  689 +
  690 + if (!buf->require(2 + 4 + 4)) {
  691 + ret = ERROR_KAFKA_CODEC_METADATA;
  692 + srs_error("kafka encode partition metadata failed. ret=%d", ret);
  693 + return ret;
  694 + }
  695 + buf->write_2bytes(error_code);
  696 + buf->write_4bytes(partition_id);
  697 + buf->write_4bytes(leader);
  698 +
  699 + if ((ret = replicas.encode(buf)) != ERROR_SUCCESS) {
  700 + srs_error("kafka encode partition metadata replicas failed. ret=%d", ret);
  701 + return ret;
  702 + }
  703 + if ((ret = isr.encode(buf)) != ERROR_SUCCESS) {
  704 + srs_error("kafka encode partition metadata isr failed. ret=%d", ret);
  705 + return ret;
  706 + }
  707 +
  708 + return ret;
  709 +}
  710 +
  711 +int SrsKafkaPartitionMetadata::decode(SrsBuffer* buf)
  712 +{
  713 + int ret = ERROR_SUCCESS;
  714 +
  715 + if (!buf->require(2 + 4 + 4)) {
  716 + ret = ERROR_KAFKA_CODEC_METADATA;
  717 + srs_error("kafka decode partition metadata failed. ret=%d", ret);
  718 + return ret;
  719 + }
  720 + error_code = buf->read_2bytes();
  721 + partition_id = buf->read_4bytes();
  722 + leader = buf->read_4bytes();
  723 +
  724 + if ((ret = replicas.decode(buf)) != ERROR_SUCCESS) {
  725 + srs_error("kafka decode partition metadata replicas failed. ret=%d", ret);
  726 + return ret;
  727 + }
  728 + if ((ret = isr.decode(buf)) != ERROR_SUCCESS) {
  729 + srs_error("kafka decode partition metadata isr failed. ret=%d", ret);
  730 + return ret;
  731 + }
  732 +
  733 + return ret;
  734 +}
  735 +
  736 +SrsKafkaTopicMetadata::SrsKafkaTopicMetadata()
  737 +{
  738 + error_code = 0;
  739 +}
  740 +
  741 +SrsKafkaTopicMetadata::~SrsKafkaTopicMetadata()
  742 +{
  743 +}
  744 +
  745 +int SrsKafkaTopicMetadata::size()
  746 +{
  747 + return 2 + name.size() + metadatas.size();
  748 +}
  749 +
  750 +int SrsKafkaTopicMetadata::encode(SrsBuffer* buf)
  751 +{
  752 + int ret = ERROR_SUCCESS;
  753 +
  754 + if (!buf->require(2)) {
  755 + ret = ERROR_KAFKA_CODEC_METADATA;
  756 + srs_error("kafka encode topic metadata failed. ret=%d", ret);
  757 + return ret;
  758 + }
  759 + buf->write_2bytes(error_code);
  760 +
  761 + if ((ret = name.encode(buf)) != ERROR_SUCCESS) {
  762 + srs_error("kafka encode topic name failed. ret=%d", ret);
  763 + return ret;
  764 + }
  765 +
  766 + if ((ret = metadatas.encode(buf)) != ERROR_SUCCESS) {
  767 + srs_error("kafka encode topic metadatas failed. ret=%d", ret);
  768 + return ret;
  769 + }
  770 +
  771 + return ret;
  772 +}
  773 +
  774 +int SrsKafkaTopicMetadata::decode(SrsBuffer* buf)
  775 +{
  776 + int ret = ERROR_SUCCESS;
  777 +
  778 + if (!buf->require(2)) {
  779 + ret = ERROR_KAFKA_CODEC_METADATA;
  780 + srs_error("kafka decode topic metadata failed. ret=%d", ret);
  781 + return ret;
  782 + }
  783 + error_code = buf->read_2bytes();
  784 +
  785 + if ((ret = name.decode(buf)) != ERROR_SUCCESS) {
  786 + srs_error("kafka decode topic name failed. ret=%d", ret);
  787 + return ret;
  788 + }
  789 +
  790 + if ((ret = metadatas.decode(buf)) != ERROR_SUCCESS) {
  791 + srs_error("kafka decode topic metadatas failed. ret=%d", ret);
  792 + return ret;
  793 + }
  794 +
  795 + return ret;
  796 +}
  797 +
604 SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse() 798 SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse()
605 { 799 {
606 } 800 }
@@ -611,8 +805,7 @@ SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse() @@ -611,8 +805,7 @@ SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse()
611 805
612 int SrsKafkaTopicMetadataResponse::size() 806 int SrsKafkaTopicMetadataResponse::size()
613 { 807 {
614 - // TODO: FIXME: implements it.  
615 - return SrsKafkaResponse::size(); 808 + return SrsKafkaResponse::size() + brokers.size() + metadatas.size();
616 } 809 }
617 810
618 int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf) 811 int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf)
@@ -624,7 +817,16 @@ int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf) @@ -624,7 +817,16 @@ int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf)
624 return ret; 817 return ret;
625 } 818 }
626 819
627 - // TODO: FIXME: implements it. 820 + if ((ret = brokers.encode(buf)) != ERROR_SUCCESS) {
  821 + srs_error("kafka encode metadata brokers failed. ret=%d", ret);
  822 + return ret;
  823 + }
  824 +
  825 + if ((ret = metadatas.encode(buf)) != ERROR_SUCCESS) {
  826 + srs_error("kafka encode metadatas failed. ret=%d", ret);
  827 + return ret;
  828 + }
  829 +
628 return ret; 830 return ret;
629 } 831 }
630 832
@@ -637,7 +839,16 @@ int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf) @@ -637,7 +839,16 @@ int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf)
637 return ret; 839 return ret;
638 } 840 }
639 841
640 - // TODO: FIXME: implements it. 842 + if ((ret = brokers.decode(buf)) != ERROR_SUCCESS) {
  843 + srs_error("kafka decode metadata brokers failed. ret=%d", ret);
  844 + return ret;
  845 + }
  846 +
  847 + if ((ret = metadatas.decode(buf)) != ERROR_SUCCESS) {
  848 + srs_error("kafka decode metadatas failed. ret=%d", ret);
  849 + return ret;
  850 + }
  851 +
641 return ret; 852 return ret;
642 } 853 }
643 854
@@ -190,7 +190,7 @@ public: @@ -190,7 +190,7 @@ public:
190 srs_error("kafka decode array failed. ret=%d", ret); 190 srs_error("kafka decode array failed. ret=%d", ret);
191 return ret; 191 return ret;
192 } 192 }
193 - length = buf->read_2bytes(); 193 + length = buf->read_4bytes();
194 194
195 for (int i = 0; i < length; i++) { 195 for (int i = 0; i < length; i++) {
196 T* elem = new T(); 196 T* elem = new T();
@@ -206,6 +206,78 @@ public: @@ -206,6 +206,78 @@ public:
206 return ret; 206 return ret;
207 } 207 }
208 }; 208 };
  209 +template<>
  210 +class SrsKafkaArray<int32_t> : public ISrsCodec
  211 +{
  212 +private:
  213 + int32_t length;
  214 + std::vector<int32_t> elems;
  215 + typedef std::vector<int32_t>::iterator SrsIterator;
  216 +public:
  217 + SrsKafkaArray()
  218 + {
  219 + length = 0;
  220 + }
  221 + virtual ~SrsKafkaArray()
  222 + {
  223 + elems.clear();
  224 + }
  225 +public:
  226 + virtual void append(int32_t elem)
  227 + {
  228 + length++;
  229 + elems.push_back(elem);
  230 + }
  231 + // interface ISrsCodec
  232 +public:
  233 + virtual int size()
  234 + {
  235 + return 4 + sizeof(int32_t) * (int)elems.size();
  236 + }
  237 + virtual int encode(SrsBuffer* buf)
  238 + {
  239 + int ret = ERROR_SUCCESS;
  240 +
  241 + if (!buf->require(4 + sizeof(int32_t) * (int)elems.size())) {
  242 + ret = ERROR_KAFKA_CODEC_ARRAY;
  243 + srs_error("kafka encode array failed. ret=%d", ret);
  244 + return ret;
  245 + }
  246 + buf->write_4bytes(length);
  247 +
  248 + for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
  249 + int32_t elem = *it;
  250 + buf->write_4bytes(elem);
  251 + }
  252 +
  253 + return ret;
  254 + }
  255 + virtual int decode(SrsBuffer* buf)
  256 + {
  257 + int ret = ERROR_SUCCESS;
  258 +
  259 + if (!buf->require(4)) {
  260 + ret = ERROR_KAFKA_CODEC_ARRAY;
  261 + srs_error("kafka decode array failed. ret=%d", ret);
  262 + return ret;
  263 + }
  264 + length = buf->read_4bytes();
  265 +
  266 + for (int i = 0; i < length; i++) {
  267 + if (!buf->require(sizeof(int32_t))) {
  268 + ret = ERROR_KAFKA_CODEC_ARRAY;
  269 + srs_error("kafka decode array elem failed. ret=%d", ret);
  270 + return ret;
  271 +
  272 + }
  273 +
  274 + int32_t elem = buf->read_4bytes();
  275 + elems.push_back(elem);
  276 + }
  277 +
  278 + return ret;
  279 + }
  280 +};
209 281
210 /** 282 /**
211 * the header of request, includes the size of request. 283 * the header of request, includes the size of request.
@@ -533,6 +605,58 @@ public: @@ -533,6 +605,58 @@ public:
533 }; 605 };
534 606
535 /** 607 /**
  608 + * the metadata response data.
  609 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
  610 + */
  611 +struct SrsKafkaBroker : public ISrsCodec
  612 +{
  613 +public:
  614 + int32_t node_id;
  615 + SrsKafkaString host;
  616 + int32_t port;
  617 +public:
  618 + SrsKafkaBroker();
  619 + virtual ~SrsKafkaBroker();
  620 +// interface ISrsCodec
  621 +public:
  622 + virtual int size();
  623 + virtual int encode(SrsBuffer* buf);
  624 + virtual int decode(SrsBuffer* buf);
  625 +};
  626 +struct SrsKafkaPartitionMetadata : public ISrsCodec
  627 +{
  628 +public:
  629 + int16_t error_code;
  630 + int32_t partition_id;
  631 + int32_t leader;
  632 + SrsKafkaArray<int32_t> replicas;
  633 + SrsKafkaArray<int32_t> isr;
  634 +public:
  635 + SrsKafkaPartitionMetadata();
  636 + virtual ~SrsKafkaPartitionMetadata();
  637 +// interface ISrsCodec
  638 +public:
  639 + virtual int size();
  640 + virtual int encode(SrsBuffer* buf);
  641 + virtual int decode(SrsBuffer* buf);
  642 +};
  643 +struct SrsKafkaTopicMetadata : public ISrsCodec
  644 +{
  645 +public:
  646 + int16_t error_code;
  647 + SrsKafkaString name;
  648 + SrsKafkaArray<SrsKafkaPartitionMetadata> metadatas;
  649 +public:
  650 + SrsKafkaTopicMetadata();
  651 + virtual ~SrsKafkaTopicMetadata();
  652 +// interface ISrsCodec
  653 +public:
  654 + virtual int size();
  655 + virtual int encode(SrsBuffer* buf);
  656 + virtual int decode(SrsBuffer* buf);
  657 +};
  658 +
  659 +/**
536 * response for the metadata request from broker. 660 * response for the metadata request from broker.
537 * The response contains metadata for each partition, 661 * The response contains metadata for each partition,
538 * with partitions grouped together by topic. This 662 * with partitions grouped together by topic. This
@@ -542,6 +666,9 @@ public: @@ -542,6 +666,9 @@ public:
542 */ 666 */
543 class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse 667 class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse
544 { 668 {
  669 +private:
  670 + SrsKafkaArray<SrsKafkaBroker> brokers;
  671 + SrsKafkaArray<SrsKafkaTopicMetadata> metadatas;
545 public: 672 public:
546 SrsKafkaTopicMetadataResponse(); 673 SrsKafkaTopicMetadataResponse();
547 virtual ~SrsKafkaTopicMetadataResponse(); 674 virtual ~SrsKafkaTopicMetadataResponse();