winlin

kafka send the accept message.

@@ -36,10 +36,12 @@ using namespace std; @@ -36,10 +36,12 @@ using namespace std;
36 #include <srs_kernel_balance.hpp> 36 #include <srs_kernel_balance.hpp>
37 #include <srs_kafka_stack.hpp> 37 #include <srs_kafka_stack.hpp>
38 #include <srs_core_autofree.hpp> 38 #include <srs_core_autofree.hpp>
  39 +#include <srs_protocol_json.hpp>
39 40
40 #ifdef SRS_AUTO_KAFKA 41 #ifdef SRS_AUTO_KAFKA
41 42
42 #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000 43 #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000
  44 +#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 10
43 45
44 std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata) 46 std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata)
45 { 47 {
@@ -144,6 +146,33 @@ string SrsKafkaPartition::hostport() @@ -144,6 +146,33 @@ string SrsKafkaPartition::hostport()
144 return ep; 146 return ep;
145 } 147 }
146 148
  149 +SrsKafkaMessageOnClient::SrsKafkaMessageOnClient(SrsKafkaProducer* p, SrsListenerType t, string i)
  150 +{
  151 + producer = p;
  152 + type = t;
  153 + ip = i;
  154 +}
  155 +
  156 +SrsKafkaMessageOnClient::~SrsKafkaMessageOnClient()
  157 +{
  158 +}
  159 +
  160 +int SrsKafkaMessageOnClient::call()
  161 +{
  162 + SrsJsonObject* obj = SrsJsonAny::object();
  163 +
  164 + obj->set("msg", SrsJsonAny::str("accept"));
  165 + obj->set("type", SrsJsonAny::integer(type));
  166 + obj->set("ip", SrsJsonAny::str(ip.c_str()));
  167 +
  168 + return producer->send(obj);
  169 +}
  170 +
  171 +string SrsKafkaMessageOnClient::to_string()
  172 +{
  173 + return ip;
  174 +}
  175 +
147 SrsKafkaProducer::SrsKafkaProducer() 176 SrsKafkaProducer::SrsKafkaProducer()
148 { 177 {
149 metadata_ok = false; 178 metadata_ok = false;
@@ -211,6 +240,41 @@ void SrsKafkaProducer::stop() @@ -211,6 +240,41 @@ void SrsKafkaProducer::stop()
211 worker->stop(); 240 worker->stop();
212 } 241 }
213 242
  243 +int SrsKafkaProducer::on_client(SrsListenerType type, st_netfd_t stfd)
  244 +{
  245 + return worker->execute(new SrsKafkaMessageOnClient(this, type, srs_get_peer_ip(st_netfd_fileno(stfd))));
  246 +}
  247 +
  248 +int SrsKafkaProducer::send(SrsJsonObject* obj)
  249 +{
  250 + int ret = ERROR_SUCCESS;
  251 +
  252 + // cache the json object.
  253 + objects.push_back(obj);
  254 +
  255 + // too few messages, ignore.
  256 + if (objects.size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) {
  257 + return ret;
  258 + }
  259 +
  260 + // too many messages, warn user.
  261 + if (objects.size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) {
  262 + srs_warn("kafka cache too many messages: %d", objects.size());
  263 + }
  264 +
  265 + // sync with backgound metadata worker.
  266 + st_mutex_lock(lock);
  267 +
  268 + // flush message when metadata is ok.
  269 + if (metadata_ok) {
  270 + ret = flush();
  271 + }
  272 +
  273 + st_mutex_unlock(lock);
  274 +
  275 + return ret;
  276 +}
  277 +
214 int SrsKafkaProducer::cycle() 278 int SrsKafkaProducer::cycle()
215 { 279 {
216 int ret = ERROR_SUCCESS; 280 int ret = ERROR_SUCCESS;
@@ -329,5 +393,12 @@ void SrsKafkaProducer::refresh_metadata() @@ -329,5 +393,12 @@ void SrsKafkaProducer::refresh_metadata()
329 srs_trace("kafka async refresh metadata in background"); 393 srs_trace("kafka async refresh metadata in background");
330 } 394 }
331 395
  396 +int SrsKafkaProducer::flush()
  397 +{
  398 + int ret = ERROR_SUCCESS;
  399 + // TODO: FIXME: implements it.
  400 + return ret;
  401 +}
  402 +
332 #endif 403 #endif
333 404
@@ -35,8 +35,12 @@ class SrsLbRoundRobin; @@ -35,8 +35,12 @@ class SrsLbRoundRobin;
35 class SrsAsyncCallWorker; 35 class SrsAsyncCallWorker;
36 class SrsTcpClient; 36 class SrsTcpClient;
37 class SrsKafkaClient; 37 class SrsKafkaClient;
  38 +class SrsJsonObject;
  39 +class SrsKafkaProducer;
38 40
39 #include <srs_app_thread.hpp> 41 #include <srs_app_thread.hpp>
  42 +#include <srs_app_server.hpp>
  43 +#include <srs_app_async_call.hpp>
40 44
41 #ifdef SRS_AUTO_KAFKA 45 #ifdef SRS_AUTO_KAFKA
42 46
@@ -61,6 +65,24 @@ public: @@ -61,6 +65,24 @@ public:
61 }; 65 };
62 66
63 /** 67 /**
  68 + * the following is all types of kafka messages.
  69 + */
  70 +struct SrsKafkaMessageOnClient : public ISrsAsyncCallTask
  71 +{
  72 +public:
  73 + SrsKafkaProducer* producer;
  74 + SrsListenerType type;
  75 + std::string ip;
  76 +public:
  77 + SrsKafkaMessageOnClient(SrsKafkaProducer* p, SrsListenerType t, std::string i);
  78 + virtual ~SrsKafkaMessageOnClient();
  79 +// interface ISrsAsyncCallTask
  80 +public:
  81 + virtual int call();
  82 + virtual std::string to_string();
  83 +};
  84 +
  85 +/**
64 * the kafka producer used to save log to kafka cluster. 86 * the kafka producer used to save log to kafka cluster.
65 */ 87 */
66 class SrsKafkaProducer : public ISrsReusableThreadHandler 88 class SrsKafkaProducer : public ISrsReusableThreadHandler
@@ -73,6 +95,7 @@ private: @@ -73,6 +95,7 @@ private:
73 st_cond_t metadata_expired; 95 st_cond_t metadata_expired;
74 public: 96 public:
75 std::vector<SrsKafkaPartition*> partitions; 97 std::vector<SrsKafkaPartition*> partitions;
  98 + std::vector<SrsJsonObject*> objects;
76 private: 99 private:
77 SrsLbRoundRobin* lb; 100 SrsLbRoundRobin* lb;
78 SrsAsyncCallWorker* worker; 101 SrsAsyncCallWorker* worker;
@@ -85,6 +108,17 @@ public: @@ -85,6 +108,17 @@ public:
85 virtual int initialize(); 108 virtual int initialize();
86 virtual int start(); 109 virtual int start();
87 virtual void stop(); 110 virtual void stop();
  111 +public:
  112 + /**
  113 + * when got any client connect to SRS, notify kafka.
  114 + */
  115 + virtual int on_client(SrsListenerType type, st_netfd_t stfd);
  116 + /**
  117 + * send json object to kafka cluster.
  118 + * the producer will aggregate message and send in kafka message set.
  119 + * @param obj the json object; user must never free it again.
  120 + */
  121 + virtual int send(SrsJsonObject* obj);
88 // interface ISrsReusableThreadHandler 122 // interface ISrsReusableThreadHandler
89 public: 123 public:
90 virtual int cycle(); 124 virtual int cycle();
@@ -95,6 +129,7 @@ private: @@ -95,6 +129,7 @@ private:
95 virtual int request_metadata(); 129 virtual int request_metadata();
96 // set the metadata to invalid and refresh it. 130 // set the metadata to invalid and refresh it.
97 virtual void refresh_metadata(); 131 virtual void refresh_metadata();
  132 + virtual int flush();
98 }; 133 };
99 134
100 #endif 135 #endif
@@ -1289,6 +1289,14 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) @@ -1289,6 +1289,14 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
1289 } 1289 }
1290 srs_assert(conn); 1290 srs_assert(conn);
1291 1291
  1292 +#ifdef SRS_AUTO_KAFKA
  1293 + // notify kafka cluster.
  1294 + if ((ret = kafka->on_client(type, client_stfd)) != ERROR_SUCCESS) {
  1295 + srs_error("kafka handler on_client failed. ret=%d", ret);
  1296 + return ret;
  1297 + }
  1298 +#endif
  1299 +
1292 // directly enqueue, the cycle thread will remove the client. 1300 // directly enqueue, the cycle thread will remove the client.
1293 conns.push_back(conn); 1301 conns.push_back(conn);
1294 srs_verbose("add conn to vector."); 1302 srs_verbose("add conn to vector.");