winlin

kafka producer use async interface to request metadata.

@@ -37,10 +37,18 @@ using namespace std; @@ -37,10 +37,18 @@ using namespace std;
37 37
38 #ifdef SRS_AUTO_KAFKA 38 #ifdef SRS_AUTO_KAFKA
39 39
  40 +#define SRS_KAKFA_CYCLE_INTERVAL_MS 3000
  41 +
40 SrsKafkaProducer::SrsKafkaProducer() 42 SrsKafkaProducer::SrsKafkaProducer()
41 { 43 {
42 - lb = new SrsLbRoundRobin(); 44 + meatadata_ok = false;
  45 + metadata_expired = st_cond_new();
  46 +
  47 + lock = st_mutex_new();
  48 + pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CYCLE_INTERVAL_MS * 1000);
43 worker = new SrsAsyncCallWorker(); 49 worker = new SrsAsyncCallWorker();
  50 +
  51 + lb = new SrsLbRoundRobin();
44 transport = new SrsTcpClient(); 52 transport = new SrsTcpClient();
45 kafka = new SrsKafkaClient(transport); 53 kafka = new SrsKafkaClient(transport);
46 } 54 }
@@ -48,21 +56,21 @@ SrsKafkaProducer::SrsKafkaProducer() @@ -48,21 +56,21 @@ SrsKafkaProducer::SrsKafkaProducer()
48 SrsKafkaProducer::~SrsKafkaProducer() 56 SrsKafkaProducer::~SrsKafkaProducer()
49 { 57 {
50 srs_freep(lb); 58 srs_freep(lb);
51 - srs_freep(worker);  
52 srs_freep(kafka); 59 srs_freep(kafka);
53 srs_freep(transport); 60 srs_freep(transport);
  61 +
  62 + srs_freep(worker);
  63 + srs_freep(pthread);
  64 +
  65 + st_mutex_destroy(lock);
  66 + st_cond_destroy(metadata_expired);
54 } 67 }
55 68
56 int SrsKafkaProducer::initialize() 69 int SrsKafkaProducer::initialize()
57 { 70 {
58 int ret = ERROR_SUCCESS; 71 int ret = ERROR_SUCCESS;
59 72
60 - // when kafka enabled, request metadata when startup.  
61 - if ((ret = request_metadata()) != ERROR_SUCCESS) {  
62 - srs_error("request kafka metadata failed. ret=%d", ret);  
63 - return ret;  
64 - }  
65 - 73 + meatadata_ok = false;
66 srs_info("initialize kafka producer ok."); 74 srs_info("initialize kafka producer ok.");
67 75
68 return ret; 76 return ret;
@@ -73,20 +81,78 @@ int SrsKafkaProducer::start() @@ -73,20 +81,78 @@ int SrsKafkaProducer::start()
73 int ret = ERROR_SUCCESS; 81 int ret = ERROR_SUCCESS;
74 82
75 if ((ret = worker->start()) != ERROR_SUCCESS) { 83 if ((ret = worker->start()) != ERROR_SUCCESS) {
76 - srs_error("start kafka failed. ret=%d", ret); 84 + srs_error("start kafka worker failed. ret=%d", ret);
77 return ret; 85 return ret;
78 } 86 }
79 87
80 - srs_info("kafka worker ok"); 88 + if ((ret = pthread->start()) != ERROR_SUCCESS) {
  89 + srs_error("start kafka thread failed. ret=%d", ret);
  90 + }
  91 +
  92 + meatadata_ok = false;
  93 + st_cond_signal(metadata_expired);
  94 + srs_trace("kafka work in background");
81 95
82 return ret; 96 return ret;
83 } 97 }
84 98
85 void SrsKafkaProducer::stop() 99 void SrsKafkaProducer::stop()
86 { 100 {
  101 + pthread->stop();
87 worker->stop(); 102 worker->stop();
88 } 103 }
89 104
  105 +int SrsKafkaProducer::cycle()
  106 +{
  107 + int ret = ERROR_SUCCESS;
  108 +
  109 + if ((ret = do_cycle()) != ERROR_SUCCESS) {
  110 + srs_warn("ignore kafka error. ret=%d", ret);
  111 + }
  112 +
  113 + return ret;
  114 +}
  115 +
  116 +int SrsKafkaProducer::on_before_cycle()
  117 +{
  118 + // wait for the metadata expired.
  119 + // when metadata is ok, wait for it expired.
  120 + if (meatadata_ok) {
  121 + st_cond_wait(metadata_expired);
  122 + }
  123 +
  124 + // request to lock to acquire the socket.
  125 + st_mutex_lock(lock);
  126 +
  127 + return ERROR_SUCCESS;
  128 +}
  129 +
  130 +int SrsKafkaProducer::on_end_cycle()
  131 +{
  132 + st_mutex_unlock(lock);
  133 +
  134 + return ERROR_SUCCESS;
  135 +}
  136 +
  137 +int SrsKafkaProducer::do_cycle()
  138 +{
  139 + int ret = ERROR_SUCCESS;
  140 +
  141 + // ignore when disabled.
  142 + bool enabled = _srs_config->get_kafka_enabled();
  143 + if (!enabled) {
  144 + return ret;
  145 + }
  146 +
  147 + // when kafka enabled, request metadata when startup.
  148 + if ((ret = request_metadata()) != ERROR_SUCCESS) {
  149 + srs_error("request kafka metadata failed. ret=%d", ret);
  150 + return ret;
  151 + }
  152 +
  153 + return ret;
  154 +}
  155 +
90 int SrsKafkaProducer::request_metadata() 156 int SrsKafkaProducer::request_metadata()
91 { 157 {
92 int ret = ERROR_SUCCESS; 158 int ret = ERROR_SUCCESS;
@@ -133,6 +199,8 @@ int SrsKafkaProducer::request_metadata() @@ -133,6 +199,8 @@ int SrsKafkaProducer::request_metadata()
133 senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); 199 senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
134 } 200 }
135 201
  202 + meatadata_ok = true;
  203 +
136 return ret; 204 return ret;
137 } 205 }
138 206
@@ -34,14 +34,22 @@ class SrsAsyncCallWorker; @@ -34,14 +34,22 @@ class SrsAsyncCallWorker;
34 class SrsTcpClient; 34 class SrsTcpClient;
35 class SrsKafkaClient; 35 class SrsKafkaClient;
36 36
  37 +#include <srs_app_thread.hpp>
  38 +
37 #ifdef SRS_AUTO_KAFKA 39 #ifdef SRS_AUTO_KAFKA
38 40
39 /** 41 /**
40 * the kafka producer used to save log to kafka cluster. 42 * the kafka producer used to save log to kafka cluster.
41 */ 43 */
42 -class SrsKafkaProducer 44 +class SrsKafkaProducer : public ISrsReusableThreadHandler
43 { 45 {
44 private: 46 private:
  47 + st_mutex_t lock;
  48 + SrsReusableThread* pthread;
  49 +private:
  50 + bool meatadata_ok;
  51 + st_cond_t metadata_expired;
  52 +private:
45 SrsLbRoundRobin* lb; 53 SrsLbRoundRobin* lb;
46 SrsAsyncCallWorker* worker; 54 SrsAsyncCallWorker* worker;
47 SrsTcpClient* transport; 55 SrsTcpClient* transport;
@@ -53,7 +61,13 @@ public: @@ -53,7 +61,13 @@ public:
53 virtual int initialize(); 61 virtual int initialize();
54 virtual int start(); 62 virtual int start();
55 virtual void stop(); 63 virtual void stop();
  64 +// interface ISrsReusableThreadHandler
  65 +public:
  66 + virtual int cycle();
  67 + virtual int on_before_cycle();
  68 + virtual int on_end_cycle();
56 private: 69 private:
  70 + virtual int do_cycle();
57 virtual int request_metadata(); 71 virtual int request_metadata();
58 }; 72 };
59 73