winlin

refine ingester, add vhost/id info to ingester

@@ -25,6 +25,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -25,6 +25,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 25
26 #ifdef SRS_INGEST 26 #ifdef SRS_INGEST
27 27
  28 +using namespace std;
  29 +
28 #include <srs_kernel_error.hpp> 30 #include <srs_kernel_error.hpp>
29 #include <srs_app_config.hpp> 31 #include <srs_app_config.hpp>
30 #include <srs_kernel_log.hpp> 32 #include <srs_kernel_log.hpp>
@@ -35,6 +37,18 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -35,6 +37,18 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 // ingest never sleep a long time, for we must start the stream ASAP. 37 // ingest never sleep a long time, for we must start the stream ASAP.
36 #define SRS_INGESTER_SLEEP_US (int64_t)(6*100*1000LL) 38 #define SRS_INGESTER_SLEEP_US (int64_t)(6*100*1000LL)
37 39
  40 +SrsIngesterFFMPEG::SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, string _vhost, string _id)
  41 +{
  42 + ffmpeg = _ffmpeg;
  43 + vhost = _vhost;
  44 + id = _id;
  45 +}
  46 +
  47 +SrsIngesterFFMPEG::~SrsIngesterFFMPEG()
  48 +{
  49 + srs_freep(ffmpeg);
  50 +}
  51 +
38 SrsIngester::SrsIngester() 52 SrsIngester::SrsIngester()
39 { 53 {
40 // TODO: FIXME: support reload. 54 // TODO: FIXME: support reload.
@@ -59,7 +73,7 @@ int SrsIngester::start() @@ -59,7 +73,7 @@ int SrsIngester::start()
59 } 73 }
60 74
61 // return for error or no engine. 75 // return for error or no engine.
62 - if (ffmpegs.empty()) { 76 + if (ingesters.empty()) {
63 return ret; 77 return ret;
64 } 78 }
65 79
@@ -118,7 +132,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest @@ -118,7 +132,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
118 return ret; 132 return ret;
119 } 133 }
120 134
121 - ffmpegs.push_back(ffmpeg); 135 + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0());
  136 + ingesters.push_back(ingester);
122 return ret; 137 return ret;
123 } 138 }
124 139
@@ -135,7 +150,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest @@ -135,7 +150,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
135 return ret; 150 return ret;
136 } 151 }
137 152
138 - ffmpegs.push_back(ffmpeg); 153 + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0());
  154 + ingesters.push_back(ingester);
139 } 155 }
140 156
141 return ret; 157 return ret;
@@ -151,18 +167,18 @@ int SrsIngester::cycle() @@ -151,18 +167,18 @@ int SrsIngester::cycle()
151 { 167 {
152 int ret = ERROR_SUCCESS; 168 int ret = ERROR_SUCCESS;
153 169
154 - std::vector<SrsFFMPEG*>::iterator it;  
155 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
156 - SrsFFMPEG* ffmpeg = *it; 170 + std::vector<SrsIngesterFFMPEG*>::iterator it;
  171 + for (it = ingesters.begin(); it != ingesters.end(); ++it) {
  172 + SrsIngesterFFMPEG* ingester = *it;
157 173
158 // start all ffmpegs. 174 // start all ffmpegs.
159 - if ((ret = ffmpeg->start()) != ERROR_SUCCESS) { 175 + if ((ret = ingester->ffmpeg->start()) != ERROR_SUCCESS) {
160 srs_error("ingest ffmpeg start failed. ret=%d", ret); 176 srs_error("ingest ffmpeg start failed. ret=%d", ret);
161 return ret; 177 return ret;
162 } 178 }
163 179
164 // check ffmpeg status. 180 // check ffmpeg status.
165 - if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) { 181 + if ((ret = ingester->ffmpeg->cycle()) != ERROR_SUCCESS) {
166 srs_error("ingest ffmpeg cycle failed. ret=%d", ret); 182 srs_error("ingest ffmpeg cycle failed. ret=%d", ret);
167 return ret; 183 return ret;
168 } 184 }
@@ -181,14 +197,14 @@ void SrsIngester::on_thread_stop() @@ -181,14 +197,14 @@ void SrsIngester::on_thread_stop()
181 197
182 void SrsIngester::clear_engines() 198 void SrsIngester::clear_engines()
183 { 199 {
184 - std::vector<SrsFFMPEG*>::iterator it; 200 + std::vector<SrsIngesterFFMPEG*>::iterator it;
185 201
186 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
187 - SrsFFMPEG* ffmpeg = *it;  
188 - srs_freep(ffmpeg); 202 + for (it = ingesters.begin(); it != ingesters.end(); ++it) {
  203 + SrsIngesterFFMPEG* ingester = *it;
  204 + srs_freep(ingester);
189 } 205 }
190 206
191 - ffmpegs.clear(); 207 + ingesters.clear();
192 } 208 }
193 209
194 int SrsIngester::parse() 210 int SrsIngester::parse()
@@ -333,7 +349,7 @@ void SrsIngester::ingester() @@ -333,7 +349,7 @@ void SrsIngester::ingester()
333 if (pithy_print->can_print()) { 349 if (pithy_print->can_print()) {
334 // TODO: FIXME: show more info. 350 // TODO: FIXME: show more info.
335 srs_trace("-> time=%"PRId64", ingesters=%d, input=%s", 351 srs_trace("-> time=%"PRId64", ingesters=%d, input=%s",
336 - pithy_print->get_age(), (int)ffmpegs.size(), input_stream_name.c_str()); 352 + pithy_print->get_age(), (int)ingesters.size(), input_stream_name.c_str());
337 } 353 }
338 } 354 }
339 355
@@ -40,6 +40,19 @@ class SrsConfDirective; @@ -40,6 +40,19 @@ class SrsConfDirective;
40 class SrsPithyPrint; 40 class SrsPithyPrint;
41 41
42 /** 42 /**
  43 +* ingester ffmpeg object.
  44 +*/
  45 +struct SrsIngesterFFMPEG
  46 +{
  47 + std::string vhost;
  48 + std::string id;
  49 + SrsFFMPEG* ffmpeg;
  50 +
  51 + SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, std::string _vhost, std::string _id);
  52 + virtual ~SrsIngesterFFMPEG();
  53 +};
  54 +
  55 +/**
43 * ingest file/stream/device, 56 * ingest file/stream/device,
44 * encode with FFMPEG(optional), 57 * encode with FFMPEG(optional),
45 * push to SRS(or any RTMP server) over RTMP. 58 * push to SRS(or any RTMP server) over RTMP.
@@ -48,7 +61,7 @@ class SrsIngester : public ISrsThreadHandler @@ -48,7 +61,7 @@ class SrsIngester : public ISrsThreadHandler
48 { 61 {
49 private: 62 private:
50 std::string input_stream_name; 63 std::string input_stream_name;
51 - std::vector<SrsFFMPEG*> ffmpegs; 64 + std::vector<SrsIngesterFFMPEG*> ingesters;
52 private: 65 private:
53 SrsThread* pthread; 66 SrsThread* pthread;
54 SrsPithyPrint* pithy_print; 67 SrsPithyPrint* pithy_print;