winlin

refine kafka

  1 +console.conf
  2 +doc/frozen.2Mbps.1644x1028.flv
  3 +doc/frozen.500Kbps.766x480.flv
  4 +doc/kungfupanda3-tlr1_h1080p.200kbps.flv
  5 +doc/kungfupanda3-tlr1_h1080p.300kbps.flv
  6 +doc/kungfupanda3-tlr1_h1080p.400kbps.flv
  7 +doc/kungfupanda3-tlr1_h1080p.500kbps.flv
  8 +doc/kungfupanda3-tlr1_h1080p.600kbps.flv
  9 +doc/kungfupanda3-tlr1_h1080p.700kbps.flv
  10 +doc/kungfupanda3-tlr1_h1080p.800kbps.flv
  11 +doc/kungfupanda3-tlr1_h1080p.8mbps.flv
  12 +doc/kungfupanda3-tlr1_h1080p.900kbps.flv
  13 +doc/time.300kbps.flv
  14 +edge.conf
  15 +html
  16 +ide/srs_xcode/srs_xcode.xcodeproj/project.xcworkspace/xcshareddata/
  17 +ide/srs_xcode/srs_xcode.xcodeproj/project.xcworkspace/xcuserdata/
  18 +ide/srs_xcode/srs_xcode.xcodeproj/xcuserdata/
  19 +ingest.conf
  20 +origin.conf
  21 +research/aac/
  22 +research/api-server/.idea/
  23 +research/api-server/static-dir/mse
  24 +research/bat/
  25 +research/big/
  26 +research/bitch/
  27 +research/bott/
  28 +research/cgo/
  29 +research/dns/
  30 +research/empty/
  31 +research/golang/golang
  32 +research/golang/temp.flv
  33 +research/librtmp/720p.h264.raw
  34 +research/librtmp/test.h264
  35 +research/licenser/
  36 +research/players/.idea/
  37 +research/players/fls_player/
  38 +research/players/mic/
  39 +research/players/srs_player/.idea/
  40 +research/proxy/
  41 +research/redis-ocluster/
  42 +research/rtmfp/
  43 +research/snap/
  44 +research/speex/
  45 +test/
  46 +
@@ -321,6 +321,35 @@ ISrsKafkaCluster::~ISrsKafkaCluster() @@ -321,6 +321,35 @@ ISrsKafkaCluster::~ISrsKafkaCluster()
321 { 321 {
322 } 322 }
323 323
  324 +// @global kafka event producer, user must use srs_initialize_kafka to initialize it.
  325 +ISrsKafkaCluster* _srs_kafka = NULL;
  326 +
  327 +int srs_initialize_kafka()
  328 +{
  329 + int ret = ERROR_SUCCESS;
  330 +
  331 + SrsKafkaProducer* kafka = new SrsKafkaProducer();
  332 + _srs_kafka = kafka;
  333 +
  334 + if ((ret = kafka->initialize()) != ERROR_SUCCESS) {
  335 + srs_error("initialize the kafka producer failed. ret=%d", ret);
  336 + return ret;
  337 + }
  338 +
  339 + if ((ret = kafka->start()) != ERROR_SUCCESS) {
  340 + srs_error("start kafka failed. ret=%d", ret);
  341 + return ret;
  342 + }
  343 +
  344 + return ret;
  345 +}
  346 +
  347 +void srs_dispose_kafka()
  348 +{
  349 + SrsKafkaProducer* kafka = dynamic_cast<SrsKafkaProducer*>(_srs_kafka);
  350 + kafka->stop();
  351 +}
  352 +
324 SrsKafkaProducer::SrsKafkaProducer() 353 SrsKafkaProducer::SrsKafkaProducer()
325 { 354 {
326 metadata_ok = false; 355 metadata_ok = false;
@@ -362,6 +391,10 @@ int SrsKafkaProducer::start() @@ -362,6 +391,10 @@ int SrsKafkaProducer::start()
362 { 391 {
363 int ret = ERROR_SUCCESS; 392 int ret = ERROR_SUCCESS;
364 393
  394 + if (!enabled) {
  395 + return ret;
  396 + }
  397 +
365 if ((ret = worker->start()) != ERROR_SUCCESS) { 398 if ((ret = worker->start()) != ERROR_SUCCESS) {
366 srs_error("start kafka worker failed. ret=%d", ret); 399 srs_error("start kafka worker failed. ret=%d", ret);
367 return ret; 400 return ret;
@@ -378,6 +411,10 @@ int SrsKafkaProducer::start() @@ -378,6 +411,10 @@ int SrsKafkaProducer::start()
378 411
379 void SrsKafkaProducer::stop() 412 void SrsKafkaProducer::stop()
380 { 413 {
  414 + if (!enabled) {
  415 + return;
  416 + }
  417 +
381 pthread->stop(); 418 pthread->stop();
382 worker->stop(); 419 worker->stop();
383 } 420 }
@@ -149,6 +149,12 @@ public: @@ -149,6 +149,12 @@ public:
149 virtual int on_close(int key) = 0; 149 virtual int on_close(int key) = 0;
150 }; 150 };
151 151
  152 +// @global kafka event producer.
  153 +extern ISrsKafkaCluster* _srs_kafka;
  154 +// kafka initialize and disposer for global object.
  155 +extern int srs_initialize_kafka();
  156 +extern void srs_dispose_kafka();
  157 +
152 /** 158 /**
153 * the kafka producer used to save log to kafka cluster. 159 * the kafka producer used to save log to kafka cluster.
154 */ 160 */
@@ -309,17 +309,10 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) @@ -309,17 +309,10 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout)
309 transport->set_recv_timeout(timeout); 309 transport->set_recv_timeout(timeout);
310 } 310 }
311 311
312 -#ifdef SRS_AUTO_KAFKA  
313 -SrsRtmpConn::SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c, string cip)  
314 -#else  
315 SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip) 312 SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip)
316 -#endif  
317 : SrsConnection(svr, c, cip) 313 : SrsConnection(svr, c, cip)
318 { 314 {
319 server = svr; 315 server = svr;
320 -#ifdef SRS_AUTO_KAFKA  
321 - kafka = k;  
322 -#endif  
323 316
324 req = new SrsRequest(); 317 req = new SrsRequest();
325 res = new SrsResponse(); 318 res = new SrsResponse();
@@ -375,7 +368,7 @@ int SrsRtmpConn::do_cycle() @@ -375,7 +368,7 @@ int SrsRtmpConn::do_cycle()
375 368
376 // notify kafka cluster. 369 // notify kafka cluster.
377 #ifdef SRS_AUTO_KAFKA 370 #ifdef SRS_AUTO_KAFKA
378 - if ((ret = kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != ERROR_SUCCESS) { 371 + if ((ret = _srs_kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != ERROR_SUCCESS) {
379 srs_error("kafka handler on_client failed. ret=%d", ret); 372 srs_error("kafka handler on_client failed. ret=%d", ret);
380 return ret; 373 return ret;
381 } 374 }
@@ -1558,7 +1551,7 @@ int SrsRtmpConn::on_disconnect() @@ -1558,7 +1551,7 @@ int SrsRtmpConn::on_disconnect()
1558 http_hooks_on_close(); 1551 http_hooks_on_close();
1559 1552
1560 #ifdef SRS_AUTO_KAFKA 1553 #ifdef SRS_AUTO_KAFKA
1561 - if ((ret = kafka->on_close(srs_id())) != ERROR_SUCCESS) { 1554 + if ((ret = _srs_kafka->on_close(srs_id())) != ERROR_SUCCESS) {
1562 srs_error("notify kafka failed. ret=%d", ret); 1555 srs_error("notify kafka failed. ret=%d", ret);
1563 return ret; 1556 return ret;
1564 } 1557 }
@@ -138,16 +138,8 @@ private: @@ -138,16 +138,8 @@ private:
138 int publish_normal_timeout; 138 int publish_normal_timeout;
139 // whether enable the tcp_nodelay. 139 // whether enable the tcp_nodelay.
140 bool tcp_nodelay; 140 bool tcp_nodelay;
141 - // the kafka cluster  
142 -#ifdef SRS_AUTO_KAFKA  
143 - ISrsKafkaCluster* kafka;  
144 -#endif  
145 public: 141 public:
146 -#ifdef SRS_AUTO_KAFKA  
147 - SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c, std::string cip);  
148 -#else  
149 SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip); 142 SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip);
150 -#endif  
151 virtual ~SrsRtmpConn(); 143 virtual ~SrsRtmpConn();
152 public: 144 public:
153 virtual void dispose(); 145 virtual void dispose();
@@ -511,9 +511,6 @@ SrsServer::SrsServer() @@ -511,9 +511,6 @@ SrsServer::SrsServer()
511 #ifdef SRS_AUTO_INGEST 511 #ifdef SRS_AUTO_INGEST
512 ingester = NULL; 512 ingester = NULL;
513 #endif 513 #endif
514 -#ifdef SRS_AUTO_KAFKA  
515 - kafka = new SrsKafkaProducer();  
516 -#endif  
517 } 514 }
518 515
519 SrsServer::~SrsServer() 516 SrsServer::~SrsServer()
@@ -543,10 +540,6 @@ void SrsServer::destroy() @@ -543,10 +540,6 @@ void SrsServer::destroy()
543 srs_freep(ingester); 540 srs_freep(ingester);
544 #endif 541 #endif
545 542
546 -#ifdef SRS_AUTO_KAFKA  
547 - srs_freep(kafka);  
548 -#endif  
549 -  
550 if (pid_fd > 0) { 543 if (pid_fd > 0) {
551 ::close(pid_fd); 544 ::close(pid_fd);
552 pid_fd = -1; 545 pid_fd = -1;
@@ -570,7 +563,7 @@ void SrsServer::dispose() @@ -570,7 +563,7 @@ void SrsServer::dispose()
570 // @remark don't dispose ingesters, for too slow. 563 // @remark don't dispose ingesters, for too slow.
571 564
572 #ifdef SRS_AUTO_KAFKA 565 #ifdef SRS_AUTO_KAFKA
573 - kafka->stop(); 566 + srs_dispose_kafka();
574 #endif 567 #endif
575 568
576 // dispose the source for hls and dvr. 569 // dispose the source for hls and dvr.
@@ -655,6 +648,14 @@ int SrsServer::initialize_st() @@ -655,6 +648,14 @@ int SrsServer::initialize_st()
655 // set current log id. 648 // set current log id.
656 _srs_context->generate_id(); 649 _srs_context->generate_id();
657 650
  651 + // initialize the conponents that depends on st.
  652 +#ifdef SRS_AUTO_KAFKA
  653 + if ((ret = srs_initialize_kafka()) != ERROR_SUCCESS) {
  654 + srs_error("initialize kafka failed, ret=%d", ret);
  655 + return ret;
  656 + }
  657 +#endif
  658 +
658 // check asprocess. 659 // check asprocess.
659 bool asprocess = _srs_config->get_asprocess(); 660 bool asprocess = _srs_config->get_asprocess();
660 if (asprocess && ppid == 1) { 661 if (asprocess && ppid == 1) {
@@ -876,25 +877,6 @@ int SrsServer::ingest() @@ -876,25 +877,6 @@ int SrsServer::ingest()
876 return ret; 877 return ret;
877 } 878 }
878 879
879 -int SrsServer::start_kafka()  
880 -{  
881 - int ret = ERROR_SUCCESS;  
882 -  
883 -#ifdef SRS_AUTO_KAFKA  
884 - if ((ret = kafka->initialize()) != ERROR_SUCCESS) {  
885 - srs_error("initialize the kafka producer failed. ret=%d", ret);  
886 - return ret;  
887 - }  
888 -  
889 - if ((ret = kafka->start()) != ERROR_SUCCESS) {  
890 - srs_error("start kafka failed. ret=%d", ret);  
891 - return ret;  
892 - }  
893 -#endif  
894 -  
895 - return ret;  
896 -}  
897 -  
898 int SrsServer::cycle() 880 int SrsServer::cycle()
899 { 881 {
900 int ret = ERROR_SUCCESS; 882 int ret = ERROR_SUCCESS;
@@ -1338,7 +1320,7 @@ SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd) @@ -1338,7 +1320,7 @@ SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd)
1338 SrsConnection* conn = NULL; 1320 SrsConnection* conn = NULL;
1339 1321
1340 if (type == SrsListenerRtmpStream) { 1322 if (type == SrsListenerRtmpStream) {
1341 - conn = new SrsRtmpConn(this, kafka, stfd, ip); 1323 + conn = new SrsRtmpConn(this, stfd, ip);
1342 } else if (type == SrsListenerHttpApi) { 1324 } else if (type == SrsListenerHttpApi) {
1343 #ifdef SRS_AUTO_HTTP_API 1325 #ifdef SRS_AUTO_HTTP_API
1344 conn = new SrsHttpApi(this, stfd, http_api_mux, ip); 1326 conn = new SrsHttpApi(this, stfd, http_api_mux, ip);
@@ -254,9 +254,6 @@ private: @@ -254,9 +254,6 @@ private:
254 #ifdef SRS_AUTO_INGEST 254 #ifdef SRS_AUTO_INGEST
255 SrsIngester* ingester; 255 SrsIngester* ingester;
256 #endif 256 #endif
257 -#ifdef SRS_AUTO_KAFKA  
258 - SrsKafkaProducer* kafka;  
259 -#endif  
260 private: 257 private:
261 /** 258 /**
262 * the pid file fd, lock the file write when server is running. 259 * the pid file fd, lock the file write when server is running.
@@ -319,7 +316,6 @@ public: @@ -319,7 +316,6 @@ public:
319 virtual int register_signal(); 316 virtual int register_signal();
320 virtual int http_handle(); 317 virtual int http_handle();
321 virtual int ingest(); 318 virtual int ingest();
322 - virtual int start_kafka();  
323 virtual int cycle(); 319 virtual int cycle();
324 // server utilities. 320 // server utilities.
325 public: 321 public:
@@ -405,10 +405,6 @@ int run_master() @@ -405,10 +405,6 @@ int run_master()
405 return ret; 405 return ret;
406 } 406 }
407 407
408 - if ((ret = _srs_server->start_kafka()) != ERROR_SUCCESS) {  
409 - return ret;  
410 - }  
411 -  
412 if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) { 408 if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {
413 return ret; 409 return ret;
414 } 410 }