winlin

support listen multiple ports.

@@ -46,6 +46,7 @@ url: rtmp://127.0.0.1:1935/live/livestream @@ -46,6 +46,7 @@ url: rtmp://127.0.0.1:1935/live/livestream
46 * nginx v1.5.0: 139524 lines <br/> 46 * nginx v1.5.0: 139524 lines <br/>
47 47
48 ### History 48 ### History
  49 +* v0.3, 2013-11-02, support listen multiple ports.
49 * v0.3, 2013-11-02, support config file in nginx-conf style. 50 * v0.3, 2013-11-02, support config file in nginx-conf style.
50 * v0.3, 2013-10-29, support pithy print log message specified by stage. 51 * v0.3, 2013-10-29, support pithy print log message specified by stage.
51 * v0.3, 2013-10-28, support librtmp without extended-timestamp in 0xCX chunk packet. 52 * v0.3, 2013-10-28, support librtmp without extended-timestamp in 0xCX chunk packet.
1 -listen 1935; 1 +listen 1935 19350;
2 vhost __defaultVhost__ { 2 vhost __defaultVhost__ {
3 application live { 3 application live {
  4 + no_delay on;
  5 + allow all;
4 } 6 }
5 } 7 }
@@ -83,6 +83,8 @@ int SrsFileBuffer::open(const char* filename) @@ -83,6 +83,8 @@ int SrsFileBuffer::open(const char* filename)
83 return ERROR_SYSTEM_CONFIG_INVALID; 83 return ERROR_SYSTEM_CONFIG_INVALID;
84 } 84 }
85 85
  86 + line = 1;
  87 +
86 return ERROR_SUCCESS; 88 return ERROR_SUCCESS;
87 } 89 }
88 90
@@ -105,6 +107,19 @@ SrsConfDirective* SrsConfDirective::at(int index) @@ -105,6 +107,19 @@ SrsConfDirective* SrsConfDirective::at(int index)
105 return directives.at(index); 107 return directives.at(index);
106 } 108 }
107 109
  110 +SrsConfDirective* SrsConfDirective::get(std::string _name)
  111 +{
  112 + std::vector<SrsConfDirective*>::iterator it;
  113 + for (it = directives.begin(); it != directives.end(); ++it) {
  114 + SrsConfDirective* directive = *it;
  115 + if (directive->name == _name) {
  116 + return directive;
  117 + }
  118 + }
  119 +
  120 + return NULL;
  121 +}
  122 +
108 int SrsConfDirective::parse(const char* filename) 123 int SrsConfDirective::parse(const char* filename)
109 { 124 {
110 int ret = ERROR_SUCCESS; 125 int ret = ERROR_SUCCESS;
@@ -161,6 +176,7 @@ int SrsConfDirective::parse_conf(SrsFileBuffer* buffer, SrsDirectiveType type) @@ -161,6 +176,7 @@ int SrsConfDirective::parse_conf(SrsFileBuffer* buffer, SrsDirectiveType type)
161 // build directive tree. 176 // build directive tree.
162 SrsConfDirective* directive = new SrsConfDirective(); 177 SrsConfDirective* directive = new SrsConfDirective();
163 178
  179 + directive->conf_line = buffer->line;
164 directive->name = args[0]; 180 directive->name = args[0];
165 args.erase(args.begin()); 181 args.erase(args.begin());
166 directive->args.swap(args); 182 directive->args.swap(args);
@@ -361,15 +377,22 @@ int SrsConfDirective::refill_buffer(SrsFileBuffer* buffer, bool d_quoted, bool s @@ -361,15 +377,22 @@ int SrsConfDirective::refill_buffer(SrsFileBuffer* buffer, bool d_quoted, bool s
361 return ret; 377 return ret;
362 } 378 }
363 379
  380 +Config* config = new Config();
  381 +
364 Config::Config() 382 Config::Config()
365 { 383 {
366 show_help = false; 384 show_help = false;
367 show_version = false; 385 show_version = false;
368 config_file = NULL; 386 config_file = NULL;
  387 +
  388 + root = new SrsConfDirective();
  389 + root->conf_line = 0;
  390 + root->name = "root";
369 } 391 }
370 392
371 Config::~Config() 393 Config::~Config()
372 { 394 {
  395 + srs_freep(root);
373 } 396 }
374 397
375 // see: ngx_get_options 398 // see: ngx_get_options
@@ -400,16 +423,25 @@ int Config::parse_options(int argc, char** argv) @@ -400,16 +423,25 @@ int Config::parse_options(int argc, char** argv)
400 return ERROR_SYSTEM_CONFIG_INVALID; 423 return ERROR_SYSTEM_CONFIG_INVALID;
401 } 424 }
402 425
403 - SrsConfDirective root;  
404 - root.name = "root";  
405 -  
406 - if ((ret = root.parse(config_file)) != ERROR_SUCCESS) { 426 + if ((ret = root->parse(config_file)) != ERROR_SUCCESS) {
407 return ret; 427 return ret;
408 } 428 }
409 429
  430 + SrsConfDirective* conf = NULL;
  431 + if ((conf = get_listen()) == NULL || conf->args.size() == 0) {
  432 + fprintf(stderr, "line %d: conf error, "
  433 + "directive \"listen\" is empty\n", conf? conf->conf_line:0);
  434 + return ERROR_SYSTEM_CONFIG_INVALID;
  435 + }
  436 +
410 return ret; 437 return ret;
411 } 438 }
412 439
  440 +SrsConfDirective* Config::get_listen()
  441 +{
  442 + return root->get("listen");
  443 +}
  444 +
413 int Config::parse_argv(int& i, char** argv) 445 int Config::parse_argv(int& i, char** argv)
414 { 446 {
415 int ret = ERROR_SUCCESS; 447 int ret = ERROR_SUCCESS;
@@ -32,25 +32,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,25 +32,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 #include <vector> 32 #include <vector>
33 #include <string> 33 #include <string>
34 34
35 -/**  
36 -* the config parser.  
37 -*/  
38 -class Config  
39 -{  
40 -private:  
41 - bool show_help;  
42 - bool show_version;  
43 - char* config_file;  
44 -public:  
45 - Config();  
46 - virtual ~Config();  
47 -public:  
48 - virtual int parse_options(int argc, char** argv);  
49 -private:  
50 - virtual int parse_argv(int& i, char** argv);  
51 - virtual void print_help(char** argv);  
52 -};  
53 -  
54 class SrsFileBuffer 35 class SrsFileBuffer
55 { 36 {
56 public: 37 public:
@@ -73,6 +54,7 @@ public: @@ -73,6 +54,7 @@ public:
73 class SrsConfDirective 54 class SrsConfDirective
74 { 55 {
75 public: 56 public:
  57 + int conf_line;
76 std::string name; 58 std::string name;
77 std::vector<std::string> args; 59 std::vector<std::string> args;
78 std::vector<SrsConfDirective*> directives; 60 std::vector<SrsConfDirective*> directives;
@@ -80,6 +62,7 @@ public: @@ -80,6 +62,7 @@ public:
80 SrsConfDirective(); 62 SrsConfDirective();
81 virtual ~SrsConfDirective(); 63 virtual ~SrsConfDirective();
82 SrsConfDirective* at(int index); 64 SrsConfDirective* at(int index);
  65 + SrsConfDirective* get(std::string _name);
83 public: 66 public:
84 virtual int parse(const char* filename); 67 virtual int parse(const char* filename);
85 public: 68 public:
@@ -89,4 +72,28 @@ public: @@ -89,4 +72,28 @@ public:
89 virtual int refill_buffer(SrsFileBuffer* buffer, bool d_quoted, bool s_quoted, int startline, char*& pstart); 72 virtual int refill_buffer(SrsFileBuffer* buffer, bool d_quoted, bool s_quoted, int startline, char*& pstart);
90 }; 73 };
91 74
  75 +/**
  76 +* the config parser.
  77 +*/
  78 +class Config
  79 +{
  80 +private:
  81 + bool show_help;
  82 + bool show_version;
  83 + char* config_file;
  84 + SrsConfDirective* root;
  85 +public:
  86 + Config();
  87 + virtual ~Config();
  88 +public:
  89 + virtual int parse_options(int argc, char** argv);
  90 + virtual SrsConfDirective* get_listen();
  91 +private:
  92 + virtual int parse_argv(int& i, char** argv);
  93 + virtual void print_help(char** argv);
  94 +};
  95 +
  96 +// global config
  97 +extern Config* config;
  98 +
92 #endif 99 #endif
@@ -34,52 +34,34 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,52 +34,34 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 #include <srs_core_log.hpp> 34 #include <srs_core_log.hpp>
35 #include <srs_core_error.hpp> 35 #include <srs_core_error.hpp>
36 #include <srs_core_client.hpp> 36 #include <srs_core_client.hpp>
  37 +#include <srs_core_config.hpp>
37 38
38 #define SERVER_LISTEN_BACKLOG 10 39 #define SERVER_LISTEN_BACKLOG 10
39 #define SRS_TIME_RESOLUTION_MS 1000 40 #define SRS_TIME_RESOLUTION_MS 1000
40 41
41 -SrsServer::SrsServer() 42 +SrsListener::SrsListener(SrsServer* _server, SrsListenerType _type)
42 { 43 {
  44 + fd = -1;
  45 + stfd = NULL;
  46 +
  47 + port = 0;
  48 + server = _server;
  49 + type = _type;
43 } 50 }
44 51
45 -SrsServer::~SrsServer() 52 +SrsListener::~SrsListener()
46 { 53 {
47 - for (std::vector<SrsConnection*>::iterator it = conns.begin(); it != conns.end(); ++it) {  
48 - SrsConnection* conn = *it;  
49 - srs_freep(conn); 54 + if (stfd) {
  55 + st_netfd_close(stfd);
  56 + stfd = NULL;
50 } 57 }
51 - conns.clear();  
52 } 58 }
53 59
54 -int SrsServer::initialize() 60 +int SrsListener::listen(int _port)
55 { 61 {
56 int ret = ERROR_SUCCESS; 62 int ret = ERROR_SUCCESS;
57 -  
58 - // use linux epoll.  
59 - if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {  
60 - ret = ERROR_ST_SET_EPOLL;  
61 - srs_error("st_set_eventsys use linux epoll failed. ret=%d", ret);  
62 - return ret;  
63 - }  
64 - srs_verbose("st_set_eventsys use linux epoll success");  
65 -  
66 - if(st_init() != 0){  
67 - ret = ERROR_ST_INITIALIZE;  
68 - srs_error("st_init failed. ret=%d", ret);  
69 - return ret;  
70 - }  
71 - srs_verbose("st_init success");  
72 63
73 - // set current log id.  
74 - log_context->generate_id();  
75 - srs_info("log set id success");  
76 -  
77 - return ret;  
78 -}  
79 -  
80 -int SrsServer::listen(int port)  
81 -{  
82 - int ret = ERROR_SUCCESS; 64 + port = _port;
83 65
84 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { 66 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
85 ret = ERROR_SOCKET_CREATE; 67 ret = ERROR_SOCKET_CREATE;
@@ -133,6 +115,117 @@ int SrsServer::listen(int port) @@ -133,6 +115,117 @@ int SrsServer::listen(int port)
133 return ret; 115 return ret;
134 } 116 }
135 117
  118 +void SrsListener::listen_cycle()
  119 +{
  120 + int ret = ERROR_SUCCESS;
  121 +
  122 + log_context->generate_id();
  123 + srs_trace("listen cycle start, port=%d, type=%d, fd=%d", port, type, fd);
  124 +
  125 + while (true) {
  126 + st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
  127 +
  128 + if(client_stfd == NULL){
  129 + // ignore error.
  130 + srs_warn("ignore accept thread stoppped for accept client error");
  131 + continue;
  132 + }
  133 + srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
  134 +
  135 + if ((ret = server->accept_client(type, client_stfd)) != ERROR_SUCCESS) {
  136 + srs_warn("accept client error. ret=%d", ret);
  137 + continue;
  138 + }
  139 +
  140 + srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
  141 + }
  142 +}
  143 +
  144 +void* SrsListener::listen_thread(void* arg)
  145 +{
  146 + SrsListener* obj = (SrsListener*)arg;
  147 + srs_assert(obj != NULL);
  148 +
  149 + obj->listen_cycle();
  150 +
  151 + return NULL;
  152 +}
  153 +
  154 +SrsServer::SrsServer()
  155 +{
  156 +}
  157 +
  158 +SrsServer::~SrsServer()
  159 +{
  160 + if (true) {
  161 + std::vector<SrsConnection*>::iterator it;
  162 + for (it = conns.begin(); it != conns.end(); ++it) {
  163 + SrsConnection* conn = *it;
  164 + srs_freep(conn);
  165 + }
  166 + conns.clear();
  167 + }
  168 +
  169 + if (true) {
  170 + std::vector<SrsListener*>::iterator it;
  171 + for (it = listeners.begin(); it != listeners.end(); ++it) {
  172 + SrsListener* listener = *it;
  173 + srs_freep(listener);
  174 + }
  175 + listeners.clear();
  176 + }
  177 +}
  178 +
  179 +int SrsServer::initialize()
  180 +{
  181 + int ret = ERROR_SUCCESS;
  182 +
  183 + // use linux epoll.
  184 + if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {
  185 + ret = ERROR_ST_SET_EPOLL;
  186 + srs_error("st_set_eventsys use linux epoll failed. ret=%d", ret);
  187 + return ret;
  188 + }
  189 + srs_verbose("st_set_eventsys use linux epoll success");
  190 +
  191 + if(st_init() != 0){
  192 + ret = ERROR_ST_INITIALIZE;
  193 + srs_error("st_init failed. ret=%d", ret);
  194 + return ret;
  195 + }
  196 + srs_verbose("st_init success");
  197 +
  198 + // set current log id.
  199 + log_context->generate_id();
  200 + srs_info("log set id success");
  201 +
  202 + return ret;
  203 +}
  204 +
  205 +int SrsServer::listen()
  206 +{
  207 + int ret = ERROR_SUCCESS;
  208 +
  209 + SrsConfDirective* conf = NULL;
  210 +
  211 + // stream service port.
  212 + conf = config->get_listen();
  213 + srs_assert(conf);
  214 +
  215 + for (int i = 0; i < (int)conf->args.size(); i++) {
  216 + SrsListener* listener = new SrsListener(this, SrsListenerStream);
  217 + listeners.push_back(listener);
  218 +
  219 + int port = ::atoi(conf->args.at(i).c_str());
  220 + if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
  221 + srs_error("listen at port %d failed. ret=%d", port, ret);
  222 + return ret;
  223 + }
  224 + }
  225 +
  226 + return ret;
  227 +}
  228 +
136 int SrsServer::cycle() 229 int SrsServer::cycle()
137 { 230 {
138 int ret = ERROR_SUCCESS; 231 int ret = ERROR_SUCCESS;
@@ -161,15 +254,21 @@ void SrsServer::remove(SrsConnection* conn) @@ -161,15 +254,21 @@ void SrsServer::remove(SrsConnection* conn)
161 srs_freep(conn); 254 srs_freep(conn);
162 } 255 }
163 256
164 -int SrsServer::accept_client(st_netfd_t client_stfd) 257 +int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
165 { 258 {
166 int ret = ERROR_SUCCESS; 259 int ret = ERROR_SUCCESS;
167 260
168 - SrsConnection* conn = new SrsClient(this, client_stfd); 261 + SrsConnection* conn = NULL;
  262 + if (type == SrsListenerStream) {
  263 + conn = new SrsClient(this, client_stfd);
  264 + } else {
  265 + // handler others
  266 + }
  267 + srs_assert(conn);
169 268
170 // directly enqueue, the cycle thread will remove the client. 269 // directly enqueue, the cycle thread will remove the client.
171 conns.push_back(conn); 270 conns.push_back(conn);
172 - srs_verbose("add conn to vector. conns=%d", (int)conns.size()); 271 + srs_verbose("add conn from port %d to vector. conns=%d", port, (int)conns.size());
173 272
174 // cycle will start process thread and when finished remove the client. 273 // cycle will start process thread and when finished remove the client.
175 if ((ret = conn->start()) != ERROR_SUCCESS) { 274 if ((ret = conn->start()) != ERROR_SUCCESS) {
@@ -180,39 +279,3 @@ int SrsServer::accept_client(st_netfd_t client_stfd) @@ -180,39 +279,3 @@ int SrsServer::accept_client(st_netfd_t client_stfd)
180 return ret; 279 return ret;
181 } 280 }
182 281
183 -void SrsServer::listen_cycle()  
184 -{  
185 - int ret = ERROR_SUCCESS;  
186 -  
187 - log_context->generate_id();  
188 - srs_trace("listen cycle start.");  
189 -  
190 - while (true) {  
191 - st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);  
192 -  
193 - if(client_stfd == NULL){  
194 - // ignore error.  
195 - srs_warn("ignore accept thread stoppped for accept client error");  
196 - continue;  
197 - }  
198 - srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));  
199 -  
200 - if ((ret = accept_client(client_stfd)) != ERROR_SUCCESS) {  
201 - srs_warn("accept client error. ret=%d", ret);  
202 - continue;  
203 - }  
204 -  
205 - srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);  
206 - }  
207 -}  
208 -  
209 -void* SrsServer::listen_thread(void* arg)  
210 -{  
211 - SrsServer* server = (SrsServer*)arg;  
212 - srs_assert(server != NULL);  
213 -  
214 - server->listen_cycle();  
215 -  
216 - return NULL;  
217 -}  
218 -  
@@ -34,25 +34,50 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,25 +34,50 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 34
35 #include <st.h> 35 #include <st.h>
36 36
  37 +class SrsServer;
37 class SrsConnection; 38 class SrsConnection;
38 -class SrsServer 39 +
  40 +enum SrsListenerType
  41 +{
  42 + SrsListenerStream = 0,
  43 + SrsListenerApi
  44 +};
  45 +
  46 +class SrsListener
39 { 47 {
  48 +public:
  49 + SrsListenerType type;
40 private: 50 private:
41 int fd; 51 int fd;
42 st_netfd_t stfd; 52 st_netfd_t stfd;
  53 + int port;
  54 + SrsServer* server;
  55 +public:
  56 + SrsListener(SrsServer* _server, SrsListenerType _type);
  57 + virtual ~SrsListener();
  58 +public:
  59 + virtual int listen(int port);
  60 +private:
  61 + virtual void listen_cycle();
  62 + static void* listen_thread(void* arg);
  63 +};
  64 +
  65 +class SrsServer
  66 +{
  67 + friend class SrsListener;
  68 +private:
43 std::vector<SrsConnection*> conns; 69 std::vector<SrsConnection*> conns;
  70 + std::vector<SrsListener*> listeners;
44 public: 71 public:
45 SrsServer(); 72 SrsServer();
46 virtual ~SrsServer(); 73 virtual ~SrsServer();
47 public: 74 public:
48 virtual int initialize(); 75 virtual int initialize();
49 - virtual int listen(int port); 76 + virtual int listen();
50 virtual int cycle(); 77 virtual int cycle();
51 virtual void remove(SrsConnection* conn); 78 virtual void remove(SrsConnection* conn);
52 private: 79 private:
53 - virtual int accept_client(st_netfd_t client_stfd);  
54 - virtual void listen_cycle();  
55 - static void* listen_thread(void* arg); 80 + virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
56 }; 81 };
57 82
58 #endif 83 #endif
@@ -31,9 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,9 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 int main(int argc, char** argv){ 31 int main(int argc, char** argv){
32 int ret = ERROR_SUCCESS; 32 int ret = ERROR_SUCCESS;
33 33
34 - Config config;  
35 -  
36 - if ((ret = config.parse_options(argc, argv)) != ERROR_SUCCESS) { 34 + if ((ret = config->parse_options(argc, argv)) != ERROR_SUCCESS) {
37 return ret; 35 return ret;
38 } 36 }
39 37
@@ -43,7 +41,7 @@ int main(int argc, char** argv){ @@ -43,7 +41,7 @@ int main(int argc, char** argv){
43 return ret; 41 return ret;
44 } 42 }
45 43
46 - if ((ret = server.listen(1935)) != ERROR_SUCCESS) { 44 + if ((ret = server.listen()) != ERROR_SUCCESS) {
47 return ret; 45 return ret;
48 } 46 }
49 47