winlin

refine code, rename the sync call to common class.

... ... @@ -31,65 +31,65 @@ using namespace std;
// the sleep interval for http async callback.
#define SRS_AUTO_ASYNC_CALLBACL_SLEEP_US 300000
ISrsDvrAsyncCall::ISrsDvrAsyncCall()
ISrsAsyncCallTask::ISrsAsyncCallTask()
{
}
ISrsDvrAsyncCall::~ISrsDvrAsyncCall()
ISrsAsyncCallTask::~ISrsAsyncCallTask()
{
}
SrsDvrAsyncCallThread::SrsDvrAsyncCallThread()
SrsAsyncCallWorker::SrsAsyncCallWorker()
{
pthread = new SrsThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US, true);
}
SrsDvrAsyncCallThread::~SrsDvrAsyncCallThread()
SrsAsyncCallWorker::~SrsAsyncCallWorker()
{
stop();
srs_freep(pthread);
std::vector<ISrsDvrAsyncCall*>::iterator it;
for (it = callbacks.begin(); it != callbacks.end(); ++it) {
ISrsDvrAsyncCall* call = *it;
srs_freep(call);
std::vector<ISrsAsyncCallTask*>::iterator it;
for (it = tasks.begin(); it != tasks.end(); ++it) {
ISrsAsyncCallTask* task = *it;
srs_freep(task);
}
callbacks.clear();
tasks.clear();
}
int SrsDvrAsyncCallThread::call(ISrsDvrAsyncCall* c)
int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t)
{
int ret = ERROR_SUCCESS;
callbacks.push_back(c);
tasks.push_back(t);
return ret;
}
int SrsDvrAsyncCallThread::start()
int SrsAsyncCallWorker::start()
{
return pthread->start();
}
void SrsDvrAsyncCallThread::stop()
void SrsAsyncCallWorker::stop()
{
pthread->stop();
}
int SrsDvrAsyncCallThread::cycle()
int SrsAsyncCallWorker::cycle()
{
int ret = ERROR_SUCCESS;
std::vector<ISrsDvrAsyncCall*> copies = callbacks;
callbacks.clear();
std::vector<ISrsAsyncCallTask*> copies = tasks;
tasks.clear();
std::vector<ISrsDvrAsyncCall*>::iterator it;
std::vector<ISrsAsyncCallTask*>::iterator it;
for (it = copies.begin(); it != copies.end(); ++it) {
ISrsDvrAsyncCall* call = *it;
if ((ret = call->call()) != ERROR_SUCCESS) {
srs_warn("ignore async callback %s, ret=%d", call->to_string().c_str(), ret);
ISrsAsyncCallTask* task = *it;
if ((ret = task->call()) != ERROR_SUCCESS) {
srs_warn("ignore async callback %s, ret=%d", task->to_string().c_str(), ret);
}
srs_freep(call);
srs_freep(task);
}
return ret;
... ...
... ... @@ -42,29 +42,31 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* a video and pass it to the dvr again.
* futhurmore, the aync call never block the main worker thread.
*/
class ISrsDvrAsyncCall
class ISrsAsyncCallTask
{
public:
ISrsDvrAsyncCall();
virtual ~ISrsDvrAsyncCall();
ISrsAsyncCallTask();
virtual ~ISrsAsyncCallTask();
public:
virtual int call() = 0;
virtual std::string to_string() = 0;
};
/**
* the async callback for dvr.
*/
class SrsDvrAsyncCallThread : public ISrsThreadHandler
* the async callback for dvr.
* when worker call with the task, the worker will do it in isolate thread.
* that is, the task is execute/call in async mode.
*/
class SrsAsyncCallWorker : public ISrsThreadHandler
{
private:
SrsThread* pthread;
std::vector<ISrsDvrAsyncCall*> callbacks;
std::vector<ISrsAsyncCallTask*> tasks;
public:
SrsDvrAsyncCallThread();
virtual ~SrsDvrAsyncCallThread();
SrsAsyncCallWorker();
virtual ~SrsAsyncCallWorker();
public:
virtual int call(ISrsDvrAsyncCall* c);
virtual int execute(ISrsAsyncCallTask* t);
public:
virtual int start();
virtual void stop();
... ...
... ... @@ -548,7 +548,7 @@ SrsDvrPlan::SrsDvrPlan()
dvr_enabled = false;
segment = new SrsFlvSegment(this);
async = new SrsDvrAsyncCallThread();
async = new SrsAsyncCallWorker();
}
SrsDvrPlan::~SrsDvrPlan()
... ... @@ -629,7 +629,7 @@ int SrsDvrPlan::on_reap_segment()
{
int ret = ERROR_SUCCESS;
if ((ret = async->call(new SrsDvrAsyncCallOnDvr(req, segment->get_path()))) != ERROR_SUCCESS) {
if ((ret = async->execute(new SrsDvrAsyncCallOnDvr(req, segment->get_path()))) != ERROR_SUCCESS) {
return ret;
}
... ...
... ... @@ -178,7 +178,7 @@ public:
/**
* the dvr async call.
*/
class SrsDvrAsyncCallOnDvr : public ISrsDvrAsyncCall
class SrsDvrAsyncCallOnDvr : public ISrsAsyncCallTask
{
private:
std::string path;
... ... @@ -206,7 +206,7 @@ public:
SrsRequest* req;
protected:
SrsFlvSegment* segment;
SrsDvrAsyncCallThread* async;
SrsAsyncCallWorker* async;
bool dvr_enabled;
public:
SrsDvrPlan();
... ...
... ... @@ -286,7 +286,7 @@ SrsHlsMuxer::SrsHlsMuxer()
acodec = SrsCodecAudioReserved1;
should_write_cache = false;
should_write_file = true;
async = new SrsDvrAsyncCallThread();
async = new SrsAsyncCallWorker();
context = new SrsTsContext();
}
... ... @@ -669,7 +669,7 @@ int SrsHlsMuxer::segment_close(string log_desc)
segments.push_back(current);
// use async to call the http hooks, for it will cause thread switch.
if ((ret = async->call(new SrsDvrAsyncCallOnHls(req,
if ((ret = async->execute(new SrsDvrAsyncCallOnHls(req,
current->full_path, current->uri, m3u8, m3u8_url,
current->sequence_no, current->duration))) != ERROR_SUCCESS)
{
... ... @@ -677,7 +677,7 @@ int SrsHlsMuxer::segment_close(string log_desc)
}
// use async to call the http hooks, for it will cause thread switch.
if ((ret = async->call(new SrsDvrAsyncCallOnHlsNotify(req, current->uri))) != ERROR_SUCCESS) {
if ((ret = async->execute(new SrsDvrAsyncCallOnHlsNotify(req, current->uri))) != ERROR_SUCCESS) {
return ret;
}
... ...
... ... @@ -159,7 +159,7 @@ public:
/**
* the hls async call: on_hls
*/
class SrsDvrAsyncCallOnHls : public ISrsDvrAsyncCall
class SrsDvrAsyncCallOnHls : public ISrsAsyncCallTask
{
private:
std::string path;
... ... @@ -180,7 +180,7 @@ public:
/**
* the hls async call: on_hls_notify
*/
class SrsDvrAsyncCallOnHlsNotify : public ISrsDvrAsyncCall
class SrsDvrAsyncCallOnHlsNotify : public ISrsAsyncCallTask
{
private:
std::string ts_url;
... ... @@ -215,7 +215,7 @@ private:
double hls_aof_ratio;
double hls_fragment;
double hls_window;
SrsDvrAsyncCallThread* async;
SrsAsyncCallWorker* async;
private:
// whether use floor algorithm for timestamp.
bool hls_ts_floor;
... ...