winlin

add encoder/hls pithy print

... ... @@ -6,6 +6,10 @@ listen 1935;
# performance about 10%.
# if not specified, set to 4096.
chunk_size 65000;
# the logs dir.
# if enabled ffmpeg, each stracoding stream will create a log file.
# default: ./objs/logs
log_dir ./objs/logs;
# vhost list, the __defaultVhost__ is the default vhost
# for which cannot identify the required vhost.
# for default demo.
... ... @@ -76,10 +80,10 @@ vhost dev {
hls_path ./objs/nginx/html;
hls_fragment 5;
hls_window 30;
#forward 127.0.0.1:19350;
forward 127.0.0.1:19350;
#forward 127.0.0.1:1936;
transcode {
enabled off;
enabled on;
ffmpeg ./objs/ffmpeg/bin/ffmpeg;
engine dev {
enabled on;
... ... @@ -511,6 +515,12 @@ pithy_print {
# shared print interval for all forwarders, in milliseconds.
# if not specified, set to 2000.
forwarder 3000;
# shared print interval for all encoders, in milliseconds.
# if not specified, set to 2000.
encoder 3000;
# shared print interval for all hls, in milliseconds.
# if not specified, set to 2000.
hls 3000;
}
... ...
... ... @@ -144,6 +144,9 @@ else
echo -e "${YELLOW}warning: without live stream transcoding over FFMPEG support${BLACK}"
fi
# mkdir dirs
mkdir -p ${SRS_OBJS}/logs
# next step.
echo ""
echo "you can:"
... ...
... ... @@ -904,6 +904,17 @@ std::string SrsConfig::get_engine_output(SrsConfDirective* engine)
return conf->arg0();
}
std::string SrsConfig::get_log_dir()
{
srs_assert(root);
SrsConfDirective* conf = root->get("log_dir");
if (!conf || conf->arg0().empty()) {
return "./objs/logs";
}
return conf->arg0();
}
SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost)
{
... ... @@ -1049,6 +1060,26 @@ SrsConfDirective* SrsConfig::get_pithy_print_forwarder()
return pithy->get("forwarder");
}
SrsConfDirective* SrsConfig::get_pithy_print_hls()
{
SrsConfDirective* pithy = root->get("pithy_print");
if (!pithy) {
return NULL;
}
return pithy->get("hls");
}
SrsConfDirective* SrsConfig::get_pithy_print_encoder()
{
SrsConfDirective* pithy = root->get("encoder");
if (!pithy) {
return NULL;
}
return pithy->get("forwarder");
}
SrsConfDirective* SrsConfig::get_pithy_print_play()
{
SrsConfDirective* pithy = root->get("pithy_print");
... ...
... ... @@ -140,6 +140,7 @@ public:
virtual int get_engine_achannels(SrsConfDirective* engine);
virtual void get_engine_aparams(SrsConfDirective* engine, std::vector<std::string>& aparams);
virtual std::string get_engine_output(SrsConfDirective* engine);
virtual std::string get_log_dir();
virtual SrsConfDirective* get_gop_cache(std::string vhost);
virtual SrsConfDirective* get_forward(std::string vhost);
virtual SrsConfDirective* get_hls(std::string vhost);
... ... @@ -154,6 +155,8 @@ public:
virtual SrsConfDirective* get_chunk_size();
virtual SrsConfDirective* get_pithy_print_publish();
virtual SrsConfDirective* get_pithy_print_forwarder();
virtual SrsConfDirective* get_pithy_print_encoder();
virtual SrsConfDirective* get_pithy_print_hls();
virtual SrsConfDirective* get_pithy_print_play();
private:
virtual int parse_file(const char* filename);
... ...
... ... @@ -26,6 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <algorithm>
... ... @@ -33,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_log.hpp>
#include <srs_core_config.hpp>
#include <srs_core_rtmp.hpp>
#include <srs_core_pithy_print.hpp>
#ifdef SRS_FFMPEG
... ... @@ -58,6 +60,8 @@ SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
abitrate = 0;
asample_rate = 0;
achannels = 0;
log_fd = -1;
}
SrsFFMPEG::~SrsFFMPEG()
... ... @@ -109,6 +113,18 @@ int SrsFFMPEG::initialize(SrsRequest* req, SrsConfDirective* engine)
output = srs_replace(output, "[stream]", req->stream);
output = srs_replace(output, "[engine]", engine->arg0());
// write ffmpeg info to log file.
log_file = config->get_log_dir();
log_file += "/";
log_file += "encoder";
log_file += "-";
log_file += req->vhost;
log_file += "-";
log_file += req->app;
log_file += "-";
log_file += req->stream;
log_file += ".log";
// important: loop check, donot transcode again.
std::vector<std::string>::iterator it;
it = std::find(_transcoded_url.begin(), _transcoded_url.end(), input);
... ... @@ -316,7 +332,8 @@ int SrsFFMPEG::start()
snprintf(p, last - p, "%s ", ffp.c_str());
p += ffp.length() + 1;
}
srs_trace("start transcoder: %s", pparam);
srs_trace("start transcoder, log: %s, params: %s",
log_file.c_str(), pparam);
srs_freepa(pparam);
}
... ... @@ -329,6 +346,30 @@ int SrsFFMPEG::start()
// child process: ffmpeg encoder engine.
if (pid == 0) {
// redirect logs to file.
int flags = O_CREAT|O_WRONLY|O_APPEND;
mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
if ((log_fd = ::open(log_file.c_str(), flags, mode)) < 0) {
ret = ERROR_ENCODER_OPEN;
srs_error("open encoder file %s failed. ret=%d", log_file.c_str(), ret);
return ret;
}
if (dup2(log_fd, STDOUT_FILENO) < 0) {
ret = ERROR_ENCODER_DUP2;
srs_error("dup2 encoder file failed. ret=%d", ret);
return ret;
}
if (dup2(log_fd, STDERR_FILENO) < 0) {
ret = ERROR_ENCODER_DUP2;
srs_error("dup2 encoder file failed. ret=%d", ret);
return ret;
}
// close other fds
// TODO: do in right way.
for (int i = 3; i < 1024; i++) {
::close(i);
}
// memory leak in child process, it's ok.
char** charpv_params = new char*[params.size() + 1];
for (int i = 0; i < (int)params.size(); i++) {
... ... @@ -389,6 +430,11 @@ int SrsFFMPEG::cycle()
void SrsFFMPEG::stop()
{
if (log_fd > 0) {
::close(log_fd);
log_fd = -1;
}
if (!started) {
return;
}
... ... @@ -598,6 +644,8 @@ void SrsEncoder::encoder_cycle()
log_context->generate_id();
srs_trace("encoder cycle start");
SrsPithyPrint pithy_print(SRS_STAGE_ENCODER);
while (loop) {
if ((ret = cycle()) != ERROR_SUCCESS) {
srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);
... ... @@ -609,6 +657,9 @@ void SrsEncoder::encoder_cycle()
break;
}
encoder(&pithy_print);
pithy_print.elapse(SRS_ENCODER_SLEEP_MS);
st_usleep(SRS_ENCODER_SLEEP_MS * 1000);
}
... ... @@ -622,6 +673,15 @@ void SrsEncoder::encoder_cycle()
srs_trace("encoder cycle finished");
}
void SrsEncoder::encoder(SrsPithyPrint* pithy_print)
{
// reportable
if (pithy_print->can_print()) {
srs_trace("-> time=%"PRId64", encoders=%d",
pithy_print->get_age(), (int)ffmpegs.size());
}
}
void* SrsEncoder::encoder_thread(void* arg)
{
SrsEncoder* obj = (SrsEncoder*)arg;
... ...
... ... @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsConfDirective;
class SrsRequest;
class SrsPithyPrint;
#ifdef SRS_FFMPEG
... ... @@ -49,6 +50,9 @@ private:
bool started;
pid_t pid;
private:
std::string log_file;
int log_fd;
private:
std::string ffmpeg;
std::vector<std::string> vfilter;
std::string vcodec;
... ... @@ -101,6 +105,7 @@ private:
virtual int parse_transcode(SrsRequest* req, SrsConfDirective* conf);
virtual int cycle();
virtual void encoder_cycle();
virtual void encoder(SrsPithyPrint* pithy_print);
static void* encoder_thread(void* arg);
};
... ...
... ... @@ -139,5 +139,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_ENCODER_VBITRATE 712
#define ERROR_ENCODER_FORK 713
#define ERROR_ENCODER_LOOP 714
#define ERROR_ENCODER_OPEN 715
#define ERROR_ENCODER_DUP2 716
#endif
\ No newline at end of file
... ...
... ... @@ -41,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_source.hpp>
#include <srs_core_autofree.hpp>
#include <srs_core_rtmp.hpp>
#include <srs_core_pithy_print.hpp>
// max PES packets size to flush the video.
#define SRS_HLS_AUDIO_CACHE_SIZE 512 * 1024
... ... @@ -1110,6 +1111,8 @@ SrsHls::SrsHls()
muxer = new SrsM3u8Muxer();
ts_cache = new SrsTSCache();
pithy_print = new SrsPithyPrint(SRS_STAGE_HLS);
}
SrsHls::~SrsHls()
... ... @@ -1120,6 +1123,8 @@ SrsHls::~SrsHls()
srs_freep(muxer);
srs_freep(ts_cache);
srs_freep(pithy_print);
}
int SrsHls::on_publish(SrsRequest* req)
... ... @@ -1328,8 +1333,20 @@ int SrsHls::on_video(SrsSharedPtrMessage* video)
return ret;
}
_mpegts();
return ret;
}
void SrsHls::_mpegts()
{
// reportable
if (pithy_print->can_print()) {
srs_trace("-> time=%"PRId64"", pithy_print->get_age());
}
pithy_print->elapse(sample->cts);
}
#endif
... ...
... ... @@ -43,6 +43,7 @@ class SrsRtmpJitter;
class SrsTSMuxer;
class SrsCodec;
class SrsRequest;
class SrsPithyPrint;
/**
* jitter correct for audio,
... ... @@ -211,6 +212,7 @@ private:
SrsCodec* codec;
SrsCodecSample* sample;
SrsRtmpJitter* jitter;
SrsPithyPrint* pithy_print;
public:
SrsHls();
virtual ~SrsHls();
... ... @@ -220,6 +222,8 @@ public:
virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsSharedPtrMessage* audio);
virtual int on_video(SrsSharedPtrMessage* video);
private:
virtual void _mpegts();
};
#endif
... ...
... ... @@ -35,6 +35,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_STAGE_PLAY_USER_INTERVAL_MS 1300
#define SRS_STAGE_PUBLISH_USER_INTERVAL_MS 1100
#define SRS_STAGE_FORWARDER_INTERVAL_MS 2000
#define SRS_STAGE_ENCODER_INTERVAL_MS 2000
#define SRS_STAGE_HLS_INTERVAL_MS 2000
struct SrsStageInfo : public SrsReloadHandler
{
... ... @@ -82,6 +84,22 @@ struct SrsStageInfo : public SrsReloadHandler
}
break;
}
case SRS_STAGE_ENCODER: {
pithy_print_time_ms = SRS_STAGE_ENCODER_INTERVAL_MS;
SrsConfDirective* conf = config->get_pithy_print_encoder();
if (conf && !conf->arg0().empty()) {
pithy_print_time_ms = ::atoi(conf->arg0().c_str());
}
break;
}
case SRS_STAGE_HLS: {
pithy_print_time_ms = SRS_STAGE_HLS_INTERVAL_MS;
SrsConfDirective* conf = config->get_pithy_print_hls();
if (conf && !conf->arg0().empty()) {
pithy_print_time_ms = ::atoi(conf->arg0().c_str());
}
break;
}
default: {
pithy_print_time_ms = SRS_STAGE_DEFAULT_INTERVAL_MS;
break;
... ...
... ... @@ -36,6 +36,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_STAGE_PUBLISH_USER 2
// the pithy stage for all forward clients.
#define SRS_STAGE_FORWARDER 3
// the pithy stage for all encoders.
#define SRS_STAGE_ENCODER 4
// the pithy stage for all hls.
#define SRS_STAGE_HLS 5
/**
* the stage is used for a collection of object to do print,
... ...
... ... @@ -48,6 +48,8 @@ int main(int argc, char** argv){
return ret;
}
// TODO: create log dir in config->get_log_dir()
if ((ret = _server()->listen()) != ERROR_SUCCESS) {
return ret;
}
... ...
... ... @@ -52,6 +52,7 @@ file
..\core\srs_core_log.cpp,
research readonly separator,
..\..\research\ts_info.cc;
mainconfig
"" = "MAIN";
... ...