winlin

async call worker fast execute tasks.

@@ -257,7 +257,9 @@ echo "\"" >> $SRS_AUTO_HEADERS_H @@ -257,7 +257,9 @@ echo "\"" >> $SRS_AUTO_HEADERS_H
257 # new empty line to auto headers file. 257 # new empty line to auto headers file.
258 echo "" >> $SRS_AUTO_HEADERS_H 258 echo "" >> $SRS_AUTO_HEADERS_H
259 259
  260 +#####################################################################################
260 # auto header EOF. 261 # auto header EOF.
  262 +#####################################################################################
261 echo "#endif" >> $SRS_AUTO_HEADERS_H 263 echo "#endif" >> $SRS_AUTO_HEADERS_H
262 echo "" >> $SRS_AUTO_HEADERS_H 264 echo "" >> $SRS_AUTO_HEADERS_H
263 265
@@ -42,6 +42,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask() @@ -42,6 +42,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()
42 SrsAsyncCallWorker::SrsAsyncCallWorker() 42 SrsAsyncCallWorker::SrsAsyncCallWorker()
43 { 43 {
44 pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US); 44 pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US);
  45 + wait = st_cond_new();
45 } 46 }
46 47
47 SrsAsyncCallWorker::~SrsAsyncCallWorker() 48 SrsAsyncCallWorker::~SrsAsyncCallWorker()
@@ -54,6 +55,8 @@ SrsAsyncCallWorker::~SrsAsyncCallWorker() @@ -54,6 +55,8 @@ SrsAsyncCallWorker::~SrsAsyncCallWorker()
54 srs_freep(task); 55 srs_freep(task);
55 } 56 }
56 tasks.clear(); 57 tasks.clear();
  58 +
  59 + st_cond_destroy(wait);
57 } 60 }
58 61
59 int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t) 62 int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t)
@@ -61,10 +64,16 @@ int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t) @@ -61,10 +64,16 @@ int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t)
61 int ret = ERROR_SUCCESS; 64 int ret = ERROR_SUCCESS;
62 65
63 tasks.push_back(t); 66 tasks.push_back(t);
  67 + st_cond_signal(wait);
64 68
65 return ret; 69 return ret;
66 } 70 }
67 71
  72 +int SrsAsyncCallWorker::count()
  73 +{
  74 + return (int)tasks.size();
  75 +}
  76 +
68 int SrsAsyncCallWorker::start() 77 int SrsAsyncCallWorker::start()
69 { 78 {
70 return pthread->start(); 79 return pthread->start();
@@ -72,23 +81,30 @@ int SrsAsyncCallWorker::start() @@ -72,23 +81,30 @@ int SrsAsyncCallWorker::start()
72 81
73 void SrsAsyncCallWorker::stop() 82 void SrsAsyncCallWorker::stop()
74 { 83 {
  84 + st_cond_signal(wait);
75 pthread->stop(); 85 pthread->stop();
76 } 86 }
77 87
78 int SrsAsyncCallWorker::cycle() 88 int SrsAsyncCallWorker::cycle()
79 { 89 {
80 int ret = ERROR_SUCCESS; 90 int ret = ERROR_SUCCESS;
81 -  
82 - std::vector<ISrsAsyncCallTask*> copies = tasks;  
83 - tasks.clear();  
84 91
85 - std::vector<ISrsAsyncCallTask*>::iterator it;  
86 - for (it = copies.begin(); it != copies.end(); ++it) {  
87 - ISrsAsyncCallTask* task = *it;  
88 - if ((ret = task->call()) != ERROR_SUCCESS) {  
89 - srs_warn("ignore async callback %s, ret=%d", task->to_string().c_str(), ret); 92 + while (pthread->can_loop()) {
  93 + if (tasks.empty()) {
  94 + st_cond_wait(wait);
  95 + }
  96 +
  97 + std::vector<ISrsAsyncCallTask*> copies = tasks;
  98 + tasks.clear();
  99 +
  100 + std::vector<ISrsAsyncCallTask*>::iterator it;
  101 + for (it = copies.begin(); it != copies.end(); ++it) {
  102 + ISrsAsyncCallTask* task = *it;
  103 + if ((ret = task->call()) != ERROR_SUCCESS) {
  104 + srs_warn("ignore async callback %s, ret=%d", task->to_string().c_str(), ret);
  105 + }
  106 + srs_freep(task);
90 } 107 }
91 - srs_freep(task);  
92 } 108 }
93 109
94 return ret; 110 return ret;
@@ -70,12 +70,15 @@ class SrsAsyncCallWorker : public ISrsReusableThreadHandler @@ -70,12 +70,15 @@ class SrsAsyncCallWorker : public ISrsReusableThreadHandler
70 { 70 {
71 private: 71 private:
72 SrsReusableThread* pthread; 72 SrsReusableThread* pthread;
  73 +protected:
73 std::vector<ISrsAsyncCallTask*> tasks; 74 std::vector<ISrsAsyncCallTask*> tasks;
  75 + st_cond_t wait;
74 public: 76 public:
75 SrsAsyncCallWorker(); 77 SrsAsyncCallWorker();
76 virtual ~SrsAsyncCallWorker(); 78 virtual ~SrsAsyncCallWorker();
77 public: 79 public:
78 virtual int execute(ISrsAsyncCallTask* t); 80 virtual int execute(ISrsAsyncCallTask* t);
  81 + virtual int count();
79 public: 82 public:
80 virtual int start(); 83 virtual int start();
81 virtual void stop(); 84 virtual void stop();