winlin

add ingest config

@@ -913,6 +913,21 @@ SrsConfDirective* SrsConfig::get_vhost(string vhost) @@ -913,6 +913,21 @@ SrsConfDirective* SrsConfig::get_vhost(string vhost)
913 return NULL; 913 return NULL;
914 } 914 }
915 915
  916 +void SrsConfig::get_vhosts(std::vector<SrsConfDirective*>& vhosts)
  917 +{
  918 + srs_assert(root);
  919 +
  920 + for (int i = 0; i < (int)root->directives.size(); i++) {
  921 + SrsConfDirective* conf = root->at(i);
  922 +
  923 + if (!conf->is_vhost()) {
  924 + continue;
  925 + }
  926 +
  927 + vhosts.push_back(conf);
  928 + }
  929 +}
  930 +
916 SrsConfDirective* SrsConfig::get_vhost_on_connect(string vhost) 931 SrsConfDirective* SrsConfig::get_vhost_on_connect(string vhost)
917 { 932 {
918 SrsConfDirective* conf = get_vhost(vhost); 933 SrsConfDirective* conf = get_vhost(vhost);
@@ -1608,6 +1623,28 @@ void SrsConfig::get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& @@ -1608,6 +1623,28 @@ void SrsConfig::get_ingesters(std::string vhost, std::vector<SrsConfDirective*>&
1608 return; 1623 return;
1609 } 1624 }
1610 1625
  1626 +bool SrsConfig::get_ingest_enabled(SrsConfDirective* ingest)
  1627 +{
  1628 + SrsConfDirective* conf = ingest->get("enable");
  1629 +
  1630 + if (!conf || conf->arg0() != "on") {
  1631 + return false;
  1632 + }
  1633 +
  1634 + return true;
  1635 +}
  1636 +
  1637 +string SrsConfig::get_ingest_ffmpeg(SrsConfDirective* ingest)
  1638 +{
  1639 + SrsConfDirective* conf = ingest->get("ffmpeg");
  1640 +
  1641 + if (!conf) {
  1642 + return "";
  1643 + }
  1644 +
  1645 + return conf->arg0();
  1646 +}
  1647 +
1611 string SrsConfig::get_srs_log_file() 1648 string SrsConfig::get_srs_log_file()
1612 { 1649 {
1613 srs_assert(root); 1650 srs_assert(root);
@@ -139,6 +139,7 @@ public: @@ -139,6 +139,7 @@ public:
139 // vhost section 139 // vhost section
140 public: 140 public:
141 virtual SrsConfDirective* get_vhost(std::string vhost); 141 virtual SrsConfDirective* get_vhost(std::string vhost);
  142 + virtual void get_vhosts(std::vector<SrsConfDirective*>& vhosts);
142 virtual bool get_vhost_enabled(std::string vhost); 143 virtual bool get_vhost_enabled(std::string vhost);
143 virtual bool get_vhost_enabled(SrsConfDirective* vhost); 144 virtual bool get_vhost_enabled(SrsConfDirective* vhost);
144 virtual SrsConfDirective* get_vhost_on_connect(std::string vhost); 145 virtual SrsConfDirective* get_vhost_on_connect(std::string vhost);
@@ -184,9 +185,11 @@ public: @@ -184,9 +185,11 @@ public:
184 virtual int get_engine_achannels(SrsConfDirective* engine); 185 virtual int get_engine_achannels(SrsConfDirective* engine);
185 virtual void get_engine_aparams(SrsConfDirective* engine, std::vector<std::string>& aparams); 186 virtual void get_engine_aparams(SrsConfDirective* engine, std::vector<std::string>& aparams);
186 virtual std::string get_engine_output(SrsConfDirective* engine); 187 virtual std::string get_engine_output(SrsConfDirective* engine);
187 -// vhost ingest section 188 +// ingest section
188 public: 189 public:
189 virtual void get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& ingeters); 190 virtual void get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& ingeters);
  191 + virtual bool get_ingest_enabled(SrsConfDirective* ingest);
  192 + virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest);
190 // log section 193 // log section
191 public: 194 public:
192 virtual bool get_srs_log_tank_file(); 195 virtual bool get_srs_log_tank_file();
@@ -309,7 +309,10 @@ int SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDir @@ -309,7 +309,10 @@ int SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDir
309 } 309 }
310 _transcoded_url.push_back(output); 310 _transcoded_url.push_back(output);
311 311
312 - if ((ret = ffmpeg->initialize(input, output, log_file, engine)) != ERROR_SUCCESS) { 312 + if ((ret = ffmpeg->initialize(input, output, log_file)) != ERROR_SUCCESS) {
  313 + return ret;
  314 + }
  315 + if ((ret = ffmpeg->initialize_transcode(engine)) != ERROR_SUCCESS) {
313 return ret; 316 return ret;
314 } 317 }
315 318
@@ -78,7 +78,18 @@ string SrsFFMPEG::output() @@ -78,7 +78,18 @@ string SrsFFMPEG::output()
78 return _output; 78 return _output;
79 } 79 }
80 80
81 -int SrsFFMPEG::initialize(string in, string out, string log, SrsConfDirective* engine) 81 +int SrsFFMPEG::initialize(string in, string out, string log)
  82 +{
  83 + int ret = ERROR_SUCCESS;
  84 +
  85 + input = in;
  86 + _output = out;
  87 + log_file = log;
  88 +
  89 + return ret;
  90 +}
  91 +
  92 +int SrsFFMPEG::initialize_transcode(SrsConfDirective* engine)
82 { 93 {
83 int ret = ERROR_SUCCESS; 94 int ret = ERROR_SUCCESS;
84 95
@@ -102,10 +113,6 @@ int SrsFFMPEG::initialize(string in, string out, string log, SrsConfDirective* e @@ -102,10 +113,6 @@ int SrsFFMPEG::initialize(string in, string out, string log, SrsConfDirective* e
102 vwidth -= vwidth % 2; 113 vwidth -= vwidth % 2;
103 vheight -= vheight % 2; 114 vheight -= vheight % 2;
104 115
105 - input = in;  
106 - _output = out;  
107 - log_file = log;  
108 -  
109 if (vcodec == SRS_ENCODER_NO_VIDEO && acodec == SRS_ENCODER_NO_AUDIO) { 116 if (vcodec == SRS_ENCODER_NO_VIDEO && acodec == SRS_ENCODER_NO_AUDIO) {
110 ret = ERROR_ENCODER_VCODEC; 117 ret = ERROR_ENCODER_VCODEC;
111 srs_warn("video and audio disabled. ret=%d", ret); 118 srs_warn("video and audio disabled. ret=%d", ret);
@@ -191,6 +198,22 @@ int SrsFFMPEG::initialize(string in, string out, string log, SrsConfDirective* e @@ -191,6 +198,22 @@ int SrsFFMPEG::initialize(string in, string out, string log, SrsConfDirective* e
191 return ret; 198 return ret;
192 } 199 }
193 200
  201 +int SrsFFMPEG::initialize_copy()
  202 +{
  203 + int ret = ERROR_SUCCESS;
  204 +
  205 + vcodec = SRS_ENCODER_COPY;
  206 + acodec = SRS_ENCODER_COPY;
  207 +
  208 + if (_output.empty()) {
  209 + ret = ERROR_ENCODER_OUTPUT;
  210 + srs_error("invalid empty output, ret=%d", ret);
  211 + return ret;
  212 + }
  213 +
  214 + return ret;
  215 +}
  216 +
194 int SrsFFMPEG::start() 217 int SrsFFMPEG::start()
195 { 218 {
196 int ret = ERROR_SUCCESS; 219 int ret = ERROR_SUCCESS;
@@ -74,7 +74,9 @@ public: @@ -74,7 +74,9 @@ public:
74 public: 74 public:
75 virtual std::string output(); 75 virtual std::string output();
76 public: 76 public:
77 - virtual int initialize(std::string in, std::string out, std::string log, SrsConfDirective* engine); 77 + virtual int initialize(std::string in, std::string out, std::string log);
  78 + virtual int initialize_transcode(SrsConfDirective* engine);
  79 + virtual int initialize_copy();
78 virtual int start(); 80 virtual int start();
79 virtual int cycle(); 81 virtual int cycle();
80 virtual void stop(); 82 virtual void stop();
@@ -26,6 +26,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -26,6 +26,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #ifdef SRS_INGEST 26 #ifdef SRS_INGEST
27 27
28 #include <srs_kernel_error.hpp> 28 #include <srs_kernel_error.hpp>
  29 +#include <srs_app_config.hpp>
  30 +#include <srs_kernel_log.hpp>
  31 +#include <srs_app_ffmpeg.hpp>
29 32
30 // when error, ingester sleep for a while and retry. 33 // when error, ingester sleep for a while and retry.
31 #define SRS_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) 34 #define SRS_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)
@@ -39,11 +42,71 @@ SrsIngester::SrsIngester() @@ -39,11 +42,71 @@ SrsIngester::SrsIngester()
39 SrsIngester::~SrsIngester() 42 SrsIngester::~SrsIngester()
40 { 43 {
41 srs_freep(pthread); 44 srs_freep(pthread);
  45 + clear_engines();
42 } 46 }
43 47
44 int SrsIngester::start() 48 int SrsIngester::start()
45 { 49 {
46 int ret = ERROR_SUCCESS; 50 int ret = ERROR_SUCCESS;
  51 +
  52 + // parse ingesters
  53 + std::vector<SrsConfDirective*> vhosts;
  54 + _srs_config->get_vhosts(vhosts);
  55 +
  56 + for (int i = 0; i < (int)vhosts.size(); i++) {
  57 + SrsConfDirective* vhost = vhosts[i];
  58 + if ((ret = parse_ingesters(vhost)) != ERROR_SUCCESS) {
  59 + return ret;
  60 + }
  61 + }
  62 +
  63 + return ret;
  64 +}
  65 +
  66 +int SrsIngester::parse_ingesters(SrsConfDirective* vhost)
  67 +{
  68 + int ret = ERROR_SUCCESS;
  69 +
  70 + std::vector<SrsConfDirective*> ingesters;
  71 + _srs_config->get_ingesters(vhost->arg0(), ingesters);
  72 +
  73 + // create engine
  74 + for (int i = 0; i < (int)ingesters.size(); i++) {
  75 + SrsConfDirective* ingest = ingesters[i];
  76 + if (!_srs_config->get_ingest_enabled(ingest)) {
  77 + continue;
  78 + }
  79 +
  80 + std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest);
  81 + if (ffmpeg_bin.empty()) {
  82 + srs_trace("ignore the empty ffmpeg ingest: %s", ingest->arg0().c_str());
  83 + continue;
  84 + }
  85 +
  86 + // get all engines.
  87 + std::vector<SrsConfDirective*> engines;
  88 + _srs_config->get_transcode_engines(ingest, engines);
  89 + if (engines.empty()) {
  90 + srs_trace("ignore the empty transcode engine: %s", ingest->arg0().c_str());
  91 + continue;
  92 + }
  93 +
  94 + // create engine
  95 + for (int i = 0; i < (int)engines.size(); i++) {
  96 + SrsConfDirective* engine = engines[i];
  97 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  98 + if ((ret = initialize_ffmpeg(ffmpeg, ingest, engine)) != ERROR_SUCCESS) {
  99 + srs_freep(ffmpeg);
  100 + if (ret != ERROR_ENCODER_LOOP) {
  101 + srs_error("invalid ingest engine: %s %s", ingest->arg0().c_str(), engine->arg0().c_str());
  102 + }
  103 + return ret;
  104 + }
  105 +
  106 + ffmpegs.push_back(ffmpeg);
  107 + }
  108 + }
  109 +
47 return ret; 110 return ret;
48 } 111 }
49 112
@@ -61,4 +124,26 @@ void SrsIngester::on_thread_stop() @@ -61,4 +124,26 @@ void SrsIngester::on_thread_stop()
61 { 124 {
62 } 125 }
63 126
  127 +void SrsIngester::clear_engines()
  128 +{
  129 + std::vector<SrsFFMPEG*>::iterator it;
  130 +
  131 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  132 + SrsFFMPEG* ffmpeg = *it;
  133 + srs_freep(ffmpeg);
  134 + }
  135 +
  136 + ffmpegs.clear();
  137 +}
  138 +
  139 +int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine)
  140 +{
  141 + int ret = ERROR_SUCCESS;
  142 +
  143 + if (!_srs_config->get_engine_enabled(engine)) {
  144 + }
  145 +
  146 + return ret;
  147 +}
  148 +
64 #endif 149 #endif
@@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 #include <srs_app_thread.hpp> 36 #include <srs_app_thread.hpp>
37 37
38 class SrsFFMPEG; 38 class SrsFFMPEG;
  39 +class SrsConfDirective;
39 40
40 /** 41 /**
41 * ingest file/stream/device, 42 * ingest file/stream/device,
@@ -58,6 +59,10 @@ public: @@ -58,6 +59,10 @@ public:
58 public: 59 public:
59 virtual int cycle(); 60 virtual int cycle();
60 virtual void on_thread_stop(); 61 virtual void on_thread_stop();
  62 +private:
  63 + virtual void clear_engines();
  64 + virtual int parse_ingesters(SrsConfDirective* vhost);
  65 + virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine);
61 }; 66 };
62 67
63 #endif 68 #endif
@@ -373,7 +373,7 @@ int SrsServer::listen() @@ -373,7 +373,7 @@ int SrsServer::listen()
373 return ret; 373 return ret;
374 } 374 }
375 375
376 -int SrsServer::cycle() 376 +int SrsServer::ingest()
377 { 377 {
378 int ret = ERROR_SUCCESS; 378 int ret = ERROR_SUCCESS;
379 379
@@ -383,6 +383,13 @@ int SrsServer::cycle() @@ -383,6 +383,13 @@ int SrsServer::cycle()
383 return ret; 383 return ret;
384 } 384 }
385 #endif 385 #endif
  386 +
  387 + return ret;
  388 +}
  389 +
  390 +int SrsServer::cycle()
  391 +{
  392 + int ret = ERROR_SUCCESS;
386 393
387 // the deamon thread, update the time cache 394 // the deamon thread, update the time cache
388 while (true) { 395 while (true) {
@@ -100,6 +100,7 @@ public: @@ -100,6 +100,7 @@ public:
100 virtual int acquire_pid_file(); 100 virtual int acquire_pid_file();
101 virtual int initialize_st(); 101 virtual int initialize_st();
102 virtual int listen(); 102 virtual int listen();
  103 + virtual int ingest();
103 virtual int cycle(); 104 virtual int cycle();
104 virtual void remove(SrsConnection* conn); 105 virtual void remove(SrsConnection* conn);
105 virtual void on_signal(int signo); 106 virtual void on_signal(int signo);
@@ -73,6 +73,10 @@ int run_master() @@ -73,6 +73,10 @@ int run_master()
73 return ret; 73 return ret;
74 } 74 }
75 75
  76 + if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {
  77 + return ret;
  78 + }
  79 +
76 if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) { 80 if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {
77 return ret; 81 return ret;
78 } 82 }