Blame view

trunk/src/app/srs_app_encoder.cpp 9.8 KB
winlin authored
1 2 3
/*
The MIT License (MIT)
4
Copyright (c) 2013-2014 winlin
winlin authored
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
24
#include <srs_app_encoder.hpp>
winlin authored
25
26 27 28
#include <algorithm>
using namespace std;
29
#include <srs_kernel_error.hpp>
30
#include <srs_kernel_log.hpp>
31
#include <srs_app_config.hpp>
32
#include <srs_protocol_rtmp.hpp>
33
#include <srs_app_pithy_print.hpp>
34
#include <srs_app_ffmpeg.hpp>
35
#include <srs_kernel_utility.hpp>
winlin authored
36
37
#ifdef SRS_AUTO_TRANSCODE
winlin authored
38
winlin authored
39
// when error, encoder sleep for a while and retry.
winlin authored
40
// the interval is 3 seconds, expressed in microseconds (us);
// it is handed to the SrsThread created in the SrsEncoder constructor.
#define SRS_RTMP_ENCODER_SLEEP_US (int64_t)(3*1000*1000LL)
winlin authored
41
42 43 44
// for encoder to detect the dead loop:
// holds the output urls registered by initialize_ffmpeg(). when the input url
// of a newly published stream is found here, initialize_ffmpeg() reports
// ERROR_ENCODER_LOOP instead of transcoding the stream again.
// clear_engines() removes the output url of each engine it frees.
// file-scope static, so the list is shared by every SrsEncoder in the process.
static std::vector<std::string> _transcoded_url;
winlin authored
45 46
SrsEncoder::SrsEncoder()
{
    // the worker thread uses this object as its handler and
    // SRS_RTMP_ENCODER_SLEEP_US as its interval; on_publish() starts it.
    pthread = new SrsThread(this, SRS_RTMP_ENCODER_SLEEP_US, true);

    // consulted by encoder() to decide when the status line is printed.
    pithy_print = new SrsPithyPrint(SRS_CONSTS_STAGE_ENCODER);
}

SrsEncoder::~SrsEncoder()
{
    // stop the worker thread and free all engines first,
    // exactly as an explicit unpublish would do.
    on_unpublish();

    // then release the helpers created by the constructor.
    srs_freep(pthread);
    srs_freep(pithy_print);
}

// Called when a stream is published: collect the configured transcode
// engines and start the worker thread which runs them.
// @param req the publish request (vhost/app/stream).
// @return ERROR_SUCCESS when transcoding started or there is nothing to
//      transcode; otherwise the error of the parsing or of the thread start.
int SrsEncoder::on_publish(SrsRequest* req)
{
    // parse the transcode engines for vhost and app and stream.
    int ret = parse_scope_engines(req);

    // a detected loop is not an error for the publisher:
    // drop every engine, so the whole stream is not transcoded.
    if (ret == ERROR_ENCODER_LOOP) {
        clear_engines();
        ret = ERROR_SUCCESS;
    }

    // the parsing failed, report it to the caller.
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    // no engine to run, so no thread is needed.
    if (ffmpegs.empty()) {
        return ret;
    }

    // start thread to run all encoding engines.
    ret = pthread->start();
    if (ret != ERROR_SUCCESS) {
        srs_error("st_thread_create failed. ret=%d", ret);
        return ret;
    }
    srs_trace("encoder thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());

    return ret;
}

void SrsEncoder::on_unpublish()
{
90 91
    pthread->stop();
    clear_engines();
92 93 94 95
}

int SrsEncoder::cycle()
{
96 97 98 99 100 101 102 103
    int ret = ERROR_SUCCESS;
    
    std::vector<SrsFFMPEG*>::iterator it;
    for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
        SrsFFMPEG* ffmpeg = *it;
        
        // start all ffmpegs.
        if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
104
            srs_error("transcode ffmpeg start failed. ret=%d", ret);
105 106 107 108 109
            return ret;
        }

        // check ffmpeg status.
        if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) {
110
            srs_error("transcode ffmpeg cycle failed. ret=%d", ret);
111 112 113 114 115 116
            return ret;
        }
    }

    // pithy print
    encoder();
117
    pithy_print->elapse();
118 119
    
    return ret;
120 121
}
122
void SrsEncoder::on_thread_stop()
123
{
124 125
    // kill ffmpeg when finished and it alive
    std::vector<SrsFFMPEG*>::iterator it;
126
127 128 129 130
    for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
        SrsFFMPEG* ffmpeg = *it;
        ffmpeg->stop();
    }
winlin authored
131 132 133 134
}

void SrsEncoder::clear_engines()
{
135 136 137 138
    std::vector<SrsFFMPEG*>::iterator it;
    
    for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
        SrsFFMPEG* ffmpeg = *it;
139 140 141
    
        std::string output = ffmpeg->output();
        
142 143 144 145
        std::vector<std::string>::iterator tu_it;
        tu_it = std::find(_transcoded_url.begin(), _transcoded_url.end(), output);
        if (tu_it != _transcoded_url.end()) {
            _transcoded_url.erase(tu_it);
146 147
        }
        
148 149 150 151
        srs_freep(ffmpeg);
    }

    ffmpegs.clear();
winlin authored
152 153 154 155
}

// Get the engine at the specified position.
// @param index the position in the engine list, must be in [0, number of engines).
// @return the engine, still owned by this encoder.
SrsFFMPEG* SrsEncoder::at(int index)
{
    // operator[] does no bounds checking, an invalid index would
    // read out of bounds; make the precondition explicit instead.
    srs_assert(index >= 0 && index < (int)ffmpegs.size());

    return ffmpegs[index];
}
159 160
// Parse the transcode engines of the three scopes, in order:
// the vhost (empty scope), the app ("app") and the stream ("app/stream").
// @param req the publish request (vhost/app/stream).
// @return ERROR_SUCCESS, or the error of the first scope which failed.
//      ERROR_ENCODER_LOOP is returned without an error log,
//      on_publish() handles it.
int SrsEncoder::parse_scope_engines(SrsRequest* req)
{
    int ret = ERROR_SUCCESS;

    // the transcode directive of the scope being parsed.
    SrsConfDirective* conf = NULL;

    // parse vhost scope engines
    std::string scope = "";
    conf = _srs_config->get_transcode(req->vhost, scope);
    if (conf != NULL) {
        ret = parse_ffmpeg(req, conf);
        if (ret != ERROR_SUCCESS) {
            if (ret != ERROR_ENCODER_LOOP) {
                srs_error("parse vhost scope=%s transcode engines failed. "
                    "ret=%d", scope.c_str(), ret);
            }
            return ret;
        }
    }

    // parse app scope engines
    scope = req->app;
    conf = _srs_config->get_transcode(req->vhost, scope);
    if (conf != NULL) {
        ret = parse_ffmpeg(req, conf);
        if (ret != ERROR_SUCCESS) {
            if (ret != ERROR_ENCODER_LOOP) {
                srs_error("parse app scope=%s transcode engines failed. "
                    "ret=%d", scope.c_str(), ret);
            }
            return ret;
        }
    }

    // parse stream scope engines, the scope is "app/stream"
    scope.append("/");
    scope.append(req->stream);
    conf = _srs_config->get_transcode(req->vhost, scope);
    if (conf != NULL) {
        ret = parse_ffmpeg(req, conf);
        if (ret != ERROR_SUCCESS) {
            if (ret != ERROR_ENCODER_LOOP) {
                srs_error("parse stream scope=%s transcode engines failed. "
                    "ret=%d", scope.c_str(), ret);
            }
            return ret;
        }
    }

    return ret;
}
204
// Create and initialize one SrsFFMPEG for each enabled engine of the
// transcode directive, and append them to the engine list.
// @param req the publish request (vhost/app/stream).
// @param conf the transcode directive, must not be NULL.
// @return ERROR_SUCCESS when all engines are ready or the directive is
//      ignored (disabled, no ffmpeg binary, no engine); otherwise the error
//      of the first engine which failed. the engines created before the
//      failed one stay in the engine list.
int SrsEncoder::parse_ffmpeg(SrsRequest* req, SrsConfDirective* conf)
{
    int ret = ERROR_SUCCESS;

    srs_assert(conf);

    // the whole transcode directive is switched off
    if (!_srs_config->get_transcode_enabled(conf)) {
        srs_trace("ignore the disabled transcode: %s", conf->arg0().c_str());
        return ret;
    }

    // the ffmpeg binary to run
    std::string ffmpeg_bin = _srs_config->get_transcode_ffmpeg(conf);
    if (ffmpeg_bin.empty()) {
        srs_trace("ignore the empty ffmpeg transcode: %s", conf->arg0().c_str());
        return ret;
    }

    // get all engines.
    std::vector<SrsConfDirective*> engines = _srs_config->get_transcode_engines(conf);
    if (engines.empty()) {
        srs_trace("ignore the empty transcode engine: %s", conf->arg0().c_str());
        return ret;
    }

    // create one ffmpeg for each enabled engine
    std::vector<SrsConfDirective*>::iterator it;
    for (it = engines.begin(); it != engines.end(); ++it) {
        SrsConfDirective* engine = *it;

        // NOTE(review): "diabled" in the log text is a typo of "disabled";
        // kept as-is here so the emitted log line is unchanged.
        if (!_srs_config->get_engine_enabled(engine)) {
            srs_trace("ignore the diabled transcode engine: %s %s", 
                conf->arg0().c_str(), engine->arg0().c_str());
            continue;
        }

        SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
        ret = initialize_ffmpeg(ffmpeg, req, engine);

        // the engine is ready, the engine list takes the ownership.
        if (ret == ERROR_SUCCESS) {
            ffmpegs.push_back(ffmpeg);
            continue;
        }

        // failed: free the engine, and only log the real errors,
        // the loop is already traced by initialize_ffmpeg.
        srs_freep(ffmpeg);
        if (ret != ERROR_ENCODER_LOOP) {
            srs_error("invalid transcode engine: %s %s", conf->arg0().c_str(), engine->arg0().c_str());
        }
        return ret;
    }

    return ret;
}
257 258 259 260 261 262
// Prepare one ffmpeg engine for the published stream: build the input url,
// the output url and the log file, check for a transcode loop, then
// initialize the engine.
// @param ffmpeg the engine to initialize; the caller keeps the ownership.
// @param req the publish request, supplies vhost/port/app/stream.
// @param engine the engine directive, supplies the output template and
//      the transcode params.
// @return ERROR_SUCCESS when the engine is ready; ERROR_ENCODER_LOOP when
//      the input url is a registered output url; otherwise the error of
//      the engine initialization.
// @remark the output url is registered in _transcoded_url only when the
//      engine is initialized successfully. the caller frees a failed engine
//      without adding it to the engine list, so clear_engines() would never
//      remove an url registered for it, and the stale url would make the
//      loop detection reject that stream later.
int SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDirective* engine)
{
    int ret = ERROR_SUCCESS;

    // input stream, from local.
    // ie. rtmp://localhost:1935/live/livestream
    std::string input = "rtmp://";
    input += SRS_CONSTS_LOCALHOST;
    input += ":";
    input += req->port;
    input += "/";
    input += req->app;
    input += "?vhost=";
    input += req->vhost;
    input += "/";
    input += req->stream;

    // stream name: vhost/app/stream for print
    input_stream_name = req->vhost;
    input_stream_name += "/";
    input_stream_name += req->app;
    input_stream_name += "/";
    input_stream_name += req->stream;

    // output stream, to other/self server
    // ie. rtmp://localhost:1935/live/livestream_sd
    std::string output = _srs_config->get_engine_output(engine);
    output = srs_string_replace(output, "[vhost]", req->vhost);
    output = srs_string_replace(output, "[port]", req->port);
    output = srs_string_replace(output, "[app]", req->app);
    output = srs_string_replace(output, "[stream]", req->stream);
    output = srs_string_replace(output, "[engine]", engine->arg0());

    std::string log_file = SRS_CONSTS_NULL_FILE; // disabled
    // write ffmpeg info to log file.
    if (_srs_config->get_ffmpeg_log_enabled()) {
        log_file = _srs_config->get_ffmpeg_log_dir();
        log_file += "/";
        log_file += "ffmpeg-encoder";
        log_file += "-";
        log_file += req->vhost;
        log_file += "-";
        log_file += req->app;
        log_file += "-";
        log_file += req->stream;
        log_file += ".log";
    }

    // important: loop check, donot transcode again.
    std::vector<std::string>::iterator it;
    it = std::find(_transcoded_url.begin(), _transcoded_url.end(), input);
    if (it != _transcoded_url.end()) {
        ret = ERROR_ENCODER_LOOP;
        srs_trace("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",
            input.c_str(), output.c_str(), ret);
        return ret;
    }

    if ((ret = ffmpeg->initialize(input, output, log_file)) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = ffmpeg->initialize_transcode(engine)) != ERROR_SUCCESS) {
        return ret;
    }

    // register the output only for an engine which is ready,
    // see the remark of this function.
    _transcoded_url.push_back(output);

    return ret;
}
327
void SrsEncoder::encoder()
winlin authored
328
{
329 330 331
    // reportable
    if (pithy_print->can_print()) {
        // TODO: FIXME: show more info.
332
        srs_trace("-> "SRS_CONSTS_LOG_ENCODER" time=%"PRId64", encoders=%d, input=%s", 
333
            pithy_print->age(), (int)ffmpegs.size(), input_stream_name.c_str());
334
    }
winlin authored
335 336 337 338
}

#endif
339