winlin

support show the summary of kafka metadata.

@@ -2128,6 +2128,8 @@ int SrsConfig::global_to_json(SrsJsonObject* obj) @@ -2128,6 +2128,8 @@ int SrsConfig::global_to_json(SrsJsonObject* obj)
2128 sobj->set(sdir->name, sdir->dumps_arg0_to_boolean()); 2128 sobj->set(sdir->name, sdir->dumps_arg0_to_boolean());
2129 } else if (sdir->name == "brokers") { 2129 } else if (sdir->name == "brokers") {
2130 sobj->set(sdir->name, sdir->dumps_args()); 2130 sobj->set(sdir->name, sdir->dumps_args());
  2131 + } else if (sdir->name == "topic") {
  2132 + sobj->set(sdir->name, sdir->dumps_arg0_to_str());
2131 } 2133 }
2132 } 2134 }
2133 obj->set(dir->name, sobj); 2135 obj->set(dir->name, sobj);
@@ -3546,7 +3548,7 @@ int SrsConfig::check_config() @@ -3546,7 +3548,7 @@ int SrsConfig::check_config()
3546 SrsConfDirective* conf = root->get("kafka"); 3548 SrsConfDirective* conf = root->get("kafka");
3547 for (int i = 0; conf && i < (int)conf->directives.size(); i++) { 3549 for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
3548 string n = conf->at(i)->name; 3550 string n = conf->at(i)->name;
3549 - if (n != "enabled" && n != "brokers") { 3551 + if (n != "enabled" && n != "brokers" && n != "topic") {
3550 ret = ERROR_SYSTEM_CONFIG_INVALID; 3552 ret = ERROR_SYSTEM_CONFIG_INVALID;
3551 srs_error("unsupported kafka directive %s, ret=%d", n.c_str(), ret); 3553 srs_error("unsupported kafka directive %s, ret=%d", n.c_str(), ret);
3552 return ret; 3554 return ret;
@@ -32,6 +32,7 @@ using namespace std; @@ -32,6 +32,7 @@ using namespace std;
32 #include <srs_app_async_call.hpp> 32 #include <srs_app_async_call.hpp>
33 #include <srs_app_utility.hpp> 33 #include <srs_app_utility.hpp>
34 #include <srs_kernel_utility.hpp> 34 #include <srs_kernel_utility.hpp>
  35 +#include <srs_protocol_utility.hpp>
35 #include <srs_kernel_balance.hpp> 36 #include <srs_kernel_balance.hpp>
36 #include <srs_kafka_stack.hpp> 37 #include <srs_kafka_stack.hpp>
37 #include <srs_core_autofree.hpp> 38 #include <srs_core_autofree.hpp>
@@ -201,6 +202,48 @@ int SrsKafkaProducer::request_metadata() @@ -201,6 +202,48 @@ int SrsKafkaProducer::request_metadata()
201 } 202 }
202 SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata); 203 SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
203 204
  205 + // show kafka metadata.
  206 + string summary;
  207 + if (true) {
  208 + vector<string> bs;
  209 + for (int i = 0; i < metadata->brokers.size(); i++) {
  210 + SrsKafkaBroker* broker = metadata->brokers.at(i);
  211 +
  212 + string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str();
  213 + if (broker->port > 0) {
  214 + hostport += ":" + srs_int2str(broker->port);
  215 + }
  216 +
  217 + bs.push_back(hostport);
  218 + }
  219 +
  220 + vector<string> ps;
  221 + for (int i = 0; i < metadata->metadatas.size(); i++) {
  222 + SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
  223 +
  224 + string desc = "topic=" + topic->name.to_str();
  225 +
  226 + for (int j = 0; j < topic->metadatas.size(); j++) {
  227 + SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
  228 +
  229 + desc += ", partition" + srs_int2str(partition->partition_id) +"=";
  230 + desc += srs_int2str(partition->leader) + "/";
  231 +
  232 + vector<string> replicas = srs_kafka_array2vector(&partition->replicas);
  233 + desc += srs_join_vector_string(replicas, ",");
  234 + }
  235 +
  236 + ps.push_back(desc);
  237 + }
  238 +
  239 + std::stringstream ss;
  240 + ss << "brokers=" << srs_join_vector_string(bs, ",");
  241 + ss << ", " << srs_join_vector_string(ps, ",");
  242 +
  243 + summary = ss.str();
  244 + }
  245 + srs_trace("kafka metadata: %s", summary.c_str());
  246 +
204 meatadata_ok = true; 247 meatadata_ok = true;
205 248
206 return ret; 249 return ret;
@@ -1467,17 +1467,3 @@ void srs_api_dump_summaries(SrsJsonObject* obj) @@ -1467,17 +1467,3 @@ void srs_api_dump_summaries(SrsJsonObject* obj)
1467 sys->set("conn_srs", SrsJsonAny::integer(nrs->nb_conn_srs)); 1467 sys->set("conn_srs", SrsJsonAny::integer(nrs->nb_conn_srs));
1468 } 1468 }
1469 1469
1470 -string srs_join_vector_string(vector<string>& vs, string separator)  
1471 -{  
1472 - string str = "";  
1473 -  
1474 - for (int i = 0; i < (int)vs.size(); i++) {  
1475 - str += vs.at(i);  
1476 - if (i != (int)vs.size() - 1) {  
1477 - str += separator;  
1478 - }  
1479 - }  
1480 -  
1481 - return str;  
1482 -}  
1483 -  
@@ -677,8 +677,5 @@ extern bool srs_is_boolean(const std::string& str); @@ -677,8 +677,5 @@ extern bool srs_is_boolean(const std::string& str);
677 // dump summaries for /api/v1/summaries. 677 // dump summaries for /api/v1/summaries.
678 extern void srs_api_dump_summaries(SrsJsonObject* obj); 678 extern void srs_api_dump_summaries(SrsJsonObject* obj);
679 679
680 -// join string in vector with indicated separator  
681 -extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator);  
682 -  
683 #endif 680 #endif
684 681
@@ -23,7 +23,7 @@ @@ -23,7 +23,7 @@
23 23
24 #include <srs_kafka_stack.hpp> 24 #include <srs_kafka_stack.hpp>
25 25
26 -#include <string> 26 +#include <sstream>
27 using namespace std; 27 using namespace std;
28 28
29 #include <srs_kernel_error.hpp> 29 #include <srs_kernel_error.hpp>
@@ -31,6 +31,8 @@ using namespace std; @@ -31,6 +31,8 @@ using namespace std;
31 #include <srs_kernel_log.hpp> 31 #include <srs_kernel_log.hpp>
32 #include <srs_protocol_io.hpp> 32 #include <srs_protocol_io.hpp>
33 #include <srs_protocol_stream.hpp> 33 #include <srs_protocol_stream.hpp>
  34 +#include <srs_kernel_utility.hpp>
  35 +#include <srs_protocol_utility.hpp>
34 36
35 #ifdef SRS_AUTO_KAFKA 37 #ifdef SRS_AUTO_KAFKA
36 38
@@ -64,6 +66,15 @@ bool SrsKafkaString::empty() @@ -64,6 +66,15 @@ bool SrsKafkaString::empty()
64 return _size <= 0; 66 return _size <= 0;
65 } 67 }
66 68
  69 +string SrsKafkaString::to_str()
  70 +{
  71 + string ret;
  72 + if (_size > 0) {
  73 + ret.append(data, _size);
  74 + }
  75 + return ret;
  76 +}
  77 +
67 int SrsKafkaString::nb_bytes() 78 int SrsKafkaString::nb_bytes()
68 { 79 {
69 return _size == -1? 2 : 2 + _size; 80 return _size == -1? 2 : 2 + _size;
@@ -1077,8 +1088,24 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** @@ -1077,8 +1088,24 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse**
1077 vector<string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr) 1088 vector<string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr)
1078 { 1089 {
1079 vector<string> strs; 1090 vector<string> strs;
1080 - for (int i = 0; i < arr->nb_bytes(); i++) { 1091 +
  1092 + for (int i = 0; i < arr->size(); i++) {
  1093 + SrsKafkaString* elem = arr->at(i);
  1094 + strs.push_back(elem->to_str());
1081 } 1095 }
  1096 +
  1097 + return strs;
  1098 +}
  1099 +
  1100 +vector<string> srs_kafka_array2vector(SrsKafkaArray<int32_t>* arr)
  1101 +{
  1102 + vector<string> strs;
  1103 +
  1104 + for (int i = 0; i < arr->size(); i++) {
  1105 + int32_t elem = arr->at(i);
  1106 + strs.push_back(srs_int2str(elem));
  1107 + }
  1108 +
1082 return strs; 1109 return strs;
1083 } 1110 }
1084 1111
@@ -77,6 +77,7 @@ public: @@ -77,6 +77,7 @@ public:
77 public: 77 public:
78 virtual bool null(); 78 virtual bool null();
79 virtual bool empty(); 79 virtual bool empty();
  80 + virtual std::string to_str();
80 // interface ISrsCodec 81 // interface ISrsCodec
81 public: 82 public:
82 virtual int nb_bytes(); 83 virtual int nb_bytes();
@@ -147,6 +148,14 @@ public: @@ -147,6 +148,14 @@ public:
147 length++; 148 length++;
148 elems.push_back(elem); 149 elems.push_back(elem);
149 } 150 }
  151 + virtual int size()
  152 + {
  153 + return length;
  154 + }
  155 + virtual T* at(int index)
  156 + {
  157 + return elems.at(index);
  158 + }
150 // interface ISrsCodec 159 // interface ISrsCodec
151 public: 160 public:
152 virtual int nb_bytes() 161 virtual int nb_bytes()
@@ -228,6 +237,14 @@ public: @@ -228,6 +237,14 @@ public:
228 length++; 237 length++;
229 elems.push_back(elem); 238 elems.push_back(elem);
230 } 239 }
  240 + virtual int size()
  241 + {
  242 + return length;
  243 + }
  244 + virtual int32_t at(int index)
  245 + {
  246 + return elems.at(index);
  247 + }
231 // interface ISrsCodec 248 // interface ISrsCodec
232 public: 249 public:
233 virtual int nb_bytes() 250 virtual int nb_bytes()
@@ -792,6 +809,7 @@ public: @@ -792,6 +809,7 @@ public:
792 809
793 // convert kafka array[string] to vector[string] 810 // convert kafka array[string] to vector[string]
794 extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr); 811 extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr);
  812 +extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<int32_t>* arr);
795 813
796 #endif 814 #endif
797 815
@@ -312,3 +312,17 @@ int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, s @@ -312,3 +312,17 @@ int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, s
312 return ret; 312 return ret;
313 } 313 }
314 314
  315 +string srs_join_vector_string(vector<string>& vs, string separator)
  316 +{
  317 + string str = "";
  318 +
  319 + for (int i = 0; i < (int)vs.size(); i++) {
  320 + str += vs.at(i);
  321 + if (i != (int)vs.size() - 1) {
  322 + str += separator;
  323 + }
  324 + }
  325 +
  326 + return str;
  327 +}
  328 +
@@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 #endif 35 #endif
36 36
37 #include <string> 37 #include <string>
  38 +#include <vector>
38 39
39 #include <srs_kernel_consts.hpp> 40 #include <srs_kernel_consts.hpp>
40 41
@@ -130,5 +131,8 @@ extern int srs_write_large_iovs( @@ -130,5 +131,8 @@ extern int srs_write_large_iovs(
130 ssize_t* pnwrite = NULL 131 ssize_t* pnwrite = NULL
131 ); 132 );
132 133
  134 +// join string in vector with indicated separator
  135 +extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator);
  136 +
133 #endif 137 #endif
134 138