winlin

refine the pithy print of ingesters.

@@ -39,11 +39,9 @@ using namespace std; @@ -39,11 +39,9 @@ using namespace std;
39 // ingest never sleep a long time, for we must start the stream ASAP. 39 // ingest never sleep a long time, for we must start the stream ASAP.
40 #define SRS_AUTO_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) 40 #define SRS_AUTO_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)
41 41
42 -SrsIngesterFFMPEG::SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, string _vhost, string _id) 42 +SrsIngesterFFMPEG::SrsIngesterFFMPEG()
43 { 43 {
44 - ffmpeg = _ffmpeg;  
45 - vhost = _vhost;  
46 - id = _id; 44 + ffmpeg = NULL;
47 } 45 }
48 46
49 SrsIngesterFFMPEG::~SrsIngesterFFMPEG() 47 SrsIngesterFFMPEG::~SrsIngesterFFMPEG()
@@ -51,6 +49,53 @@ SrsIngesterFFMPEG::~SrsIngesterFFMPEG() @@ -51,6 +49,53 @@ SrsIngesterFFMPEG::~SrsIngesterFFMPEG()
51 srs_freep(ffmpeg); 49 srs_freep(ffmpeg);
52 } 50 }
53 51
  52 +int SrsIngesterFFMPEG::initialize(SrsFFMPEG* ff, string v, string i)
  53 +{
  54 + int ret = ERROR_SUCCESS;
  55 +
  56 + ffmpeg = ff;
  57 + vhost = v;
  58 + id = i;
  59 + starttime = srs_get_system_time_ms();
  60 +
  61 + return ret;
  62 +}
  63 +
  64 +string SrsIngesterFFMPEG::uri()
  65 +{
  66 + return vhost + "/" + id;
  67 +}
  68 +
  69 +int SrsIngesterFFMPEG::alive()
  70 +{
  71 + return (int)(srs_get_system_time_ms() - starttime);
  72 +}
  73 +
  74 +bool SrsIngesterFFMPEG::equals(string v)
  75 +{
  76 + return vhost == v;
  77 +}
  78 +
  79 +bool SrsIngesterFFMPEG::equals(string v, string i)
  80 +{
  81 + return vhost == v && id == i;
  82 +}
  83 +
  84 +int SrsIngesterFFMPEG::start()
  85 +{
  86 + return ffmpeg->start();
  87 +}
  88 +
  89 +void SrsIngesterFFMPEG::stop()
  90 +{
  91 + ffmpeg->stop();
  92 +}
  93 +
  94 +int SrsIngesterFFMPEG::cycle()
  95 +{
  96 + return ffmpeg->cycle();
  97 +}
  98 +
54 void SrsIngesterFFMPEG::fast_stop() 99 void SrsIngesterFFMPEG::fast_stop()
55 { 100 {
56 ffmpeg->fast_stop(); 101 ffmpeg->fast_stop();
@@ -129,6 +174,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest @@ -129,6 +174,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
129 174
130 // get all engines. 175 // get all engines.
131 std::vector<SrsConfDirective*> engines = _srs_config->get_transcode_engines(ingest); 176 std::vector<SrsConfDirective*> engines = _srs_config->get_transcode_engines(ingest);
  177 +
  178 + // create ingesters without engines.
132 if (engines.empty()) { 179 if (engines.empty()) {
133 SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); 180 SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
134 if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) { 181 if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) {
@@ -139,12 +186,17 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest @@ -139,12 +186,17 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
139 return ret; 186 return ret;
140 } 187 }
141 188
142 - SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0()); 189 + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();
  190 + if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {
  191 + srs_freep(ingester);
  192 + return ret;
  193 + }
  194 +
143 ingesters.push_back(ingester); 195 ingesters.push_back(ingester);
144 return ret; 196 return ret;
145 } 197 }
146 198
147 - // create engine 199 + // create ingesters with engine
148 for (int i = 0; i < (int)engines.size(); i++) { 200 for (int i = 0; i < (int)engines.size(); i++) {
149 SrsConfDirective* engine = engines[i]; 201 SrsConfDirective* engine = engines[i];
150 SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); 202 SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
@@ -157,7 +209,12 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest @@ -157,7 +209,12 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
157 return ret; 209 return ret;
158 } 210 }
159 211
160 - SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0()); 212 + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();
  213 + if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {
  214 + srs_freep(ingester);
  215 + return ret;
  216 + }
  217 +
161 ingesters.push_back(ingester); 218 ingesters.push_back(ingester);
162 } 219 }
163 220
@@ -196,13 +253,13 @@ int SrsIngester::cycle() @@ -196,13 +253,13 @@ int SrsIngester::cycle()
196 SrsIngesterFFMPEG* ingester = *it; 253 SrsIngesterFFMPEG* ingester = *it;
197 254
198 // start all ffmpegs. 255 // start all ffmpegs.
199 - if ((ret = ingester->ffmpeg->start()) != ERROR_SUCCESS) { 256 + if ((ret = ingester->start()) != ERROR_SUCCESS) {
200 srs_error("ingest ffmpeg start failed. ret=%d", ret); 257 srs_error("ingest ffmpeg start failed. ret=%d", ret);
201 return ret; 258 return ret;
202 } 259 }
203 260
204 // check ffmpeg status. 261 // check ffmpeg status.
205 - if ((ret = ingester->ffmpeg->cycle()) != ERROR_SUCCESS) { 262 + if ((ret = ingester->cycle()) != ERROR_SUCCESS) {
206 srs_error("ingest ffmpeg cycle failed. ret=%d", ret); 263 srs_error("ingest ffmpeg cycle failed. ret=%d", ret);
207 return ret; 264 return ret;
208 } 265 }
@@ -376,11 +433,14 @@ void SrsIngester::show_ingest_log_message() @@ -376,11 +433,14 @@ void SrsIngester::show_ingest_log_message()
376 return; 433 return;
377 } 434 }
378 435
  436 + // random choose one ingester to report.
  437 + int index = rand() % (int)ingesters.size();
  438 + SrsIngesterFFMPEG* ingester = ingesters.at(index);
  439 +
379 // reportable 440 // reportable
380 if (pprint->can_print()) { 441 if (pprint->can_print()) {
381 - // TODO: FIXME: show more info.  
382 - srs_trace("-> "SRS_CONSTS_LOG_INGESTER  
383 - " time=%"PRId64", ingesters=%d", pprint->age(), (int)ingesters.size()); 442 + srs_trace("-> "SRS_CONSTS_LOG_INGESTER" time=%"PRId64", ingesters=%d, #%d(alive=%ds, %s)",
  443 + pprint->age(), (int)ingesters.size(), index, ingester->alive() / 1000, ingester->uri().c_str());
384 } 444 }
385 } 445 }
386 446
@@ -407,16 +467,15 @@ int SrsIngester::on_reload_vhost_removed(string vhost) @@ -407,16 +467,15 @@ int SrsIngester::on_reload_vhost_removed(string vhost)
407 for (it = ingesters.begin(); it != ingesters.end();) { 467 for (it = ingesters.begin(); it != ingesters.end();) {
408 SrsIngesterFFMPEG* ingester = *it; 468 SrsIngesterFFMPEG* ingester = *it;
409 469
410 - if (ingester->vhost != vhost) { 470 + if (!ingester->equals(vhost)) {
411 ++it; 471 ++it;
412 continue; 472 continue;
413 } 473 }
414 474
415 // stop the ffmpeg and free it. 475 // stop the ffmpeg and free it.
416 - ingester->ffmpeg->stop(); 476 + ingester->stop();
417 477
418 - srs_trace("reload stop ingester, "  
419 - "vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str()); 478 + srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str());
420 479
421 srs_freep(ingester); 480 srs_freep(ingester);
422 481
@@ -436,16 +495,15 @@ int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id) @@ -436,16 +495,15 @@ int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id)
436 for (it = ingesters.begin(); it != ingesters.end();) { 495 for (it = ingesters.begin(); it != ingesters.end();) {
437 SrsIngesterFFMPEG* ingester = *it; 496 SrsIngesterFFMPEG* ingester = *it;
438 497
439 - if (ingester->vhost != vhost || ingester->id != ingest_id) { 498 + if (!ingester->equals(vhost, ingest_id)) {
440 ++it; 499 ++it;
441 continue; 500 continue;
442 } 501 }
443 502
444 // stop the ffmpeg and free it. 503 // stop the ffmpeg and free it.
445 - ingester->ffmpeg->stop(); 504 + ingester->stop();
446 505
447 - srs_trace("reload stop ingester, "  
448 - "vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str()); 506 + srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str());
449 507
450 srs_freep(ingester); 508 srs_freep(ingester);
451 509
@@ -45,14 +45,26 @@ class SrsPithyPrint; @@ -45,14 +45,26 @@ class SrsPithyPrint;
45 */ 45 */
46 class SrsIngesterFFMPEG 46 class SrsIngesterFFMPEG
47 { 47 {
48 -public: 48 +private:
49 std::string vhost; 49 std::string vhost;
50 std::string id; 50 std::string id;
51 SrsFFMPEG* ffmpeg; 51 SrsFFMPEG* ffmpeg;
52 -  
53 - SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, std::string _vhost, std::string _id); 52 + int64_t starttime;
  53 +public:
  54 + SrsIngesterFFMPEG();
54 virtual ~SrsIngesterFFMPEG(); 55 virtual ~SrsIngesterFFMPEG();
55 - 56 +public:
  57 + virtual int initialize(SrsFFMPEG* ff, std::string v, std::string i);
  58 + // the ingest uri, [vhost]/[ingest id]
  59 + virtual std::string uri();
  60 + // the alive in ms.
  61 + virtual int alive();
  62 + virtual bool equals(std::string v, std::string i);
  63 + virtual bool equals(std::string v);
  64 +public:
  65 + virtual int start();
  66 + virtual void stop();
  67 + virtual int cycle();
56 // @see SrsFFMPEG.fast_stop(). 68 // @see SrsFFMPEG.fast_stop().
57 virtual void fast_stop(); 69 virtual void fast_stop();
58 }; 70 };