winlin

implements the ingest framework

@@ -97,7 +97,8 @@ vhost ingest.srs.com { @@ -97,7 +97,8 @@ vhost ingest.srs.com {
97 # whether enable ingest features 97 # whether enable ingest features
98 # default: off 98 # default: off
99 enable on; 99 enable on;
100 - # input file/stream/device, can be multiple input. 100 + # input file/stream/device
  101 + # @remark only support one input.
101 input { 102 input {
102 # the type of input. 103 # the type of input.
103 # can be file/stream/device, that is, 104 # can be file/stream/device, that is,
@@ -1645,6 +1645,17 @@ string SrsConfig::get_ingest_ffmpeg(SrsConfDirective* ingest) @@ -1645,6 +1645,17 @@ string SrsConfig::get_ingest_ffmpeg(SrsConfDirective* ingest)
1645 return conf->arg0(); 1645 return conf->arg0();
1646 } 1646 }
1647 1647
  1648 +string SrsConfig::get_ingest_input(SrsConfDirective* ingest)
  1649 +{
  1650 + SrsConfDirective* conf = ingest->get("input");
  1651 +
  1652 + if (!conf) {
  1653 + return "";
  1654 + }
  1655 +
  1656 + return conf->arg0();
  1657 +}
  1658 +
1648 string SrsConfig::get_srs_log_file() 1659 string SrsConfig::get_srs_log_file()
1649 { 1660 {
1650 srs_assert(root); 1661 srs_assert(root);
@@ -190,6 +190,7 @@ public: @@ -190,6 +190,7 @@ public:
190 virtual void get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& ingeters); 190 virtual void get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& ingeters);
191 virtual bool get_ingest_enabled(SrsConfDirective* ingest); 191 virtual bool get_ingest_enabled(SrsConfDirective* ingest);
192 virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest); 192 virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest);
  193 + virtual std::string get_ingest_input(SrsConfDirective* ingest);
193 // log section 194 // log section
194 public: 195 public:
195 virtual bool get_srs_log_tank_file(); 196 virtual bool get_srs_log_tank_file();
@@ -49,15 +49,10 @@ int SrsIngester::start() @@ -49,15 +49,10 @@ int SrsIngester::start()
49 { 49 {
50 int ret = ERROR_SUCCESS; 50 int ret = ERROR_SUCCESS;
51 51
52 - // parse ingesters  
53 - std::vector<SrsConfDirective*> vhosts;  
54 - _srs_config->get_vhosts(vhosts);  
55 -  
56 - for (int i = 0; i < (int)vhosts.size(); i++) {  
57 - SrsConfDirective* vhost = vhosts[i];  
58 - if ((ret = parse_ingesters(vhost)) != ERROR_SUCCESS) {  
59 - return ret;  
60 - } 52 + if ((ret = parse()) != ERROR_SUCCESS) {
  53 + clear_engines();
  54 + ret = ERROR_SUCCESS;
  55 + return ret;
61 } 56 }
62 57
63 return ret; 58 return ret;
@@ -77,34 +72,55 @@ int SrsIngester::parse_ingesters(SrsConfDirective* vhost) @@ -77,34 +72,55 @@ int SrsIngester::parse_ingesters(SrsConfDirective* vhost)
77 continue; 72 continue;
78 } 73 }
79 74
80 - std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest);  
81 - if (ffmpeg_bin.empty()) {  
82 - srs_trace("ignore the empty ffmpeg ingest: %s", ingest->arg0().c_str());  
83 - continue; 75 + if ((ret = parse_engines(vhost, ingest)) != ERROR_SUCCESS) {
  76 + return ret;
84 } 77 }
  78 + }
85 79
86 - // get all engines.  
87 - std::vector<SrsConfDirective*> engines;  
88 - _srs_config->get_transcode_engines(ingest, engines);  
89 - if (engines.empty()) {  
90 - srs_trace("ignore the empty transcode engine: %s", ingest->arg0().c_str());  
91 - continue;  
92 - } 80 + return ret;
  81 +}
  82 +
  83 +int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest)
  84 +{
  85 + int ret = ERROR_SUCCESS;
  86 +
  87 + std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest);
  88 + if (ffmpeg_bin.empty()) {
  89 + ret = ERROR_ENCODER_PARSE;
  90 + srs_trace("empty ffmpeg ret=%d", ret);
  91 + return ret;
  92 + }
93 93
94 - // create engine  
95 - for (int i = 0; i < (int)engines.size(); i++) {  
96 - SrsConfDirective* engine = engines[i];  
97 - SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);  
98 - if ((ret = initialize_ffmpeg(ffmpeg, ingest, engine)) != ERROR_SUCCESS) {  
99 - srs_freep(ffmpeg);  
100 - if (ret != ERROR_ENCODER_LOOP) {  
101 - srs_error("invalid ingest engine: %s %s", ingest->arg0().c_str(), engine->arg0().c_str());  
102 - }  
103 - return ret; 94 + // get all engines.
  95 + std::vector<SrsConfDirective*> engines;
  96 + _srs_config->get_transcode_engines(ingest, engines);
  97 + if (engines.empty()) {
  98 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  99 + if ((ret = initialize_ffmpeg(ffmpeg, ingest, NULL)) != ERROR_SUCCESS) {
  100 + srs_freep(ffmpeg);
  101 + if (ret != ERROR_ENCODER_LOOP) {
  102 + srs_error("invalid ingest engine. ret=%d", ret);
104 } 103 }
105 -  
106 - ffmpegs.push_back(ffmpeg); 104 + return ret;
  105 + }
  106 +
  107 + ffmpegs.push_back(ffmpeg);
  108 + return ret;
  109 + }
  110 +
  111 + // create engine
  112 + for (int i = 0; i < (int)engines.size(); i++) {
  113 + SrsConfDirective* engine = engines[i];
  114 + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
  115 + if ((ret = initialize_ffmpeg(ffmpeg, ingest, engine)) != ERROR_SUCCESS) {
  116 + srs_freep(ffmpeg);
  117 + if (ret != ERROR_ENCODER_LOOP) {
  118 + srs_error("invalid ingest engine: %s %s", ingest->arg0().c_str(), engine->arg0().c_str());
  119 + }
  120 + return ret;
107 } 121 }
  122 +
  123 + ffmpegs.push_back(ffmpeg);
108 } 124 }
109 125
110 return ret; 126 return ret;
@@ -136,11 +152,36 @@ void SrsIngester::clear_engines() @@ -136,11 +152,36 @@ void SrsIngester::clear_engines()
136 ffmpegs.clear(); 152 ffmpegs.clear();
137 } 153 }
138 154
  155 +int SrsIngester::parse()
  156 +{
  157 + int ret = ERROR_SUCCESS;
  158 +
  159 + // parse ingesters
  160 + std::vector<SrsConfDirective*> vhosts;
  161 + _srs_config->get_vhosts(vhosts);
  162 +
  163 + for (int i = 0; i < (int)vhosts.size(); i++) {
  164 + SrsConfDirective* vhost = vhosts[i];
  165 + if ((ret = parse_ingesters(vhost)) != ERROR_SUCCESS) {
  166 + return ret;
  167 + }
  168 + }
  169 +
  170 + return ret;
  171 +}
  172 +
139 int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine) 173 int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine)
140 { 174 {
141 int ret = ERROR_SUCCESS; 175 int ret = ERROR_SUCCESS;
142 176
143 - if (!_srs_config->get_engine_enabled(engine)) { 177 + std::string input = _srs_config->get_ingest_input(ingest);
  178 + if (input.empty()) {
  179 + ret = ERROR_ENCODER_NO_INPUT;
  180 + srs_trace("empty ingest intput. ret=%d", ret);
  181 + return ret;
  182 + }
  183 +
  184 + if (!engine || !_srs_config->get_engine_enabled(engine)) {
144 } 185 }
145 186
146 return ret; 187 return ret;
@@ -61,7 +61,9 @@ public: @@ -61,7 +61,9 @@ public:
61 virtual void on_thread_stop(); 61 virtual void on_thread_stop();
62 private: 62 private:
63 virtual void clear_engines(); 63 virtual void clear_engines();
  64 + virtual int parse();
64 virtual int parse_ingesters(SrsConfDirective* vhost); 65 virtual int parse_ingesters(SrsConfDirective* vhost);
  66 + virtual int parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest);
65 virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine); 67 virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine);
66 }; 68 };
67 69
@@ -153,6 +153,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -153,6 +153,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
153 #define ERROR_ENCODER_LOOP 714 153 #define ERROR_ENCODER_LOOP 714
154 #define ERROR_ENCODER_OPEN 715 154 #define ERROR_ENCODER_OPEN 715
155 #define ERROR_ENCODER_DUP2 716 155 #define ERROR_ENCODER_DUP2 716
  156 +#define ERROR_ENCODER_PARSE 717
  157 +#define ERROR_ENCODER_NO_INPUT 718
156 158
157 #define ERROR_HTTP_PARSE_URI 800 159 #define ERROR_HTTP_PARSE_URI 800
158 #define ERROR_HTTP_DATA_INVLIAD 801 160 #define ERROR_HTTP_DATA_INVLIAD 801