winlin

add structures for kafka

@@ -23,3 +23,125 @@ @@ -23,3 +23,125 @@
23 23
24 #include <srs_kafka_stack.hpp> 24 #include <srs_kafka_stack.hpp>
25 25
  26 +using namespace std;
  27 +
  28 +SrsKafkaString::SrsKafkaString()
  29 +{
  30 + size = -1;
  31 + data = NULL;
  32 +}
  33 +
  34 +SrsKafkaString::~SrsKafkaString()
  35 +{
  36 + srs_freep(data);
  37 +}
  38 +
  39 +bool SrsKafkaString::null()
  40 +{
  41 + return size == -1;
  42 +}
  43 +
  44 +bool SrsKafkaString::empty()
  45 +{
  46 + return size <= 0;
  47 +}
  48 +
  49 +int SrsKafkaString::total_size()
  50 +{
  51 + return 2 + (size == -1? 0 : size);
  52 +}
  53 +
  54 +SrsKafkaBytes::SrsKafkaBytes()
  55 +{
  56 + size = -1;
  57 + data = NULL;
  58 +}
  59 +
  60 +SrsKafkaBytes::~SrsKafkaBytes()
  61 +{
  62 + srs_freep(data);
  63 +}
  64 +
  65 +bool SrsKafkaBytes::null()
  66 +{
  67 + return size == -1;
  68 +}
  69 +
  70 +bool SrsKafkaBytes::empty()
  71 +{
  72 + return size <= 0;
  73 +}
  74 +
  75 +int SrsKafkaBytes::total_size()
  76 +{
  77 + return 4 + (size == -1? 0 : size);
  78 +}
  79 +
  80 +SrsKafkaRequestHeader::SrsKafkaRequestHeader()
  81 +{
  82 + size = 0;
  83 + api_key = api_version = 0;
  84 + correlation_id = 0;
  85 + client_id = new SrsKafkaString();
  86 +}
  87 +
  88 +SrsKafkaRequestHeader::~SrsKafkaRequestHeader()
  89 +{
  90 + srs_freep(client_id);
  91 +}
  92 +
  93 +int SrsKafkaRequestHeader::header_size()
  94 +{
  95 + return 2 + 2 + 4 + client_id->total_size();
  96 +}
  97 +
  98 +int SrsKafkaRequestHeader::message_size()
  99 +{
  100 + return size - header_size();
  101 +}
  102 +
  103 +int SrsKafkaRequestHeader::total_size()
  104 +{
  105 + return 4 + size;
  106 +}
  107 +
  108 +SrsKafkaResponse::SrsKafkaResponse()
  109 +{
  110 + correlation_id = 0;
  111 +}
  112 +
  113 +SrsKafkaResponse::~SrsKafkaResponse()
  114 +{
  115 +}
  116 +
  117 +SrsKafkaMessage::SrsKafkaMessage()
  118 +{
  119 + offset = 0;
  120 + message_size = 0;
  121 +
  122 + crc = 0;
  123 + magic_byte = attributes = 0;
  124 + key = new SrsKafkaBytes();
  125 + value = new SrsKafkaBytes();
  126 +}
  127 +
  128 +SrsKafkaMessage::~SrsKafkaMessage()
  129 +{
  130 + srs_freep(key);
  131 + srs_freep(value);
  132 +}
  133 +
  134 +SrsKafkaMessageSet::SrsKafkaMessageSet()
  135 +{
  136 +}
  137 +
  138 +SrsKafkaMessageSet::~SrsKafkaMessageSet()
  139 +{
  140 + vector<SrsKafkaMessage*>::iterator it;
  141 + for (it = messages.begin(); it != messages.end(); ++it) {
  142 + SrsKafkaMessage* message = *it;
  143 + srs_freep(message);
  144 + }
  145 + messages.clear();
  146 +}
  147 +
@@ -29,5 +29,212 @@ @@ -29,5 +29,212 @@
29 */ 29 */
30 #include <srs_core.hpp> 30 #include <srs_core.hpp>
31 31
  32 +#include <vector>
  33 +
  34 +/**
  35 + * These types consist of a signed integer giving a length N followed by N bytes of content.
  36 + * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
  37 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes
  38 + */
  39 +class SrsKafkaString
  40 +{
  41 +private:
  42 + int16_t size;
  43 + char* data;
  44 +public:
  45 + SrsKafkaString();
  46 + virtual ~SrsKafkaString();
  47 +public:
  48 + virtual bool null();
  49 + virtual bool empty();
  50 + virtual int total_size();
  51 +};
  52 +
  53 +/**
  54 + * These types consist of a signed integer giving a length N followed by N bytes of content.
  55 + * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
  56 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes
  57 + */
  58 +class SrsKafkaBytes
  59 +{
  60 +private:
  61 + int32_t size;
  62 + char* data;
  63 +public:
  64 + SrsKafkaBytes();
  65 + virtual ~SrsKafkaBytes();
  66 +public:
  67 + virtual bool null();
  68 + virtual bool empty();
  69 + virtual int total_size();
  70 +};
  71 +
  72 +/**
  73 + * the header of request, includes the size of request.
  74 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
  75 + */
  76 +class SrsKafkaRequestHeader
  77 +{
  78 +private:
  79 + /**
  80 + * The MessageSize field gives the size of the subsequent request or response
  81 + * message in bytes. The client can read requests by first reading this 4 byte
  82 + * size as an integer N, and then reading and parsing the subsequent N bytes
  83 + * of the request.
  84 + */
  85 + int32_t size;
  86 +private:
  87 + /**
  88 + * This is a numeric id for the API being invoked (i.e. is it
  89 + * a metadata request, a produce request, a fetch request, etc).
  90 + * @remark MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
  91 + */
  92 + int16_t api_key;
  93 + /**
  94 + * This is a numeric version number for this api. We version each API and
  95 + * this version number allows the server to properly interpret the request
  96 + * as the protocol evolves. Responses will always be in the format corresponding
  97 + * to the request version. Currently the supported version for all APIs is 0.
  98 + */
  99 + int16_t api_version;
  100 + /**
  101 + * This is a user-supplied integer. It will be passed back in
  102 + * the response by the server, unmodified. It is useful for matching
  103 + * request and response between the client and server.
  104 + */
  105 + int32_t correlation_id;
  106 + /**
  107 + * This is a user supplied identifier for the client application.
  108 + * The user can use any identifier they like and it will be used
  109 + * when logging errors, monitoring aggregates, etc. For example,
  110 + * one might want to monitor not just the requests per second overall,
  111 + * but the number coming from each client application (each of
  112 + * which could reside on multiple servers). This id acts as a
  113 + * logical grouping across all requests from a particular client.
  114 + */
  115 + SrsKafkaString* client_id;
  116 +public:
  117 + SrsKafkaRequestHeader();
  118 + virtual ~SrsKafkaRequestHeader();
  119 +public:
  120 + /**
  121 + * the size of header, exclude the 4bytes size.
  122 + * @remark total_size = 4 + header_size + message_size.
  123 + */
  124 + virtual int header_size();
  125 + /**
  126 + * the size of message, the left bytes left after the header.
  127 + * @remark total_size = 4 + header_size + message_size.
  128 + */
  129 + virtual int message_size();
  130 + /**
  131 + * the total size of the request, 4bytes + size of header and message.
  132 + * @remark total_size = 4 + header_size + message_size.
  133 + */
  134 + virtual int total_size();
  135 +};
  136 +
  137 +/**
  138 + * the common kafka response.
  139 + * The response will always match the paired request (e.g. we will
  140 + * send a MetadataResponse in return to a MetadataRequest).
  141 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses
  142 + */
  143 +class SrsKafkaResponse
  144 +{
  145 +protected:
  146 + /**
  147 + * The server passes back whatever integer the client supplied as the correlation in the request.
  148 + */
  149 + int32_t correlation_id;
  150 +public:
  151 + SrsKafkaResponse();
  152 + virtual ~SrsKafkaResponse();
  153 +};
  154 +
  155 +/**
  156 + * the kafka message in message set.
  157 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
  158 + */
  159 +struct SrsKafkaMessage
  160 +{
  161 +// metadata.
  162 +public:
  163 + /**
  164 + * This is the offset used in kafka as the log sequence number. When the
  165 + * producer is sending messages it doesn't actually know the offset and
  166 + * can fill in any value here it likes.
  167 + */
  168 + int64_t offset;
  169 + /**
  170 + * the size of this message.
  171 + */
  172 + int32_t message_size;
  173 +// message.
  174 +public:
  175 + /**
  176 + * The CRC is the CRC32 of the remainder of the message bytes.
  177 + * This is used to check the integrity of the message on the broker and consumer.
  178 + */
  179 + int32_t crc;
  180 + /**
  181 + * This is a version id used to allow backwards compatible evolution
  182 + * of the message binary format. The current value is 0.
  183 + */
  184 + int8_t magic_byte;
  185 + /**
  186 + * This byte holds metadata attributes about the message.
  187 + * The lowest 2 bits contain the compression codec used
  188 + * for the message. The other bits should be set to 0.
  189 + */
  190 + int8_t attributes;
  191 + /**
  192 + * The key is an optional message key that was used for
  193 + * partition assignment. The key can be null.
  194 + */
  195 + SrsKafkaBytes* key;
  196 + /**
  197 + * The value is the actual message contents as an opaque byte array.
  198 + * Kafka supports recursive messages in which case this may itself
  199 + * contain a message set. The message can be null.
  200 + */
  201 + SrsKafkaBytes* value;
  202 +public:
  203 + SrsKafkaMessage();
  204 + virtual ~SrsKafkaMessage();
  205 +};
  206 +
  207 +/**
  208 + * a set of kafka message.
  209 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
  210 + */
  211 +class SrsKafkaMessageSet
  212 +{
  213 +private:
  214 + std::vector<SrsKafkaMessage*> messages;
  215 +public:
  216 + SrsKafkaMessageSet();
  217 + virtual ~SrsKafkaMessageSet();
  218 +};
  219 +
  220 +/**
  221 + * request the metadata from broker.
  222 + * This API answers the following questions:
  223 + * What topics exist?
  224 + * How many partitions does each topic have?
  225 + * Which broker is currently the leader for each partition?
  226 + * What is the host and port for each of these brokers?
  227 + * This is the only request that can be addressed to any broker in the cluster.
  228 + *
  229 + * Since there may be many topics the client can give an optional list of topic
  230 + * names in order to only return metadata for a subset of topics.
  231 + *
  232 + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
  233 + */
  234 +class SrsKafkaTopicMetadataRequest
  235 +{
  236 +public:
  237 +};
  238 +
32 #endif 239 #endif
33 240