winlin

kafka use topic and partition cache

@@ -146,7 +146,17 @@ string SrsKafkaPartition::hostport() @@ -146,7 +146,17 @@ string SrsKafkaPartition::hostport()
146 return ep; 146 return ep;
147 } 147 }
148 148
149 -SrsKafkaMessageOnClient::SrsKafkaMessageOnClient(SrsKafkaProducer* p, SrsListenerType t, string i) 149 +SrsKafkaMessage::SrsKafkaMessage(int k)
  150 +{
  151 + key = k;
  152 +}
  153 +
  154 +SrsKafkaMessage::~SrsKafkaMessage()
  155 +{
  156 +}
  157 +
  158 +SrsKafkaMessageOnClient::SrsKafkaMessageOnClient(SrsKafkaProducer* p, int k, SrsListenerType t, string i)
  159 + : SrsKafkaMessage(k)
150 { 160 {
151 producer = p; 161 producer = p;
152 type = t; 162 type = t;
@@ -165,7 +175,7 @@ int SrsKafkaMessageOnClient::call() @@ -165,7 +175,7 @@ int SrsKafkaMessageOnClient::call()
165 obj->set("type", SrsJsonAny::integer(type)); 175 obj->set("type", SrsJsonAny::integer(type));
166 obj->set("ip", SrsJsonAny::str(ip.c_str())); 176 obj->set("ip", SrsJsonAny::str(ip.c_str()));
167 177
168 - return producer->send(obj); 178 + return producer->send(key, obj);
169 } 179 }
170 180
171 string SrsKafkaMessageOnClient::to_string() 181 string SrsKafkaMessageOnClient::to_string()
@@ -173,6 +183,87 @@ string SrsKafkaMessageOnClient::to_string() @@ -173,6 +183,87 @@ string SrsKafkaMessageOnClient::to_string()
173 return ip; 183 return ip;
174 } 184 }
175 185
  186 +SrsKafkaCache::SrsKafkaCache()
  187 +{
  188 + count = 0;
  189 + nb_partitions = 0;
  190 +}
  191 +
  192 +SrsKafkaCache::~SrsKafkaCache()
  193 +{
  194 + map<int32_t, SrsKafkaPartitionCache*>::iterator it;
  195 + for (it = cache.begin(); it != cache.end(); ++it) {
  196 + SrsKafkaPartitionCache* pc = it->second;
  197 +
  198 + for (vector<SrsJsonObject*>::iterator it2 = pc->begin(); it2 != pc->end(); ++it2) {
  199 + SrsJsonObject* obj = *it2;
  200 + srs_freep(obj);
  201 + }
  202 + pc->clear();
  203 +
  204 + srs_freep(pc);
  205 + }
  206 + cache.clear();
  207 +}
  208 +
  209 +void SrsKafkaCache::append(int key, SrsJsonObject* obj)
  210 +{
  211 + count++;
  212 +
  213 + int partition = 0;
  214 + if (nb_partitions > 0) {
  215 + partition = key % nb_partitions;
  216 + }
  217 +
  218 + SrsKafkaPartitionCache* pc = NULL;
  219 + map<int32_t, SrsKafkaPartitionCache*>::iterator it = cache.find(partition);
  220 + if (it == cache.end()) {
  221 + pc = new SrsKafkaPartitionCache();
  222 + cache[partition] = pc;
  223 + } else {
  224 + pc = it->second;
  225 + }
  226 +
  227 + pc->push_back(obj);
  228 +}
  229 +
  230 +int SrsKafkaCache::size()
  231 +{
  232 + return count;
  233 +}
  234 +
  235 +bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc)
  236 +{
  237 + map<int32_t, SrsKafkaPartitionCache*>::iterator it;
  238 + for (it = cache.begin(); it != cache.end(); ++it) {
  239 + int32_t key = it->first;
  240 + SrsKafkaPartitionCache* pc = it->second;
  241 +
  242 + if (!pc->empty()) {
  243 + *pkey = (int)key;
  244 + *ppc = pc;
  245 + return true;
  246 + }
  247 + }
  248 +
  249 + return false;
  250 +}
  251 +
  252 +int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc)
  253 +{
  254 + int ret = ERROR_SUCCESS;
  255 + // TODO: FIXME: implements it.
  256 + return ret;
  257 +}
  258 +
  259 +ISrsKafkaCluster::ISrsKafkaCluster()
  260 +{
  261 +}
  262 +
  263 +ISrsKafkaCluster::~ISrsKafkaCluster()
  264 +{
  265 +}
  266 +
176 SrsKafkaProducer::SrsKafkaProducer() 267 SrsKafkaProducer::SrsKafkaProducer()
177 { 268 {
178 metadata_ok = false; 269 metadata_ok = false;
@@ -181,6 +272,7 @@ SrsKafkaProducer::SrsKafkaProducer() @@ -181,6 +272,7 @@ SrsKafkaProducer::SrsKafkaProducer()
181 lock = st_mutex_new(); 272 lock = st_mutex_new();
182 pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CYCLE_INTERVAL_MS * 1000); 273 pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CYCLE_INTERVAL_MS * 1000);
183 worker = new SrsAsyncCallWorker(); 274 worker = new SrsAsyncCallWorker();
  275 + cache = new SrsKafkaCache();
184 276
185 lb = new SrsLbRoundRobin(); 277 lb = new SrsLbRoundRobin();
186 transport = new SrsTcpClient(); 278 transport = new SrsTcpClient();
@@ -189,12 +281,7 @@ SrsKafkaProducer::SrsKafkaProducer() @@ -189,12 +281,7 @@ SrsKafkaProducer::SrsKafkaProducer()
189 281
190 SrsKafkaProducer::~SrsKafkaProducer() 282 SrsKafkaProducer::~SrsKafkaProducer()
191 { 283 {
192 - vector<SrsKafkaPartition*>::iterator it;  
193 - for (it = partitions.begin(); it != partitions.end(); ++it) {  
194 - SrsKafkaPartition* partition = *it;  
195 - srs_freep(partition);  
196 - }  
197 - partitions.clear(); 284 + clear_metadata();
198 285
199 srs_freep(lb); 286 srs_freep(lb);
200 srs_freep(kafka); 287 srs_freep(kafka);
@@ -202,6 +289,7 @@ SrsKafkaProducer::~SrsKafkaProducer() @@ -202,6 +289,7 @@ SrsKafkaProducer::~SrsKafkaProducer()
202 289
203 srs_freep(worker); 290 srs_freep(worker);
204 srs_freep(pthread); 291 srs_freep(pthread);
  292 + srs_freep(cache);
205 293
206 st_mutex_destroy(lock); 294 st_mutex_destroy(lock);
207 st_cond_destroy(metadata_expired); 295 st_cond_destroy(metadata_expired);
@@ -240,26 +328,26 @@ void SrsKafkaProducer::stop() @@ -240,26 +328,26 @@ void SrsKafkaProducer::stop()
240 worker->stop(); 328 worker->stop();
241 } 329 }
242 330
243 -int SrsKafkaProducer::on_client(SrsListenerType type, st_netfd_t stfd) 331 +int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
244 { 332 {
245 - return worker->execute(new SrsKafkaMessageOnClient(this, type, srs_get_peer_ip(st_netfd_fileno(stfd)))); 333 + return worker->execute(new SrsKafkaMessageOnClient(this, key, type, ip));
246 } 334 }
247 335
248 -int SrsKafkaProducer::send(SrsJsonObject* obj) 336 +int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
249 { 337 {
250 int ret = ERROR_SUCCESS; 338 int ret = ERROR_SUCCESS;
251 339
252 // cache the json object. 340 // cache the json object.
253 - objects.push_back(obj); 341 + cache->append(key, obj);
254 342
255 // too few messages, ignore. 343 // too few messages, ignore.
256 - if (objects.size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) { 344 + if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) {
257 return ret; 345 return ret;
258 } 346 }
259 347
260 // too many messages, warn user. 348 // too many messages, warn user.
261 - if (objects.size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) {  
262 - srs_warn("kafka cache too many messages: %d", objects.size()); 349 + if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) {
  350 + srs_warn("kafka cache too many messages: %d", cache->size());
263 } 351 }
264 352
265 // sync with backgound metadata worker. 353 // sync with backgound metadata worker.
@@ -307,6 +395,18 @@ int SrsKafkaProducer::on_end_cycle() @@ -307,6 +395,18 @@ int SrsKafkaProducer::on_end_cycle()
307 return ERROR_SUCCESS; 395 return ERROR_SUCCESS;
308 } 396 }
309 397
  398 +void SrsKafkaProducer::clear_metadata()
  399 +{
  400 + vector<SrsKafkaPartition*>::iterator it;
  401 +
  402 + for (it = partitions.begin(); it != partitions.end(); ++it) {
  403 + SrsKafkaPartition* partition = *it;
  404 + srs_freep(partition);
  405 + }
  406 +
  407 + partitions.clear();
  408 +}
  409 +
310 int SrsKafkaProducer::do_cycle() 410 int SrsKafkaProducer::do_cycle()
311 { 411 {
312 int ret = ERROR_SUCCESS; 412 int ret = ERROR_SUCCESS;
@@ -381,6 +481,9 @@ int SrsKafkaProducer::request_metadata() @@ -381,6 +481,9 @@ int SrsKafkaProducer::request_metadata()
381 srs_kafka_metadata2connector(metadata, partitions); 481 srs_kafka_metadata2connector(metadata, partitions);
382 srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str()); 482 srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str());
383 483
  484 + // update the total partition for cache.
  485 + cache->nb_partitions = (int)partitions.size();
  486 +
384 metadata_ok = true; 487 metadata_ok = true;
385 488
386 return ret; 489 return ret;
@@ -388,6 +491,8 @@ int SrsKafkaProducer::request_metadata() @@ -388,6 +491,8 @@ int SrsKafkaProducer::request_metadata()
388 491
389 void SrsKafkaProducer::refresh_metadata() 492 void SrsKafkaProducer::refresh_metadata()
390 { 493 {
  494 + clear_metadata();
  495 +
391 metadata_ok = false; 496 metadata_ok = false;
392 st_cond_signal(metadata_expired); 497 st_cond_signal(metadata_expired);
393 srs_trace("kafka async refresh metadata in background"); 498 srs_trace("kafka async refresh metadata in background");
@@ -396,7 +501,26 @@ void SrsKafkaProducer::refresh_metadata() @@ -396,7 +501,26 @@ void SrsKafkaProducer::refresh_metadata()
396 int SrsKafkaProducer::flush() 501 int SrsKafkaProducer::flush()
397 { 502 {
398 int ret = ERROR_SUCCESS; 503 int ret = ERROR_SUCCESS;
399 - // TODO: FIXME: implements it. 504 +
  505 + // flush all available partition caches.
  506 + while (true) {
  507 + int key = 0;
  508 + SrsKafkaPartitionCache* pc = NULL;
  509 +
  510 + // all flushed, or no kafka partition to write to.
  511 + if (!cache->fetch(&key, &pc) || partitions.empty()) {
  512 + break;
  513 + }
  514 +
  515 + // flush specified partition.
  516 + srs_assert(key && pc);
  517 + SrsKafkaPartition* partition = partitions.at(key % partitions.size());
  518 + if ((ret = cache->flush(partition, key, pc)) != ERROR_SUCCESS) {
  519 + srs_error("flush partition failed. ret=%d", ret);
  520 + return ret;
  521 + }
  522 + }
  523 +
400 return ret; 524 return ret;
401 } 525 }
402 526
@@ -29,6 +29,7 @@ @@ -29,6 +29,7 @@
29 */ 29 */
30 #include <srs_core.hpp> 30 #include <srs_core.hpp>
31 31
  32 +#include <map>
32 #include <vector> 33 #include <vector>
33 34
34 class SrsLbRoundRobin; 35 class SrsLbRoundRobin;
@@ -67,14 +68,22 @@ public: @@ -67,14 +68,22 @@ public:
67 /** 68 /**
68 * the following is all types of kafka messages. 69 * the following is all types of kafka messages.
69 */ 70 */
70 -struct SrsKafkaMessageOnClient : public ISrsAsyncCallTask 71 +class SrsKafkaMessage : public ISrsAsyncCallTask
  72 +{
  73 +protected:
  74 + int key;
  75 +public:
  76 + SrsKafkaMessage(int k);
  77 + virtual ~SrsKafkaMessage();
  78 +};
  79 +struct SrsKafkaMessageOnClient : public SrsKafkaMessage
71 { 80 {
72 public: 81 public:
73 SrsKafkaProducer* producer; 82 SrsKafkaProducer* producer;
74 SrsListenerType type; 83 SrsListenerType type;
75 std::string ip; 84 std::string ip;
76 public: 85 public:
77 - SrsKafkaMessageOnClient(SrsKafkaProducer* p, SrsListenerType t, std::string i); 86 + SrsKafkaMessageOnClient(SrsKafkaProducer* p, int k, SrsListenerType t, std::string i);
78 virtual ~SrsKafkaMessageOnClient(); 87 virtual ~SrsKafkaMessageOnClient();
79 // interface ISrsAsyncCallTask 88 // interface ISrsAsyncCallTask
80 public: 89 public:
@@ -83,9 +92,65 @@ public: @@ -83,9 +92,65 @@ public:
83 }; 92 };
84 93
85 /** 94 /**
  95 + * the partition messages cache.
  96 + */
  97 +typedef std::vector<SrsJsonObject*> SrsKafkaPartitionCache;
  98 +
  99 +/**
  100 + * a message cache for kafka.
  101 + */
  102 +class SrsKafkaCache
  103 +{
  104 +public:
  105 + // the total partitions,
  106 + // for the key to map to the parition by key%nb_partitions.
  107 + int nb_partitions;
  108 +private:
  109 + // total messages for all partitions.
  110 + int count;
  111 + // key is the partition id, value is the message set to write to this partition.
  112 + // @remark, when refresh metadata, the partition will increase,
  113 + // so maybe some message will dispatch to new partition.
  114 + std::map< int32_t, SrsKafkaPartitionCache*> cache;
  115 +public:
  116 + SrsKafkaCache();
  117 + virtual ~SrsKafkaCache();
  118 +public:
  119 + virtual void append(int key, SrsJsonObject* obj);
  120 + virtual int size();
  121 + /**
  122 + * fetch out a available partition cache.
  123 + * @return true when got a key and pc; otherwise, false.
  124 + */
  125 + virtual bool fetch(int* pkey, SrsKafkaPartitionCache** ppc);
  126 + /**
  127 + * flush the specified partition cache.
  128 + */
  129 + virtual int flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc);
  130 +};
  131 +
  132 +/**
  133 + * the kafka cluster interface.
  134 + */
  135 +class ISrsKafkaCluster
  136 +{
  137 +public:
  138 + ISrsKafkaCluster();
  139 + virtual ~ISrsKafkaCluster();
  140 +public:
  141 + /**
  142 + * when got any client connect to SRS, notify kafka.
  143 + * @param key the partition map key, a id or hash.
  144 + * @param type the type of client.
  145 + * @param ip the peer ip of client.
  146 + */
  147 + virtual int on_client(int key, SrsListenerType type, std::string ip) = 0;
  148 +};
  149 +
  150 +/**
86 * the kafka producer used to save log to kafka cluster. 151 * the kafka producer used to save log to kafka cluster.
87 */ 152 */
88 -class SrsKafkaProducer : public ISrsReusableThreadHandler 153 +class SrsKafkaProducer : virtual public ISrsReusableThreadHandler, virtual public ISrsKafkaCluster
89 { 154 {
90 private: 155 private:
91 st_mutex_t lock; 156 st_mutex_t lock;
@@ -95,7 +160,7 @@ private: @@ -95,7 +160,7 @@ private:
95 st_cond_t metadata_expired; 160 st_cond_t metadata_expired;
96 public: 161 public:
97 std::vector<SrsKafkaPartition*> partitions; 162 std::vector<SrsKafkaPartition*> partitions;
98 - std::vector<SrsJsonObject*> objects; 163 + SrsKafkaCache* cache;
99 private: 164 private:
100 SrsLbRoundRobin* lb; 165 SrsLbRoundRobin* lb;
101 SrsAsyncCallWorker* worker; 166 SrsAsyncCallWorker* worker;
@@ -112,19 +177,23 @@ public: @@ -112,19 +177,23 @@ public:
112 /** 177 /**
113 * when got any client connect to SRS, notify kafka. 178 * when got any client connect to SRS, notify kafka.
114 */ 179 */
115 - virtual int on_client(SrsListenerType type, st_netfd_t stfd); 180 + virtual int on_client(int key, SrsListenerType type, std::string ip);
  181 +// for worker to call task to send object.
  182 +public:
116 /** 183 /**
117 * send json object to kafka cluster. 184 * send json object to kafka cluster.
118 * the producer will aggregate message and send in kafka message set. 185 * the producer will aggregate message and send in kafka message set.
  186 + * @param key the key to map to the partition, user can use cid or hash.
119 * @param obj the json object; user must never free it again. 187 * @param obj the json object; user must never free it again.
120 */ 188 */
121 - virtual int send(SrsJsonObject* obj); 189 + virtual int send(int key, SrsJsonObject* obj);
122 // interface ISrsReusableThreadHandler 190 // interface ISrsReusableThreadHandler
123 public: 191 public:
124 virtual int cycle(); 192 virtual int cycle();
125 virtual int on_before_cycle(); 193 virtual int on_before_cycle();
126 virtual int on_end_cycle(); 194 virtual int on_end_cycle();
127 private: 195 private:
  196 + virtual void clear_metadata();
128 virtual int do_cycle(); 197 virtual int do_cycle();
129 virtual int request_metadata(); 198 virtual int request_metadata();
130 // set the metadata to invalid and refresh it. 199 // set the metadata to invalid and refresh it.
@@ -55,6 +55,7 @@ using namespace std; @@ -55,6 +55,7 @@ using namespace std;
55 #include <srs_app_statistic.hpp> 55 #include <srs_app_statistic.hpp>
56 #include <srs_protocol_utility.hpp> 56 #include <srs_protocol_utility.hpp>
57 #include <srs_protocol_json.hpp> 57 #include <srs_protocol_json.hpp>
  58 +#include <srs_app_kafka.hpp>
58 59
59 // when stream is busy, for example, streaming is already 60 // when stream is busy, for example, streaming is already
60 // publishing, when a new client to request to publish, 61 // publishing, when a new client to request to publish,
@@ -310,10 +311,18 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) @@ -310,10 +311,18 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout)
310 transport->set_recv_timeout(timeout); 311 transport->set_recv_timeout(timeout);
311 } 312 }
312 313
  314 +#ifdef SRS_AUTO_KAFKA
  315 +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c)
  316 +#else
313 SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) 317 SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
  318 +#endif
314 : SrsConnection(svr, c) 319 : SrsConnection(svr, c)
315 { 320 {
316 server = svr; 321 server = svr;
  322 +#ifdef SRS_AUTO_KAFKA
  323 + kafka = k;
  324 +#endif
  325 +
317 req = new SrsRequest(); 326 req = new SrsRequest();
318 res = new SrsResponse(); 327 res = new SrsResponse();
319 skt = new SrsStSocket(c); 328 skt = new SrsStSocket(c);
@@ -365,6 +374,14 @@ int SrsRtmpConn::do_cycle() @@ -365,6 +374,14 @@ int SrsRtmpConn::do_cycle()
365 int ret = ERROR_SUCCESS; 374 int ret = ERROR_SUCCESS;
366 375
367 srs_trace("RTMP client ip=%s", ip.c_str()); 376 srs_trace("RTMP client ip=%s", ip.c_str());
  377 +
  378 + // notify kafka cluster.
  379 +#ifdef SRS_AUTO_KAFKA
  380 + if ((ret = kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != ERROR_SUCCESS) {
  381 + srs_error("kafka handler on_client failed. ret=%d", ret);
  382 + return ret;
  383 + }
  384 +#endif
368 385
369 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US); 386 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);
370 rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US); 387 rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);
@@ -58,6 +58,9 @@ class SrsSecurity; @@ -58,6 +58,9 @@ class SrsSecurity;
58 class ISrsWakable; 58 class ISrsWakable;
59 class SrsCommonMessage; 59 class SrsCommonMessage;
60 class SrsPacket; 60 class SrsPacket;
  61 +#ifdef SRS_AUTO_KAFKA
  62 +class ISrsKafkaCluster;
  63 +#endif
61 64
62 /** 65 /**
63 * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs. 66 * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs.
@@ -135,8 +138,16 @@ private: @@ -135,8 +138,16 @@ private:
135 int publish_normal_timeout; 138 int publish_normal_timeout;
136 // whether enable the tcp_nodelay. 139 // whether enable the tcp_nodelay.
137 bool tcp_nodelay; 140 bool tcp_nodelay;
  141 + // the kafka cluster
  142 +#ifdef SRS_AUTO_KAFKA
  143 + ISrsKafkaCluster* kafka;
  144 +#endif
138 public: 145 public:
  146 +#ifdef SRS_AUTO_KAFKA
  147 + SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c);
  148 +#else
139 SrsRtmpConn(SrsServer* svr, st_netfd_t c); 149 SrsRtmpConn(SrsServer* svr, st_netfd_t c);
  150 +#endif
140 virtual ~SrsRtmpConn(); 151 virtual ~SrsRtmpConn();
141 public: 152 public:
142 virtual void dispose(); 153 virtual void dispose();
@@ -1267,7 +1267,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) @@ -1267,7 +1267,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
1267 1267
1268 SrsConnection* conn = NULL; 1268 SrsConnection* conn = NULL;
1269 if (type == SrsListenerRtmpStream) { 1269 if (type == SrsListenerRtmpStream) {
1270 - conn = new SrsRtmpConn(this, client_stfd); 1270 + conn = new SrsRtmpConn(this, kafka, client_stfd);
1271 } else if (type == SrsListenerHttpApi) { 1271 } else if (type == SrsListenerHttpApi) {
1272 #ifdef SRS_AUTO_HTTP_API 1272 #ifdef SRS_AUTO_HTTP_API
1273 conn = new SrsHttpApi(this, client_stfd, http_api_mux); 1273 conn = new SrsHttpApi(this, client_stfd, http_api_mux);
@@ -1289,14 +1289,6 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) @@ -1289,14 +1289,6 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
1289 } 1289 }
1290 srs_assert(conn); 1290 srs_assert(conn);
1291 1291
1292 -#ifdef SRS_AUTO_KAFKA  
1293 - // notify kafka cluster.  
1294 - if ((ret = kafka->on_client(type, client_stfd)) != ERROR_SUCCESS) {  
1295 - srs_error("kafka handler on_client failed. ret=%d", ret);  
1296 - return ret;  
1297 - }  
1298 -#endif  
1299 -  
1300 // directly enqueue, the cycle thread will remove the client. 1292 // directly enqueue, the cycle thread will remove the client.
1301 conns.push_back(conn); 1293 conns.push_back(conn);
1302 srs_verbose("add conn to vector."); 1294 srs_verbose("add conn to vector.");