winlin

fix bug #34: convert signal to io. 0.9.85

@@ -70,6 +70,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -70,6 +70,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
70 // SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES 70 // SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES
71 #define SRS_SYS_MEMINFO_RESOLUTION_TIMES 60 71 #define SRS_SYS_MEMINFO_RESOLUTION_TIMES 60
72 72
  73 +#define SRS_SIGNAL_THREAD_INTERVAL (int64_t)(100*1000LL)
  74 +
73 SrsListener::SrsListener(SrsServer* server, SrsListenerType type) 75 SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
74 { 76 {
75 fd = -1; 77 fd = -1;
@@ -184,12 +186,117 @@ int SrsListener::cycle() @@ -184,12 +186,117 @@ int SrsListener::cycle()
184 return ret; 186 return ret;
185 } 187 }
186 188
  189 +SrsSignalManager* SrsSignalManager::instance = NULL;
  190 +
  191 +SrsSignalManager::SrsSignalManager(SrsServer* server)
  192 +{
  193 + SrsSignalManager::instance = this;
  194 +
  195 + _server = server;
  196 + sig_pipe[0] = sig_pipe[1] = -1;
  197 + pthread = new SrsThread(this, SRS_SIGNAL_THREAD_INTERVAL);
  198 + signal_read_stfd = NULL;
  199 +}
  200 +
  201 +SrsSignalManager::~SrsSignalManager()
  202 +{
  203 + pthread->stop();
  204 + srs_freep(pthread);
  205 +
  206 + srs_close_stfd(signal_read_stfd);
  207 +
  208 + if (sig_pipe[0] > 0) {
  209 + ::close(sig_pipe[0]);
  210 + }
  211 + if (sig_pipe[1] > 0) {
  212 + ::close(sig_pipe[1]);
  213 + }
  214 +}
  215 +
  216 +int SrsSignalManager::initialize()
  217 +{
  218 + int ret = ERROR_SUCCESS;
  219 + return ret;
  220 +}
  221 +
  222 +int SrsSignalManager::start()
  223 +{
  224 + int ret = ERROR_SUCCESS;
  225 +
  226 + /**
  227 + * Note that if multiple processes are used (see below),
  228 + * the signal pipe should be initialized after the fork(2) call
  229 + * so that each process has its own private pipe.
  230 + */
  231 + struct sigaction sa;
  232 +
  233 + /* Create signal pipe */
  234 + if (pipe(sig_pipe) < 0) {
  235 + ret = ERROR_SYSTEM_CREATE_PIPE;
  236 + srs_error("create signal manager pipe failed. ret=%d", ret);
  237 + return ret;
  238 + }
  239 +
  240 + /* Install sig_catcher() as a signal handler */
  241 + sa.sa_handler = SrsSignalManager::sig_catcher;
  242 + sigemptyset(&sa.sa_mask);
  243 + sa.sa_flags = 0;
  244 + sigaction(SIGNAL_RELOAD, &sa, NULL);
  245 +
  246 + sa.sa_handler = SrsSignalManager::sig_catcher;
  247 + sigemptyset(&sa.sa_mask);
  248 + sa.sa_flags = 0;
  249 + sigaction(SIGTERM, &sa, NULL);
  250 +
  251 + sa.sa_handler = SrsSignalManager::sig_catcher;
  252 + sigemptyset(&sa.sa_mask);
  253 + sa.sa_flags = 0;
  254 + sigaction(SIGINT, &sa, NULL);
  255 +
  256 + return pthread->start();
  257 +}
  258 +
  259 +int SrsSignalManager::cycle()
  260 +{
  261 + int ret = ERROR_SUCCESS;
  262 +
  263 + if (signal_read_stfd == NULL) {
  264 + signal_read_stfd = st_netfd_open(sig_pipe[0]);
  265 + }
  266 +
  267 + int signo;
  268 +
  269 + /* Read the next signal from the pipe */
  270 + st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT);
  271 +
  272 + /* Process signal synchronously */
  273 + _server->on_signal(signo);
  274 +
  275 + return ret;
  276 +}
  277 +
  278 +void SrsSignalManager::sig_catcher(int signo)
  279 +{
  280 + int err;
  281 +
  282 + /* Save errno to restore it after the write() */
  283 + err = errno;
  284 +
  285 + /* write() is reentrant/async-safe */
  286 + int fd = SrsSignalManager::instance->sig_pipe[1];
  287 + write(fd, &signo, sizeof(int));
  288 +
  289 + errno = err;
  290 +}
  291 +
187 SrsServer::SrsServer() 292 SrsServer::SrsServer()
188 { 293 {
189 signal_reload = false; 294 signal_reload = false;
190 signal_gmc_stop = false; 295 signal_gmc_stop = false;
191 pid_fd = -1; 296 pid_fd = -1;
192 297
  298 + signal_manager = new SrsSignalManager(this);
  299 +
193 // donot new object in constructor, 300 // donot new object in constructor,
194 // for some global instance is not ready now, 301 // for some global instance is not ready now,
195 // new these objects in initialize instead. 302 // new these objects in initialize instead.
@@ -226,6 +333,8 @@ SrsServer::~SrsServer() @@ -226,6 +333,8 @@ SrsServer::~SrsServer()
226 pid_fd = -1; 333 pid_fd = -1;
227 } 334 }
228 335
  336 + srs_freep(signal_manager);
  337 +
229 #ifdef SRS_AUTO_HTTP_API 338 #ifdef SRS_AUTO_HTTP_API
230 srs_freep(http_api_handler); 339 srs_freep(http_api_handler);
231 #endif 340 #endif
@@ -276,6 +385,11 @@ int SrsServer::initialize() @@ -276,6 +385,11 @@ int SrsServer::initialize()
276 return ret; 385 return ret;
277 } 386 }
278 387
  388 +int SrsServer::initialize_signal()
  389 +{
  390 + return signal_manager->initialize();
  391 +}
  392 +
279 int SrsServer::acquire_pid_file() 393 int SrsServer::acquire_pid_file()
280 { 394 {
281 int ret = ERROR_SUCCESS; 395 int ret = ERROR_SUCCESS;
@@ -397,6 +511,12 @@ int SrsServer::listen() @@ -397,6 +511,12 @@ int SrsServer::listen()
397 return ret; 511 return ret;
398 } 512 }
399 513
  514 +int SrsServer::register_signal()
  515 +{
  516 + // start signal process thread.
  517 + return signal_manager->start();
  518 +}
  519 +
400 int SrsServer::ingest() 520 int SrsServer::ingest()
401 { 521 {
402 int ret = ERROR_SUCCESS; 522 int ret = ERROR_SUCCESS;
@@ -64,7 +64,7 @@ private: @@ -64,7 +64,7 @@ private:
64 SrsServer* _server; 64 SrsServer* _server;
65 SrsThread* pthread; 65 SrsThread* pthread;
66 public: 66 public:
67 - SrsListener(SrsServer* _server, SrsListenerType type); 67 + SrsListener(SrsServer* server, SrsListenerType type);
68 virtual ~SrsListener(); 68 virtual ~SrsListener();
69 public: 69 public:
70 virtual SrsListenerType type(); 70 virtual SrsListenerType type();
@@ -75,9 +75,39 @@ public: @@ -75,9 +75,39 @@ public:
75 virtual int cycle(); 75 virtual int cycle();
76 }; 76 };
77 77
  78 +/**
  79 +* convert signal to io,
  80 +* @see: st-1.9/docs/notes.html
  81 +*/
  82 +class SrsSignalManager : public ISrsThreadHandler
  83 +{
  84 +private:
  85 + /* Per-process pipe which is used as a signal queue. */
  86 + /* Up to PIPE_BUF/sizeof(int) signals can be queued up. */
  87 + int sig_pipe[2];
  88 + st_netfd_t signal_read_stfd;
  89 +private:
  90 + SrsServer* _server;
  91 + SrsThread* pthread;
  92 +public:
  93 + SrsSignalManager(SrsServer* server);
  94 + virtual ~SrsSignalManager();
  95 +public:
  96 + virtual int initialize();
  97 + virtual int start();
  98 +// interface ISrsThreadHandler.
  99 +public:
  100 + virtual int cycle();
  101 +private:
  102 + // global singleton instance
  103 + static SrsSignalManager* instance;
  104 + /* Signal catching function. */
  105 + /* Converts signal event to I/O event. */
  106 + static void sig_catcher(int signo);
  107 +};
  108 +
78 class SrsServer : public ISrsReloadHandler 109 class SrsServer : public ISrsReloadHandler
79 { 110 {
80 - friend class SrsListener;  
81 private: 111 private:
82 #ifdef SRS_AUTO_HTTP_API 112 #ifdef SRS_AUTO_HTTP_API
83 SrsHttpHandler* http_api_handler; 113 SrsHttpHandler* http_api_handler;
@@ -92,6 +122,7 @@ private: @@ -92,6 +122,7 @@ private:
92 int pid_fd; 122 int pid_fd;
93 std::vector<SrsConnection*> conns; 123 std::vector<SrsConnection*> conns;
94 std::vector<SrsListener*> listeners; 124 std::vector<SrsListener*> listeners;
  125 + SrsSignalManager* signal_manager;
95 bool signal_reload; 126 bool signal_reload;
96 bool signal_gmc_stop; 127 bool signal_gmc_stop;
97 public: 128 public:
@@ -99,9 +130,11 @@ public: @@ -99,9 +130,11 @@ public:
99 virtual ~SrsServer(); 130 virtual ~SrsServer();
100 public: 131 public:
101 virtual int initialize(); 132 virtual int initialize();
  133 + virtual int initialize_signal();
102 virtual int acquire_pid_file(); 134 virtual int acquire_pid_file();
103 virtual int initialize_st(); 135 virtual int initialize_st();
104 virtual int listen(); 136 virtual int listen();
  137 + virtual int register_signal();
105 virtual int ingest(); 138 virtual int ingest();
106 virtual int cycle(); 139 virtual int cycle();
107 virtual void remove(SrsConnection* conn); 140 virtual void remove(SrsConnection* conn);
@@ -111,6 +144,8 @@ private: @@ -111,6 +144,8 @@ private:
111 virtual int listen_http_api(); 144 virtual int listen_http_api();
112 virtual int listen_http_stream(); 145 virtual int listen_http_stream();
113 virtual void close_listeners(SrsListenerType type); 146 virtual void close_listeners(SrsListenerType type);
  147 +// internal only
  148 +public:
114 virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); 149 virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
115 // interface ISrsThreadHandler. 150 // interface ISrsThreadHandler.
116 public: 151 public:
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR "0" 32 #define VERSION_MAJOR "0"
33 #define VERSION_MINOR "9" 33 #define VERSION_MINOR "9"
34 -#define VERSION_REVISION "84" 34 +#define VERSION_REVISION "85"
35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION 35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "srs" 37 #define RTMP_SIG_SRS_KEY "srs"
@@ -114,6 +114,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -114,6 +114,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
114 #define ERROR_SYSTEM_FILE_WRITE 427 114 #define ERROR_SYSTEM_FILE_WRITE 427
115 #define ERROR_SYSTEM_FILE_EOF 428 115 #define ERROR_SYSTEM_FILE_EOF 428
116 #define ERROR_SYSTEM_FILE_RENAME 429 116 #define ERROR_SYSTEM_FILE_RENAME 429
  117 +#define ERROR_SYSTEM_CREATE_PIPE 430
117 118
118 // see librtmp. 119 // see librtmp.
119 // failed when open ssl create the dh 120 // failed when open ssl create the dh
@@ -24,8 +24,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -24,8 +24,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 #include <srs_core.hpp> 24 #include <srs_core.hpp>
25 25
26 #include <stdlib.h> 26 #include <stdlib.h>
27 -#include <signal.h>  
28 -  
29 #include <sys/types.h> 27 #include <sys/types.h>
30 #include <sys/wait.h> 28 #include <sys/wait.h>
31 29
@@ -56,13 +54,6 @@ ISrsThreadContext* _srs_context = new SrsThreadContext(); @@ -56,13 +54,6 @@ ISrsThreadContext* _srs_context = new SrsThreadContext();
56 SrsConfig* _srs_config = new SrsConfig(); 54 SrsConfig* _srs_config = new SrsConfig();
57 SrsServer* _srs_server = new SrsServer(); 55 SrsServer* _srs_server = new SrsServer();
58 56
59 -// signal handler  
60 -void handler(int signo)  
61 -{  
62 - srs_trace("get a signal, signo=%d", signo);  
63 - _srs_server->on_signal(signo);  
64 -}  
65 -  
66 // main entrance. 57 // main entrance.
67 int main(int argc, char** argv) 58 int main(int argc, char** argv)
68 { 59 {
@@ -166,9 +157,9 @@ int run_master() @@ -166,9 +157,9 @@ int run_master()
166 { 157 {
167 int ret = ERROR_SUCCESS; 158 int ret = ERROR_SUCCESS;
168 159
169 - signal(SIGNAL_RELOAD, handler);  
170 - signal(SIGTERM, handler);  
171 - signal(SIGINT, handler); 160 + if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {
  161 + return ret;
  162 + }
172 163
173 if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) { 164 if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) {
174 return ret; 165 return ret;
@@ -182,6 +173,10 @@ int run_master() @@ -182,6 +173,10 @@ int run_master()
182 return ret; 173 return ret;
183 } 174 }
184 175
  176 + if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) {
  177 + return ret;
  178 + }
  179 +
185 if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) { 180 if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {
186 return ret; 181 return ret;
187 } 182 }