winlin

support without kafka

@@ -28,6 +28,8 @@ @@ -28,6 +28,8 @@
28 #include <srs_app_config.hpp> 28 #include <srs_app_config.hpp>
29 #include <srs_app_async_call.hpp> 29 #include <srs_app_async_call.hpp>
30 30
  31 +#ifdef SRS_AUTO_KAFKA
  32 +
31 SrsKafkaProducer::SrsKafkaProducer() 33 SrsKafkaProducer::SrsKafkaProducer()
32 { 34 {
33 worker = new SrsAsyncCallWorker(); 35 worker = new SrsAsyncCallWorker();
@@ -66,3 +68,5 @@ void SrsKafkaProducer::stop() @@ -66,3 +68,5 @@ void SrsKafkaProducer::stop()
66 worker->stop(); 68 worker->stop();
67 } 69 }
68 70
  71 +#endif
  72 +
@@ -31,6 +31,11 @@ @@ -31,6 +31,11 @@
31 31
32 class SrsAsyncCallWorker; 32 class SrsAsyncCallWorker;
33 33
  34 +#ifdef SRS_AUTO_KAFKA
  35 +
  36 +/**
  37 + * the kafka producer used to save log to kafka cluster.
  38 + */
34 class SrsKafkaProducer 39 class SrsKafkaProducer
35 { 40 {
36 private: 41 private:
@@ -45,3 +50,6 @@ public: @@ -45,3 +50,6 @@ public:
45 }; 50 };
46 51
47 #endif 52 #endif
  53 +
  54 +#endif
  55 +
@@ -509,7 +509,9 @@ SrsServer::SrsServer() @@ -509,7 +509,9 @@ SrsServer::SrsServer()
509 #ifdef SRS_AUTO_INGEST 509 #ifdef SRS_AUTO_INGEST
510 ingester = NULL; 510 ingester = NULL;
511 #endif 511 #endif
  512 +#ifdef SRS_AUTO_KAFKA
512 kafka = new SrsKafkaProducer(); 513 kafka = new SrsKafkaProducer();
  514 +#endif
513 } 515 }
514 516
515 SrsServer::~SrsServer() 517 SrsServer::~SrsServer()
@@ -539,7 +541,9 @@ void SrsServer::destroy() @@ -539,7 +541,9 @@ void SrsServer::destroy()
539 srs_freep(ingester); 541 srs_freep(ingester);
540 #endif 542 #endif
541 543
  544 +#ifdef SRS_AUTO_KAFKA
542 srs_freep(kafka); 545 srs_freep(kafka);
  546 +#endif
543 547
544 if (pid_fd > 0) { 548 if (pid_fd > 0) {
545 ::close(pid_fd); 549 ::close(pid_fd);
@@ -565,7 +569,9 @@ void SrsServer::dispose() @@ -565,7 +569,9 @@ void SrsServer::dispose()
565 ingester->dispose(); 569 ingester->dispose();
566 #endif 570 #endif
567 571
  572 +#ifdef SRS_AUTO_KAFKA
568 kafka->stop(); 573 kafka->stop();
  574 +#endif
569 575
570 SrsSource::dispose_all(); 576 SrsSource::dispose_all();
571 577
@@ -874,12 +880,19 @@ int SrsServer::start_kafka() @@ -874,12 +880,19 @@ int SrsServer::start_kafka()
874 { 880 {
875 int ret = ERROR_SUCCESS; 881 int ret = ERROR_SUCCESS;
876 882
  883 +#ifdef SRS_AUTO_KAFKA
877 if ((ret = kafka->initialize()) != ERROR_SUCCESS) { 884 if ((ret = kafka->initialize()) != ERROR_SUCCESS) {
878 srs_error("initialize the kafka producer failed. ret=%d", ret); 885 srs_error("initialize the kafka producer failed. ret=%d", ret);
879 return ret; 886 return ret;
880 } 887 }
881 888
882 - return kafka->start(); 889 + if ((ret = kafka->start()) != ERROR_SUCCESS) {
  890 + srs_error("start kafka failed. ret=%d", ret);
  891 + return ret;
  892 + }
  893 +#endif
  894 +
  895 + return ret;
883 } 896 }
884 897
885 int SrsServer::cycle() 898 int SrsServer::cycle()
@@ -55,7 +55,9 @@ class SrsTcpListener; @@ -55,7 +55,9 @@ class SrsTcpListener;
55 #ifdef SRS_AUTO_STREAM_CASTER 55 #ifdef SRS_AUTO_STREAM_CASTER
56 class SrsAppCasterFlv; 56 class SrsAppCasterFlv;
57 #endif 57 #endif
  58 +#ifdef SRS_AUTO_KAFKA
58 class SrsKafkaProducer; 59 class SrsKafkaProducer;
  60 +#endif
59 61
60 // listener type for server to identify the connection, 62 // listener type for server to identify the connection,
61 // that is, use different type to process the connection. 63 // that is, use different type to process the connection.
@@ -248,7 +250,9 @@ private: @@ -248,7 +250,9 @@ private:
248 #ifdef SRS_AUTO_INGEST 250 #ifdef SRS_AUTO_INGEST
249 SrsIngester* ingester; 251 SrsIngester* ingester;
250 #endif 252 #endif
  253 +#ifdef SRS_AUTO_KAFKA
251 SrsKafkaProducer* kafka; 254 SrsKafkaProducer* kafka;
  255 +#endif
252 private: 256 private:
253 /** 257 /**
254 * the pid file fd, lock the file write when server is running. 258 * the pid file fd, lock the file write when server is running.
@@ -25,6 +25,8 @@ @@ -25,6 +25,8 @@
25 25
26 using namespace std; 26 using namespace std;
27 27
  28 +#ifdef SRS_AUTO_KAFKA
  29 +
28 SrsKafkaString::SrsKafkaString() 30 SrsKafkaString::SrsKafkaString()
29 { 31 {
30 size = -1; 32 size = -1;
@@ -194,3 +196,5 @@ SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest() @@ -194,3 +196,5 @@ SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest()
194 { 196 {
195 } 197 }
196 198
  199 +#endif
  200 +
@@ -31,6 +31,8 @@ @@ -31,6 +31,8 @@
31 31
32 #include <vector> 32 #include <vector>
33 33
  34 +#ifdef SRS_AUTO_KAFKA
  35 +
34 // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys 36 // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
35 enum SrsKafkaApiKey 37 enum SrsKafkaApiKey
36 { 38 {
@@ -310,3 +312,5 @@ public: @@ -310,3 +312,5 @@ public:
310 312
311 #endif 313 #endif
312 314
  315 +#endif
  316 +