winlin

add function to convert kafka array to vector

@@ -34,6 +34,7 @@ using namespace std; @@ -34,6 +34,7 @@ using namespace std;
34 #include <srs_kernel_utility.hpp> 34 #include <srs_kernel_utility.hpp>
35 #include <srs_kernel_balance.hpp> 35 #include <srs_kernel_balance.hpp>
36 #include <srs_kafka_stack.hpp> 36 #include <srs_kafka_stack.hpp>
  37 +#include <srs_core_autofree.hpp>
37 38
38 #ifdef SRS_AUTO_KAFKA 39 #ifdef SRS_AUTO_KAFKA
39 40
@@ -178,6 +179,14 @@ int SrsKafkaProducer::request_metadata() @@ -178,6 +179,14 @@ int SrsKafkaProducer::request_metadata()
178 srs_parse_endpoint(broker, server, port); 179 srs_parse_endpoint(broker, server, port);
179 } 180 }
180 181
  182 + std::string topic = _srs_config->get_kafka_topic();
  183 + if (true) {
  184 + std::string senabled = srs_bool2switch(enabled);
  185 + std::string sbrokers = srs_join_vector_string(brokers->args, ",");
  186 + srs_trace("kafka request enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s",
  187 + senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
  188 + }
  189 +
181 // connect to kafka server. 190 // connect to kafka server.
182 if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) { 191 if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) {
183 srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret); 192 srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret);
@@ -185,19 +194,12 @@ int SrsKafkaProducer::request_metadata() @@ -185,19 +194,12 @@ int SrsKafkaProducer::request_metadata()
185 } 194 }
186 195
187 // do fetch medata from broker. 196 // do fetch medata from broker.
188 - std::string topic = _srs_config->get_kafka_topic();  
189 - if ((ret = kafka->fetch_metadata(topic)) != ERROR_SUCCESS) { 197 + SrsKafkaTopicMetadataResponse* metadata = NULL;
  198 + if ((ret = kafka->fetch_metadata(topic, &metadata)) != ERROR_SUCCESS) {
190 srs_error("kafka fetch metadata failed. ret=%d", ret); 199 srs_error("kafka fetch metadata failed. ret=%d", ret);
191 return ret; 200 return ret;
192 } 201 }
193 -  
194 - // log when completed.  
195 - if (true) {  
196 - std::string senabled = srs_bool2switch(enabled);  
197 - std::string sbrokers = srs_join_vector_string(brokers->args, ",");  
198 - srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s",  
199 - senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());  
200 - } 202 + SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
201 203
202 meatadata_ok = true; 204 meatadata_ok = true;
203 205
@@ -1051,8 +1051,10 @@ SrsKafkaClient::~SrsKafkaClient() @@ -1051,8 +1051,10 @@ SrsKafkaClient::~SrsKafkaClient()
1051 srs_freep(protocol); 1051 srs_freep(protocol);
1052 } 1052 }
1053 1053
1054 -int SrsKafkaClient::fetch_metadata(string topic) 1054 +int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** pmsg)
1055 { 1055 {
  1056 + *pmsg = NULL;
  1057 +
1056 int ret = ERROR_SUCCESS; 1058 int ret = ERROR_SUCCESS;
1057 1059
1058 SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest(); 1060 SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest();
@@ -1064,17 +1066,21 @@ int SrsKafkaClient::fetch_metadata(string topic) @@ -1064,17 +1066,21 @@ int SrsKafkaClient::fetch_metadata(string topic)
1064 return ret; 1066 return ret;
1065 } 1067 }
1066 1068
1067 - SrsKafkaResponse* res = NULL;  
1068 - if ((ret = protocol->recv_message(&res)) != ERROR_SUCCESS) { 1069 + if ((ret = protocol->expect_message(pmsg)) != ERROR_SUCCESS) {
1069 srs_error("kafka recv response failed. ret=%d", ret); 1070 srs_error("kafka recv response failed. ret=%d", ret);
1070 return ret; 1071 return ret;
1071 } 1072 }
1072 - SrsAutoFree(SrsKafkaResponse, res);  
1073 -  
1074 - // TODO: FIXME: implements it.  
1075 1073
1076 return ret; 1074 return ret;
1077 } 1075 }
1078 1076
  1077 +vector<string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr)
  1078 +{
  1079 + vector<string> strs;
  1080 + for (int i = 0; i < arr->size(); i++) {
  1081 + }
  1082 + return strs;
  1083 +}
  1084 +
1079 #endif 1085 #endif
1080 1086
@@ -666,7 +666,7 @@ public: @@ -666,7 +666,7 @@ public:
666 */ 666 */
667 class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse 667 class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse
668 { 668 {
669 -private: 669 +public:
670 SrsKafkaArray<SrsKafkaBroker> brokers; 670 SrsKafkaArray<SrsKafkaBroker> brokers;
671 SrsKafkaArray<SrsKafkaTopicMetadata> metadatas; 671 SrsKafkaArray<SrsKafkaTopicMetadata> metadatas;
672 public: 672 public:
@@ -741,6 +741,36 @@ public: @@ -741,6 +741,36 @@ public:
741 * @param pmsg output the received message. user must free it. 741 * @param pmsg output the received message. user must free it.
742 */ 742 */
743 virtual int recv_message(SrsKafkaResponse** pmsg); 743 virtual int recv_message(SrsKafkaResponse** pmsg);
  744 +public:
  745 + /**
  746 + * expect specified message.
  747 + */
  748 + template<typename T>
  749 + int expect_message(T** pmsg)
  750 + {
  751 + int ret = ERROR_SUCCESS;
  752 +
  753 + while (true) {
  754 + SrsKafkaResponse* res = NULL;
  755 + if ((ret = recv_message(&res)) != ERROR_SUCCESS) {
  756 + srs_error("recv response failed. ret=%d", ret);
  757 + return ret;
  758 + }
  759 +
  760 + // drop not matched.
  761 + T* msg = dynamic_cast<T*>(res);
  762 + if (!msg) {
  763 + srs_info("kafka drop response.");
  764 + srs_freep(res);
  765 + continue;
  766 + }
  767 +
  768 + *pmsg = msg;
  769 + break;
  770 + }
  771 +
  772 + return ret;
  773 + }
744 }; 774 };
745 775
746 /** 776 /**
@@ -757,9 +787,12 @@ public: @@ -757,9 +787,12 @@ public:
757 /** 787 /**
758 * fetch the metadata from broker for topic. 788 * fetch the metadata from broker for topic.
759 */ 789 */
760 - virtual int fetch_metadata(std::string topic); 790 + virtual int fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg);
761 }; 791 };
762 792
  793 +// convert kafka array[string] to vector[string]
  794 +extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr);
  795 +
763 #endif 796 #endif
764 797
765 #endif 798 #endif
@@ -997,7 +997,7 @@ public: @@ -997,7 +997,7 @@ public:
997 * for example: 997 * for example:
998 * SrsCommonMessage* msg = NULL; 998 * SrsCommonMessage* msg = NULL;
999 * SrsConnectAppResPacket* pkt = NULL; 999 * SrsConnectAppResPacket* pkt = NULL;
1000 - * if ((ret = server->expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) { 1000 + * if ((ret = server->expect_message<SrsConnectAppResPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
1001 * return ret; 1001 * return ret;
1002 * } 1002 * }
1003 * // use then free msg and pkt 1003 * // use then free msg and pkt