winlin

kafka refine array, to decode and create object.

@@ -272,6 +272,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -272,6 +272,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
272 #define ERROR_KAFKA_CODEC_BYTES 4031 272 #define ERROR_KAFKA_CODEC_BYTES 4031
273 #define ERROR_KAFKA_CODEC_REQUEST 4032 273 #define ERROR_KAFKA_CODEC_REQUEST 4032
274 #define ERROR_KAFKA_CODEC_RESPONSE 4033 274 #define ERROR_KAFKA_CODEC_RESPONSE 4033
  275 +#define ERROR_KAFKA_CODEC_ARRAY 4034
275 276
276 /////////////////////////////////////////////////////// 277 ///////////////////////////////////////////////////////
277 // HTTP API error. 278 // HTTP API error.
@@ -30,6 +30,7 @@ using namespace std; @@ -30,6 +30,7 @@ using namespace std;
30 #include <srs_core_autofree.hpp> 30 #include <srs_core_autofree.hpp>
31 #include <srs_kernel_log.hpp> 31 #include <srs_kernel_log.hpp>
32 #include <srs_protocol_io.hpp> 32 #include <srs_protocol_io.hpp>
  33 +#include <srs_protocol_stream.hpp>
33 34
34 #ifdef SRS_AUTO_KAFKA 35 #ifdef SRS_AUTO_KAFKA
35 36
@@ -682,10 +683,12 @@ SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id) @@ -682,10 +683,12 @@ SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id)
682 SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) 683 SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io)
683 { 684 {
684 skt = io; 685 skt = io;
  686 + reader = new SrsFastStream();
685 } 687 }
686 688
687 SrsKafkaProtocol::~SrsKafkaProtocol() 689 SrsKafkaProtocol::~SrsKafkaProtocol()
688 { 690 {
  691 + srs_freep(reader);
689 } 692 }
690 693
691 int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg) 694 int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg)
@@ -733,6 +736,36 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg) @@ -733,6 +736,36 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg)
733 return ret; 736 return ret;
734 } 737 }
735 738
  739 +int SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg)
  740 +{
  741 + *pmsg = NULL;
  742 +
  743 + int ret = ERROR_SUCCESS;
  744 +
  745 + SrsKafkaResponseHeader header;
  746 + while (reader->size() < header.size()) {
  747 + if ((ret = reader->grow(skt, header.size())) != ERROR_SUCCESS) {
  748 + srs_error("kafka recv message failed. ret=%d", ret);
  749 + return ret;
  750 + }
  751 + }
  752 +
  753 + SrsBuffer buffer;
  754 + if ((ret = buffer.initialize(reader->bytes(), reader->size())) != ERROR_SUCCESS) {
  755 + return ret;
  756 + }
  757 +
  758 + SrsBuffer* buf = &buffer;
  759 + if ((ret = header.decode(buf)) != ERROR_SUCCESS) {
  760 + srs_error("kafka decode response header failed. ret=%d", ret);
  761 + return ret;
  762 + }
  763 +
  764 + // TODO: FIXME: decode message.
  765 +
  766 + return ret;
  767 +}
  768 +
736 SrsKafkaClient::SrsKafkaClient(ISrsProtocolReaderWriter* io) 769 SrsKafkaClient::SrsKafkaClient(ISrsProtocolReaderWriter* io)
737 { 770 {
738 protocol = new SrsKafkaProtocol(io); 771 protocol = new SrsKafkaProtocol(io);
@@ -756,6 +789,13 @@ int SrsKafkaClient::fetch_metadata(string topic) @@ -756,6 +789,13 @@ int SrsKafkaClient::fetch_metadata(string topic)
756 return ret; 789 return ret;
757 } 790 }
758 791
  792 + SrsKafkaResponse* res = NULL;
  793 + if ((ret = protocol->recv_message(&res)) != ERROR_SUCCESS) {
  794 + srs_error("kafka recv response failed. ret=%d", ret);
  795 + return ret;
  796 + }
  797 + SrsAutoFree(SrsKafkaResponse, res);
  798 +
759 // TODO: FIXME: implements it. 799 // TODO: FIXME: implements it.
760 800
761 return ret; 801 return ret;
@@ -35,7 +35,9 @@ @@ -35,7 +35,9 @@
35 35
36 #include <srs_kernel_buffer.hpp> 36 #include <srs_kernel_buffer.hpp>
37 #include <srs_kernel_error.hpp> 37 #include <srs_kernel_error.hpp>
  38 +#include <srs_kernel_log.hpp>
38 39
  40 +class SrsFastStream;
39 class ISrsProtocolReaderWriter; 41 class ISrsProtocolReaderWriter;
40 42
41 #ifdef SRS_AUTO_KAFKA 43 #ifdef SRS_AUTO_KAFKA
@@ -113,9 +115,9 @@ public: @@ -113,9 +115,9 @@ public:
113 * array of a structure foo as [foo]. 115 * array of a structure foo as [foo].
114 * 116 *
115 * Usage: 117 * Usage:
116 - * SrsKafkaArray<SrsKafkaBytes*> body; 118 + * SrsKafkaArray<SrsKafkaBytes> body;
117 * body.append(new SrsKafkaBytes()); 119 * body.append(new SrsKafkaBytes());
118 - * @remark the typename T must be a ISrsCodec* 120 + * @remark array elem is the T*, which must be ISrsCodec*
119 * 121 *
120 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests 122 * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
121 */ 123 */
@@ -123,9 +125,9 @@ template<typename T> @@ -123,9 +125,9 @@ template<typename T>
123 class SrsKafkaArray : public ISrsCodec 125 class SrsKafkaArray : public ISrsCodec
124 { 126 {
125 private: 127 private:
126 - int length;  
127 - std::vector<T> elems;  
128 - typedef typename std::vector<T>::iterator SrsIterator; 128 + int32_t length;
  129 + std::vector<T*> elems;
  130 + typedef typename std::vector<T*>::iterator SrsIterator;
129 public: 131 public:
130 SrsKafkaArray() 132 SrsKafkaArray()
131 { 133 {
@@ -134,13 +136,13 @@ public: @@ -134,13 +136,13 @@ public:
134 virtual ~SrsKafkaArray() 136 virtual ~SrsKafkaArray()
135 { 137 {
136 for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { 138 for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
137 - T elem = *it; 139 + T* elem = *it;
138 srs_freep(elem); 140 srs_freep(elem);
139 } 141 }
140 elems.clear(); 142 elems.clear();
141 } 143 }
142 public: 144 public:
143 - virtual void append(T elem) 145 + virtual void append(T* elem)
144 { 146 {
145 length++; 147 length++;
146 elems.push_back(elem); 148 elems.push_back(elem);
@@ -149,10 +151,10 @@ public: @@ -149,10 +151,10 @@ public:
149 public: 151 public:
150 virtual int size() 152 virtual int size()
151 { 153 {
152 - int s = 0; 154 + int s = 4;
153 155
154 for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { 156 for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
155 - T elem = *it; 157 + T* elem = *it;
156 s += elem->size(); 158 s += elem->size();
157 } 159 }
158 160
@@ -162,9 +164,17 @@ public: @@ -162,9 +164,17 @@ public:
162 { 164 {
163 int ret = ERROR_SUCCESS; 165 int ret = ERROR_SUCCESS;
164 166
  167 + if (!buf->require(4)) {
  168 + ret = ERROR_KAFKA_CODEC_ARRAY;
  169 + srs_error("kafka encode array failed. ret=%d", ret);
  170 + return ret;
  171 + }
  172 + buf->write_4bytes(length);
  173 +
165 for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { 174 for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
166 - T elem = *it; 175 + T* elem = *it;
167 if ((ret = elem->encode(buf)) != ERROR_SUCCESS) { 176 if ((ret = elem->encode(buf)) != ERROR_SUCCESS) {
  177 + srs_error("kafka encode array elem failed. ret=%d", ret);
168 return ret; 178 return ret;
169 } 179 }
170 } 180 }
@@ -175,11 +185,22 @@ public: @@ -175,11 +185,22 @@ public:
175 { 185 {
176 int ret = ERROR_SUCCESS; 186 int ret = ERROR_SUCCESS;
177 187
178 - for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {  
179 - T elem = *it; 188 + if (!buf->require(4)) {
  189 + ret = ERROR_KAFKA_CODEC_ARRAY;
  190 + srs_error("kafka decode array failed. ret=%d", ret);
  191 + return ret;
  192 + }
  193 + length = buf->read_2bytes();
  194 +
  195 + for (int i = 0; i < length; i++) {
  196 + T* elem = new T();
180 if ((ret = elem->decode(buf)) != ERROR_SUCCESS) { 197 if ((ret = elem->decode(buf)) != ERROR_SUCCESS) {
  198 + srs_error("kafka decode array elem failed. ret=%d", ret);
  199 + srs_freep(elem);
181 return ret; 200 return ret;
182 } 201 }
  202 +
  203 + elems.push_back(elem);
183 } 204 }
184 205
185 return ret; 206 return ret;
@@ -493,7 +514,7 @@ public: @@ -493,7 +514,7 @@ public:
493 class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest 514 class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest
494 { 515 {
495 private: 516 private:
496 - SrsKafkaArray<SrsKafkaString*> topics; 517 + SrsKafkaArray<SrsKafkaString> topics;
497 public: 518 public:
498 SrsKafkaTopicMetadataRequest(); 519 SrsKafkaTopicMetadataRequest();
499 virtual ~SrsKafkaTopicMetadataRequest(); 520 virtual ~SrsKafkaTopicMetadataRequest();
@@ -559,6 +580,7 @@ class SrsKafkaProtocol @@ -559,6 +580,7 @@ class SrsKafkaProtocol
559 { 580 {
560 private: 581 private:
561 ISrsProtocolReaderWriter* skt; 582 ISrsProtocolReaderWriter* skt;
  583 + SrsFastStream* reader;
562 public: 584 public:
563 SrsKafkaProtocol(ISrsProtocolReaderWriter* io); 585 SrsKafkaProtocol(ISrsProtocolReaderWriter* io);
564 virtual ~SrsKafkaProtocol(); 586 virtual ~SrsKafkaProtocol();
@@ -568,6 +590,11 @@ public: @@ -568,6 +590,11 @@ public:
568 * @param msg the msg to send. user must not free it again. 590 * @param msg the msg to send. user must not free it again.
569 */ 591 */
570 virtual int send_and_free_message(SrsKafkaRequest* msg); 592 virtual int send_and_free_message(SrsKafkaRequest* msg);
  593 + /**
  594 + * read the message from kafka server.
  595 + * @param pmsg output the received message. user must free it.
  596 + */
  597 + virtual int recv_message(SrsKafkaResponse** pmsg);
571 }; 598 };
572 599
573 /** 600 /**