winlin

refine code.

@@ -381,25 +381,37 @@ void SrsKafkaProducer::stop() @@ -381,25 +381,37 @@ void SrsKafkaProducer::stop()
381 worker->stop(); 381 worker->stop();
382 } 382 }
383 383
384 -int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) 384 +int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
385 { 385 {
386 int ret = ERROR_SUCCESS; 386 int ret = ERROR_SUCCESS;
387 387
388 - bool enabled = _srs_config->get_kafka_enabled();  
389 - if (!enabled) { 388 + // cache the json object.
  389 + cache->append(key, obj);
  390 +
  391 + // too few messages, ignore.
  392 + if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) {
390 return ret; 393 return ret;
391 } 394 }
392 395
393 - SrsJsonObject* obj = SrsJsonAny::object(); 396 + // too many messages, warn user.
  397 + if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) {
  398 + srs_warn("kafka cache too many messages: %d", cache->size());
  399 + }
394 400
395 - obj->set("msg", SrsJsonAny::str("accept"));  
396 - obj->set("type", SrsJsonAny::integer(type));  
397 - obj->set("ip", SrsJsonAny::str(ip.c_str())); 401 + // sync with backgound metadata worker.
  402 + st_mutex_lock(lock);
398 403
399 - return worker->execute(new SrsKafkaMessage(this, key, obj)); 404 + // flush message when metadata is ok.
  405 + if (metadata_ok) {
  406 + ret = flush();
  407 + }
  408 +
  409 + st_mutex_unlock(lock);
  410 +
  411 + return ret;
400 } 412 }
401 413
402 -int SrsKafkaProducer::on_close(int key) 414 +int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
403 { 415 {
404 int ret = ERROR_SUCCESS; 416 int ret = ERROR_SUCCESS;
405 417
@@ -410,39 +422,27 @@ int SrsKafkaProducer::on_close(int key) @@ -410,39 +422,27 @@ int SrsKafkaProducer::on_close(int key)
410 422
411 SrsJsonObject* obj = SrsJsonAny::object(); 423 SrsJsonObject* obj = SrsJsonAny::object();
412 424
413 - obj->set("msg", SrsJsonAny::str("close")); 425 + obj->set("msg", SrsJsonAny::str("accept"));
  426 + obj->set("type", SrsJsonAny::integer(type));
  427 + obj->set("ip", SrsJsonAny::str(ip.c_str()));
414 428
415 return worker->execute(new SrsKafkaMessage(this, key, obj)); 429 return worker->execute(new SrsKafkaMessage(this, key, obj));
416 } 430 }
417 431
418 -int SrsKafkaProducer::send(int key, SrsJsonObject* obj) 432 +int SrsKafkaProducer::on_close(int key)
419 { 433 {
420 int ret = ERROR_SUCCESS; 434 int ret = ERROR_SUCCESS;
421 435
422 - // cache the json object.  
423 - cache->append(key, obj);  
424 -  
425 - // too few messages, ignore.  
426 - if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) { 436 + bool enabled = _srs_config->get_kafka_enabled();
  437 + if (!enabled) {
427 return ret; 438 return ret;
428 } 439 }
429 440
430 - // too many messages, warn user.  
431 - if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) {  
432 - srs_warn("kafka cache too many messages: %d", cache->size());  
433 - }  
434 -  
435 - // sync with backgound metadata worker.  
436 - st_mutex_lock(lock);  
437 -  
438 - // flush message when metadata is ok.  
439 - if (metadata_ok) {  
440 - ret = flush();  
441 - } 441 + SrsJsonObject* obj = SrsJsonAny::object();
442 442
443 - st_mutex_unlock(lock); 443 + obj->set("msg", SrsJsonAny::str("close"));
444 444
445 - return ret; 445 + return worker->execute(new SrsKafkaMessage(this, key, obj));
446 } 446 }
447 447
448 int SrsKafkaProducer::cycle() 448 int SrsKafkaProducer::cycle()
@@ -173,11 +173,7 @@ public: @@ -173,11 +173,7 @@ public:
173 virtual int initialize(); 173 virtual int initialize();
174 virtual int start(); 174 virtual int start();
175 virtual void stop(); 175 virtual void stop();
176 -// interface ISrsKafkaCluster  
177 -public:  
178 - virtual int on_client(int key, SrsListenerType type, std::string ip);  
179 - virtual int on_close(int key);  
180 -// for worker to call task to send object. 176 +// internal: for worker to call task to send object.
181 public: 177 public:
182 /** 178 /**
183 * send json object to kafka cluster. 179 * send json object to kafka cluster.
@@ -186,6 +182,10 @@ public: @@ -186,6 +182,10 @@ public:
186 * @param obj the json object; user must never free it again. 182 * @param obj the json object; user must never free it again.
187 */ 183 */
188 virtual int send(int key, SrsJsonObject* obj); 184 virtual int send(int key, SrsJsonObject* obj);
  185 +// interface ISrsKafkaCluster
  186 +public:
  187 + virtual int on_client(int key, SrsListenerType type, std::string ip);
  188 + virtual int on_close(int key);
189 // interface ISrsReusableThreadHandler 189 // interface ISrsReusableThreadHandler
190 public: 190 public:
191 virtual int cycle(); 191 virtual int cycle();