winlin

kafka convert json to producer message.

@@ -37,6 +37,8 @@ using namespace std; @@ -37,6 +37,8 @@ using namespace std;
37 37
38 #ifdef SRS_AUTO_KAFKA 38 #ifdef SRS_AUTO_KAFKA
39 39
  40 +#define SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS 300000
  41 +
40 SrsKafkaString::SrsKafkaString() 42 SrsKafkaString::SrsKafkaString()
41 { 43 {
42 _size = -1; 44 _size = -1;
@@ -45,11 +47,10 @@ SrsKafkaString::SrsKafkaString() @@ -45,11 +47,10 @@ SrsKafkaString::SrsKafkaString()
45 47
46 SrsKafkaString::SrsKafkaString(string v) 48 SrsKafkaString::SrsKafkaString(string v)
47 { 49 {
48 - _size = (int16_t)v.length(); 50 + _size = -1;
  51 + data = NULL;
49 52
50 - srs_assert(_size > 0);  
51 - data = new char[_size];  
52 - memcpy(data, v.data(), _size); 53 + set_value(v);
53 } 54 }
54 55
55 SrsKafkaString::~SrsKafkaString() 56 SrsKafkaString::~SrsKafkaString()
@@ -76,6 +77,19 @@ string SrsKafkaString::to_str() @@ -76,6 +77,19 @@ string SrsKafkaString::to_str()
76 return ret; 77 return ret;
77 } 78 }
78 79
  80 +void SrsKafkaString::set_value(string v)
  81 +{
  82 + // free previous data.
  83 + srs_freep(data);
  84 +
  85 + // copy new value to data.
  86 + _size = (int16_t)v.length();
  87 +
  88 + srs_assert(_size > 0);
  89 + data = new char[_size];
  90 + memcpy(data, v.data(), _size);
  91 +}
  92 +
79 int SrsKafkaString::nb_bytes() 93 int SrsKafkaString::nb_bytes()
80 { 94 {
81 return _size == -1? 2 : 2 + _size; 95 return _size == -1? 2 : 2 + _size;
@@ -149,11 +163,10 @@ SrsKafkaBytes::SrsKafkaBytes() @@ -149,11 +163,10 @@ SrsKafkaBytes::SrsKafkaBytes()
149 163
150 SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v) 164 SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v)
151 { 165 {
152 - _size = (int16_t)nb_v; 166 + _size = -1;
  167 + data = NULL;
153 168
154 - srs_assert(_size > 0);  
155 - data = new char[_size];  
156 - memcpy(data, v, _size); 169 + set_value(v, nb_v);
157 } 170 }
158 171
159 SrsKafkaBytes::~SrsKafkaBytes() 172 SrsKafkaBytes::~SrsKafkaBytes()
@@ -171,6 +184,24 @@ bool SrsKafkaBytes::empty() @@ -171,6 +184,24 @@ bool SrsKafkaBytes::empty()
171 return _size <= 0; 184 return _size <= 0;
172 } 185 }
173 186
  187 +void SrsKafkaBytes::set_value(string v)
  188 +{
  189 + set_value(v.data(), (int)v.length());
  190 +}
  191 +
  192 +void SrsKafkaBytes::set_value(const char* v, int nb_v)
  193 +{
  194 + // free previous data.
  195 + srs_freep(data);
  196 +
  197 + // copy new value to data.
  198 + _size = (int16_t)nb_v;
  199 +
  200 + srs_assert(_size > 0);
  201 + data = new char[_size];
  202 + memcpy(data, v, _size);
  203 +}
  204 +
174 int SrsKafkaBytes::nb_bytes() 205 int SrsKafkaBytes::nb_bytes()
175 { 206 {
176 return 4 + (_size == -1? 0 : _size); 207 return 4 + (_size == -1? 0 : _size);
@@ -479,6 +510,32 @@ SrsKafkaRawMessage::~SrsKafkaRawMessage() @@ -479,6 +510,32 @@ SrsKafkaRawMessage::~SrsKafkaRawMessage()
479 srs_freep(value); 510 srs_freep(value);
480 } 511 }
481 512
  513 +int SrsKafkaRawMessage::create(SrsJsonObject* obj)
  514 +{
  515 + int ret = ERROR_SUCCESS;
  516 +
  517 + // current must be 0.
  518 + magic_byte = 0;
  519 +
  520 + // no compression codec.
  521 + attributes = 0;
  522 +
  523 + // dumps the json to string.
  524 + value->set_value(obj->dumps());
  525 +
  526 + // TODO: FIXME: implements it.
  527 + crc = 0;
  528 +
  529 + message_size = raw_message_size();
  530 +
  531 + return ret;
  532 +}
  533 +
  534 +int SrsKafkaRawMessage::raw_message_size()
  535 +{
  536 + return 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
  537 +}
  538 +
482 int SrsKafkaRawMessage::nb_bytes() 539 int SrsKafkaRawMessage::nb_bytes()
483 { 540 {
484 return 8 + 4 + 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes(); 541 return 8 + 4 + 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
@@ -1352,6 +1409,42 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** @@ -1352,6 +1409,42 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse**
1352 int SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector<SrsJsonObject*>& msgs) 1409 int SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector<SrsJsonObject*>& msgs)
1353 { 1410 {
1354 int ret = ERROR_SUCCESS; 1411 int ret = ERROR_SUCCESS;
  1412 +
  1413 + SrsKafkaProducerRequest* req = new SrsKafkaProducerRequest();
  1414 +
  1415 + // 0 the server will not send any response.
  1416 + req->required_acks = 0;
  1417 + // timeout of producer message.
  1418 + req->timeout = SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS;
  1419 +
  1420 + // create the topic and partition to write message to.
  1421 + SrsKafkaProducerTopicMessages* topics = new SrsKafkaProducerTopicMessages();
  1422 + SrsKafkaProducerPartitionMessages* partitions = new SrsKafkaProducerPartitionMessages();
  1423 +
  1424 + topics->partitions.append(partitions);
  1425 + req->topics.append(topics);
  1426 +
  1427 + topics->topic_name.set_value(topic);
  1428 + partitions->partition = partition;
  1429 +
  1430 + // convert json objects to kafka raw messages.
  1431 + vector<SrsJsonObject*>::iterator it;
  1432 + for (it = msgs.begin(); it != msgs.end(); ++it) {
  1433 + SrsJsonObject* obj = *it;
  1434 + SrsKafkaRawMessage* msg = new SrsKafkaRawMessage();
  1435 +
  1436 + if ((ret = msg->create(obj)) != ERROR_SUCCESS) {
  1437 + srs_freep(msg);
  1438 + srs_freep(req);
  1439 + srs_error("kafka write messages failed. ret=%d", ret);
  1440 + return ret;
  1441 + }
  1442 +
  1443 + partitions->messages.append(msg);
  1444 + }
  1445 +
  1446 + partitions->message_set_size = partitions->messages.nb_bytes();
  1447 +
1355 // TODO: FIXME: implements it. 1448 // TODO: FIXME: implements it.
1356 return ret; 1449 return ret;
1357 } 1450 }
@@ -79,6 +79,7 @@ public: @@ -79,6 +79,7 @@ public:
79 virtual bool null(); 79 virtual bool null();
80 virtual bool empty(); 80 virtual bool empty();
81 virtual std::string to_str(); 81 virtual std::string to_str();
  82 + virtual void set_value(std::string v);
82 // interface ISrsCodec 83 // interface ISrsCodec
83 public: 84 public:
84 virtual int nb_bytes(); 85 virtual int nb_bytes();
@@ -103,6 +104,8 @@ public: @@ -103,6 +104,8 @@ public:
103 public: 104 public:
104 virtual bool null(); 105 virtual bool null();
105 virtual bool empty(); 106 virtual bool empty();
  107 + virtual void set_value(std::string v);
  108 + virtual void set_value(const char* v, int nb_v);
106 // interface ISrsCodec 109 // interface ISrsCodec
107 public: 110 public:
108 virtual int nb_bytes(); 111 virtual int nb_bytes();
@@ -531,6 +534,16 @@ public: @@ -531,6 +534,16 @@ public:
531 public: 534 public:
532 SrsKafkaRawMessage(); 535 SrsKafkaRawMessage();
533 virtual ~SrsKafkaRawMessage(); 536 virtual ~SrsKafkaRawMessage();
  537 +public:
  538 + /**
  539 + * create message from json object.
  540 + */
  541 + virtual int create(SrsJsonObject* obj);
  542 +private:
  543 + /**
  544 + * get the raw message, bytes after the message_size.
  545 + */
  546 + virtual int raw_message_size();
534 // interface ISrsCodec 547 // interface ISrsCodec
535 public: 548 public:
536 virtual int nb_bytes(); 549 virtual int nb_bytes();
@@ -768,7 +781,7 @@ public: @@ -768,7 +781,7 @@ public:
768 */ 781 */
769 class SrsKafkaProducerRequest : public SrsKafkaRequest 782 class SrsKafkaProducerRequest : public SrsKafkaRequest
770 { 783 {
771 -private: 784 +public:
772 /** 785 /**
773 * This field indicates how many acknowledgements the servers should receive 786 * This field indicates how many acknowledgements the servers should receive
774 * before responding to the request. If it is 0 the server will not send any 787 * before responding to the request. If it is 0 the server will not send any