winlin

notify kafka when client close

@@ -383,6 +383,13 @@ void SrsKafkaProducer::stop() @@ -383,6 +383,13 @@ void SrsKafkaProducer::stop()
383 383
384 int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) 384 int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
385 { 385 {
  386 + int ret = ERROR_SUCCESS;
  387 +
  388 + bool enabled = _srs_config->get_kafka_enabled();
  389 + if (!enabled) {
  390 + return ret;
  391 + }
  392 +
386 SrsJsonObject* obj = SrsJsonAny::object(); 393 SrsJsonObject* obj = SrsJsonAny::object();
387 394
388 obj->set("msg", SrsJsonAny::str("accept")); 395 obj->set("msg", SrsJsonAny::str("accept"));
@@ -392,6 +399,22 @@ int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) @@ -392,6 +399,22 @@ int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
392 return worker->execute(new SrsKafkaMessage(this, key, obj)); 399 return worker->execute(new SrsKafkaMessage(this, key, obj));
393 } 400 }
394 401
  402 +int SrsKafkaProducer::on_close(int key)
  403 +{
  404 + int ret = ERROR_SUCCESS;
  405 +
  406 + bool enabled = _srs_config->get_kafka_enabled();
  407 + if (!enabled) {
  408 + return ret;
  409 + }
  410 +
  411 + SrsJsonObject* obj = SrsJsonAny::object();
  412 +
  413 + obj->set("msg", SrsJsonAny::str("close"));
  414 +
  415 + return worker->execute(new SrsKafkaMessage(this, key, obj));
  416 +}
  417 +
395 int SrsKafkaProducer::send(int key, SrsJsonObject* obj) 418 int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
396 { 419 {
397 int ret = ERROR_SUCCESS; 420 int ret = ERROR_SUCCESS;
@@ -137,11 +137,16 @@ public: @@ -137,11 +137,16 @@ public:
137 public: 137 public:
138 /** 138 /**
139 * when got any client connect to SRS, notify kafka. 139 * when got any client connect to SRS, notify kafka.
140 - * @param key the partition map key, a id or hash. 140 + * @param key the partition map key, the client id or hash(ip).
141 * @param type the type of client. 141 * @param type the type of client.
142 * @param ip the peer ip of client. 142 * @param ip the peer ip of client.
143 */ 143 */
144 virtual int on_client(int key, SrsListenerType type, std::string ip) = 0; 144 virtual int on_client(int key, SrsListenerType type, std::string ip) = 0;
  145 + /**
  146 + * when client close or disconnect for error.
  147 + * @param key the partition map key, the client id or hash(ip).
  148 + */
  149 + virtual int on_close(int key) = 0;
145 }; 150 };
146 151
147 /** 152 /**
@@ -168,11 +173,10 @@ public: @@ -168,11 +173,10 @@ public:
168 virtual int initialize(); 173 virtual int initialize();
169 virtual int start(); 174 virtual int start();
170 virtual void stop(); 175 virtual void stop();
  176 +// interface ISrsKafkaCluster
171 public: 177 public:
172 - /**  
173 - * when got any client connect to SRS, notify kafka.  
174 - */  
175 virtual int on_client(int key, SrsListenerType type, std::string ip); 178 virtual int on_client(int key, SrsListenerType type, std::string ip);
  179 + virtual int on_close(int key);
176 // for worker to call task to send object. 180 // for worker to call task to send object.
177 public: 181 public:
178 /** 182 /**
@@ -1555,6 +1555,13 @@ int SrsRtmpConn::on_disconnect() @@ -1555,6 +1555,13 @@ int SrsRtmpConn::on_disconnect()
1555 int ret = ERROR_SUCCESS; 1555 int ret = ERROR_SUCCESS;
1556 1556
1557 http_hooks_on_close(); 1557 http_hooks_on_close();
  1558 +
  1559 +#ifdef SRS_AUTO_KAFKA
  1560 + if ((ret = kafka->on_close(srs_id())) != ERROR_SUCCESS) {
  1561 + srs_error("notify kafka failed. ret=%d", ret);
  1562 + return ret;
  1563 + }
  1564 +#endif
1558 1565
1559 // TODO: implements it. 1566 // TODO: implements it.
1560 1567