winlin

refine code for reload http api/stream

... ... @@ -48,14 +48,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SERVER_LISTEN_BACKLOG 512
#define SRS_TIME_RESOLUTION_MS 500
SrsListener::SrsListener(SrsServer* _server, SrsListenerType _type)
SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
{
fd = -1;
stfd = NULL;
port = 0;
server = _server;
type = _type;
_port = 0;
_server = server;
_type = type;
pthread = new SrsThread(this, 0);
}
... ... @@ -72,11 +72,16 @@ SrsListener::~SrsListener()
close(fd);
}
int SrsListener::listen(int _port)
SrsListenerType SrsListener::type()
{
return _type;
}
int SrsListener::listen(int port)
{
int ret = ERROR_SUCCESS;
port = _port;
_port = port;
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
... ... @@ -95,7 +100,7 @@ int SrsListener::listen(int _port)
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_port = htons(_port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
... ... @@ -124,14 +129,14 @@ int SrsListener::listen(int _port)
}
srs_verbose("create st listen thread success.");
srs_trace("server started, listen at port=%d, type=%d, fd=%d", port, type, fd);
srs_trace("server started, listen at port=%d, type=%d, fd=%d", _port, _type, fd);
return ret;
}
void SrsListener::on_thread_start()
{
srs_trace("listen cycle start, port=%d, type=%d, fd=%d", port, type, fd);
srs_trace("listen cycle start, port=%d, type=%d, fd=%d", _port, _type, fd);
}
int SrsListener::cycle()
... ... @@ -147,7 +152,7 @@ int SrsListener::cycle()
}
srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
if ((ret = server->accept_client(type, client_stfd)) != ERROR_SUCCESS) {
if ((ret = _server->accept_client(_type, client_stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
}
... ... @@ -188,7 +193,9 @@ SrsServer::~SrsServer()
conns.clear();
}
close_listeners();
close_listeners(SrsListenerRtmpStream);
close_listeners(SrsListenerHttpApi);
close_listeners(SrsListenerHttpStream);
if (pid_fd > 0) {
::close(pid_fd);
... ... @@ -351,50 +358,17 @@ int SrsServer::listen()
{
int ret = ERROR_SUCCESS;
SrsConfDirective* conf = NULL;
// stream service port.
conf = _srs_config->get_listen();
srs_assert(conf);
close_listeners();
for (int i = 0; i < (int)conf->args.size(); i++) {
SrsListener* listener = new SrsListener(this, SrsListenerRtmpStream);
listeners.push_back(listener);
int port = ::atoi(conf->args.at(i).c_str());
if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
srs_error("RTMP stream listen at port %d failed. ret=%d", port, ret);
return ret;
}
if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
return ret;
}
#ifdef SRS_HTTP_API
if (_srs_config->get_http_api_enabled()) {
SrsListener* listener = new SrsListener(this, SrsListenerHttpApi);
listeners.push_back(listener);
int port = _srs_config->get_http_api_listen();
if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
srs_error("HTTP api listen at port %d failed. ret=%d", port, ret);
return ret;
}
if ((ret = listen_http_api()) != ERROR_SUCCESS) {
return ret;
}
#endif
#ifdef SRS_HTTP_SERVER
if (_srs_config->get_http_stream_enabled()) {
SrsListener* listener = new SrsListener(this, SrsListenerHttpStream);
listeners.push_back(listener);
int port = _srs_config->get_http_stream_listen();
if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
srs_error("HTTP stream listen at port %d failed. ret=%d", port, ret);
return ret;
}
if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
return ret;
}
#endif
return ret;
}
... ... @@ -492,14 +466,86 @@ void SrsServer::on_signal(int signo)
}
}
void SrsServer::close_listeners()
int SrsServer::listen_rtmp()
{
int ret = ERROR_SUCCESS;
// stream service port.
SrsConfDirective* conf = _srs_config->get_listen();
srs_assert(conf);
close_listeners(SrsListenerRtmpStream);
for (int i = 0; i < (int)conf->args.size(); i++) {
SrsListener* listener = new SrsListener(this, SrsListenerRtmpStream);
listeners.push_back(listener);
int port = ::atoi(conf->args.at(i).c_str());
if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
srs_error("RTMP stream listen at port %d failed. ret=%d", port, ret);
return ret;
}
}
return ret;
}
int SrsServer::listen_http_api()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_HTTP_API
close_listeners(SrsListenerHttpApi);
if (_srs_config->get_http_api_enabled()) {
SrsListener* listener = new SrsListener(this, SrsListenerHttpApi);
listeners.push_back(listener);
int port = _srs_config->get_http_api_listen();
if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
srs_error("HTTP api listen at port %d failed. ret=%d", port, ret);
return ret;
}
}
#endif
return ret;
}
int SrsServer::listen_http_stream()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_HTTP_SERVER
close_listeners(SrsListenerHttpStream);
if (_srs_config->get_http_stream_enabled()) {
SrsListener* listener = new SrsListener(this, SrsListenerHttpStream);
listeners.push_back(listener);
int port = _srs_config->get_http_stream_listen();
if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
srs_error("HTTP stream listen at port %d failed. ret=%d", port, ret);
return ret;
}
}
#endif
return ret;
}
void SrsServer::close_listeners(SrsListenerType type)
{
std::vector<SrsListener*>::iterator it;
for (it = listeners.begin(); it != listeners.end(); ++it) {
for (it = listeners.begin(); it != listeners.end();) {
SrsListener* listener = *it;
if (listener->type() != type) {
++it;
continue;
}
srs_freep(listener);
it = listeners.erase(it);
}
listeners.clear();
}
int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
... ...
... ... @@ -56,17 +56,18 @@ enum SrsListenerType
class SrsListener : public ISrsThreadHandler
{
public:
SrsListenerType type;
SrsListenerType _type;
private:
int fd;
st_netfd_t stfd;
int port;
SrsServer* server;
int _port;
SrsServer* _server;
SrsThread* pthread;
public:
SrsListener(SrsServer* _server, SrsListenerType _type);
SrsListener(SrsServer* _server, SrsListenerType type);
virtual ~SrsListener();
public:
virtual SrsListenerType type();
virtual int listen(int port);
// interface ISrsThreadHandler.
public:
... ... @@ -106,7 +107,10 @@ public:
virtual void remove(SrsConnection* conn);
virtual void on_signal(int signo);
private:
virtual void close_listeners();
virtual int listen_rtmp();
virtual int listen_http_api();
virtual int listen_http_stream();
virtual void close_listeners(SrsListenerType type);
virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
// interface ISrsThreadHandler.
public:
... ...