winlin

create the metadata request message

@@ -105,6 +105,46 @@ int SrsKafkaRequestHeader::total_size() @@ -105,6 +105,46 @@ int SrsKafkaRequestHeader::total_size()
105 return 4 + size; 105 return 4 + size;
106 } 106 }
107 107
  108 +bool SrsKafkaRequestHeader::is_producer_request()
  109 +{
  110 + return api_key == SrsKafkaApiKeyProduceRequest;
  111 +}
  112 +
  113 +bool SrsKafkaRequestHeader::is_fetch_request()
  114 +{
  115 + return api_key == SrsKafkaApiKeyFetchRequest;
  116 +}
  117 +
  118 +bool SrsKafkaRequestHeader::is_offset_request()
  119 +{
  120 + return api_key == SrsKafkaApiKeyOffsetRequest;
  121 +}
  122 +
  123 +bool SrsKafkaRequestHeader::is_metadata_request()
  124 +{
  125 + return api_key == SrsKafkaApiKeyMetadataRequest;
  126 +}
  127 +
  128 +bool SrsKafkaRequestHeader::is_offset_commit_request()
  129 +{
  130 + return api_key == SrsKafkaApiKeyOffsetCommitRequest;
  131 +}
  132 +
  133 +bool SrsKafkaRequestHeader::is_offset_fetch_request()
  134 +{
  135 + return api_key == SrsKafkaApiKeyOffsetFetchRequest;
  136 +}
  137 +
  138 +bool SrsKafkaRequestHeader::is_consumer_metadata_request()
  139 +{
  140 + return api_key == SrsKafkaApiKeyConsumerMetadataRequest;
  141 +}
  142 +
  143 +void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key)
  144 +{
  145 + api_key = (int16_t)key;
  146 +}
  147 +
108 SrsKafkaResponse::SrsKafkaResponse() 148 SrsKafkaResponse::SrsKafkaResponse()
109 { 149 {
110 correlation_id = 0; 150 correlation_id = 0;
@@ -145,3 +185,12 @@ SrsKafkaMessageSet::~SrsKafkaMessageSet() @@ -145,3 +185,12 @@ SrsKafkaMessageSet::~SrsKafkaMessageSet()
145 messages.clear(); 185 messages.clear();
146 } 186 }
147 187
  188 +SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest()
  189 +{
  190 + header.set_api_key(SrsKafkaApiKeyMetadataRequest);
  191 +}
  192 +
  193 +SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest()
  194 +{
  195 +}
  196 +
@@ -31,6 +31,19 @@ @@ -31,6 +31,19 @@
31 31
32 #include <vector> 32 #include <vector>
33 33
  34 +// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
  35 +enum SrsKafkaApiKey
  36 +{
  37 + SrsKafkaApiKeyProduceRequest = 0,
  38 + SrsKafkaApiKeyFetchRequest = 1,
  39 + SrsKafkaApiKeyOffsetRequest = 2,
  40 + SrsKafkaApiKeyMetadataRequest = 3,
  41 + /* Non-user facing control APIs 4-7 */
  42 + SrsKafkaApiKeyOffsetCommitRequest = 8,
  43 + SrsKafkaApiKeyOffsetFetchRequest = 9,
  44 + SrsKafkaApiKeyConsumerMetadataRequest = 10,
  45 +};
  46 +
34 /** 47 /**
35 * These types consist of a signed integer giving a length N followed by N bytes of content. 48 * These types consist of a signed integer giving a length N followed by N bytes of content.
36 * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. 49 * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
@@ -74,6 +87,11 @@ public: @@ -74,6 +87,11 @@ public:
74 * int32 size containing the length N followed by N repetitions of the structure which can 87 * int32 size containing the length N followed by N repetitions of the structure which can
75 * itself be made up of other primitive types. In the BNF grammars below we will show an 88 * itself be made up of other primitive types. In the BNF grammars below we will show an
76 * array of a structure foo as [foo]. 89 * array of a structure foo as [foo].
  90 + *
  91 + * Usage:
  92 + * SrsKafkaArray<SrsKafkaBytes> body;
  93 + * body.append(new SrsKafkaBytes());
  94 + *
77 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests 95 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
78 */ 96 */
79 template<typename T> 97 template<typename T>
@@ -96,6 +114,12 @@ public: @@ -96,6 +114,12 @@ public:
96 } 114 }
97 elems.clear(); 115 elems.clear();
98 } 116 }
  117 +public:
  118 + virtual void append(T* elem)
  119 + {
  120 + length++;
  121 + elems.push_back(elem);
  122 + }
99 }; 123 };
100 124
101 /** 125 /**
@@ -161,6 +185,20 @@ public: @@ -161,6 +185,20 @@ public:
161 * @remark total_size = 4 + header_size + message_size. 185 * @remark total_size = 4 + header_size + message_size.
162 */ 186 */
163 virtual int total_size(); 187 virtual int total_size();
  188 +public:
  189 + /**
  190 + * the api key enumeration.
  191 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
  192 + */
  193 + virtual bool is_producer_request();
  194 + virtual bool is_fetch_request();
  195 + virtual bool is_offset_request();
  196 + virtual bool is_metadata_request();
  197 + virtual bool is_offset_commit_request();
  198 + virtual bool is_offset_fetch_request();
  199 + virtual bool is_consumer_metadata_request();
  200 + // set the api key.
  201 + virtual void set_api_key(SrsKafkaApiKey key);
164 }; 202 };
165 203
166 /** 204 /**
@@ -262,7 +300,12 @@ public: @@ -262,7 +300,12 @@ public:
262 */ 300 */
263 class SrsKafkaTopicMetadataRequest 301 class SrsKafkaTopicMetadataRequest
264 { 302 {
  303 +private:
  304 + SrsKafkaRequestHeader header;
  305 + SrsKafkaArray<SrsKafkaString> request;
265 public: 306 public:
  307 + SrsKafkaTopicMetadataRequest();
  308 + virtual ~SrsKafkaTopicMetadataRequest();
266 }; 309 };
267 310
268 #endif 311 #endif