winlin

refine server, add comments

... ... @@ -34,7 +34,7 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd)
{
server = srs_server;
stfd = client_stfd;
connection_id = 0;
// the client thread should reap itself,
// so we never use joinable.
// TODO: FIXME: maybe other thread need to stop it.
... ... @@ -57,7 +57,6 @@ int SrsConnection::cycle()
int ret = ERROR_SUCCESS;
_srs_context->generate_id();
connection_id = _srs_context->get_id();
ip = srs_get_peer_ip(st_netfd_fileno(stfd));
ret = do_cycle();
... ...
... ... @@ -37,26 +37,78 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_kbps.hpp>
class SrsServer;
/**
* the basic connection of SRS,
* all connections accept from listener must extends from this base class,
* server will add the connection to manager, and delete it when remove.
*/
class SrsConnection : public virtual ISrsThreadHandler, public virtual IKbpsDelta
{
private:
/**
* each connection start a green thread,
* when thread stop, the connection will be delete by server.
*/
SrsThread* pthread;
protected:
/**
* the server object to manage the connection.
*/
SrsServer* server;
/**
* the underlayer st fd handler.
*/
st_netfd_t stfd;
int connection_id;
/**
* the ip of client.
*/
std::string ip;
public:
SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd);
virtual ~SrsConnection();
public:
/**
* start the client green thread.
* when server get a client from listener,
* 1. server will create an concrete connection(for instance, RTMP connection),
* 2. then add connection to its connection manager,
* 3. start the client thread by invoke this start()
* when client cycle thread stop, invoke the on_thread_stop(), which will use server
* to remove the client by server->remove(this).
*/
virtual int start();
/**
* the thread cycle function,
* when serve connection completed, terminate the loop which will terminate the thread,
* thread will invoke the on_thread_stop() when it terminated.
*/
virtual int cycle();
/**
* when the thread cycle finished, thread will invoke the on_thread_stop(),
* which will remove self from server, server will remove the connection from manager
* then delete the connection.
*/
virtual void on_thread_stop();
public:
/**
* when server to get the kbps of connection,
* it cannot wait the connection terminated then get the kbps,
* it must sample the kbps every some interval, for instance, 9s to sample all connections kbps,
* all connections will extends from IKbpsDelta which provides the bytes delta,
* while the delta must be update by the sample which invoke by the kbps_resample().
*/
virtual void kbps_resample() = 0;
protected:
/**
* for concrete connection to do the cycle.
*/
virtual int do_cycle() = 0;
private:
/**
* when delete the connection, stop the connection,
* close the underlayer socket, delete the thread.
*/
virtual void stop();
};
... ...
... ... @@ -94,8 +94,6 @@ SrsRtmpConn::~SrsRtmpConn()
{
_srs_config->unsubscribe(this);
stop();
srs_freep(req);
srs_freep(res);
srs_freep(rtmp);
... ... @@ -993,6 +991,7 @@ int SrsRtmpConn::http_hooks_on_connect()
return ret;
}
int connection_id = _srs_context->get_id();
for (int i = 0; i < (int)on_connect->args.size(); i++) {
std::string url = on_connect->args.at(i);
if ((ret = SrsHttpHooks::on_connect(url, connection_id, ip, req)) != ERROR_SUCCESS) {
... ... @@ -1016,6 +1015,7 @@ void SrsRtmpConn::http_hooks_on_close()
return;
}
int connection_id = _srs_context->get_id();
for (int i = 0; i < (int)on_close->args.size(); i++) {
std::string url = on_close->args.at(i);
SrsHttpHooks::on_close(url, connection_id, ip, req);
... ... @@ -1035,6 +1035,7 @@ int SrsRtmpConn::http_hooks_on_publish()
return ret;
}
int connection_id = _srs_context->get_id();
for (int i = 0; i < (int)on_publish->args.size(); i++) {
std::string url = on_publish->args.at(i);
if ((ret = SrsHttpHooks::on_publish(url, connection_id, ip, req)) != ERROR_SUCCESS) {
... ... @@ -1058,6 +1059,7 @@ void SrsRtmpConn::http_hooks_on_unpublish()
return;
}
int connection_id = _srs_context->get_id();
for (int i = 0; i < (int)on_unpublish->args.size(); i++) {
std::string url = on_unpublish->args.at(i);
SrsHttpHooks::on_unpublish(url, connection_id, ip, req);
... ... @@ -1077,6 +1079,7 @@ int SrsRtmpConn::http_hooks_on_play()
return ret;
}
int connection_id = _srs_context->get_id();
for (int i = 0; i < (int)on_play->args.size(); i++) {
std::string url = on_play->args.at(i);
if ((ret = SrsHttpHooks::on_play(url, connection_id, ip, req)) != ERROR_SUCCESS) {
... ... @@ -1100,6 +1103,7 @@ void SrsRtmpConn::http_hooks_on_stop()
return;
}
int connection_id = _srs_context->get_id();
for (int i = 0; i < (int)on_stop->args.size(); i++) {
std::string url = on_stop->args.at(i);
SrsHttpHooks::on_stop(url, connection_id, ip, req);
... ...
... ... @@ -857,6 +857,7 @@ void SrsServer::resample_kbps(SrsConnection* conn, bool do_resample)
kbps->add_delta(conn);
// resample for server.
if (do_resample) {
kbps->sample();
}
... ...
... ... @@ -128,17 +128,45 @@ private:
SrsIngester* ingester;
#endif
private:
/**
* the pid file fd, lock the file write when server is running.
* @remark the init.d script should cleanup the pid file, when stop service,
* for the server never delete the file; when system startup, the pid in pid file
* maybe valid but the process is not SRS, the init.d script will never start server.
*/
int pid_fd;
/**
* all connections, connection manager
*/
std::vector<SrsConnection*> conns;
/**
* all listners, listener manager.
*/
std::vector<SrsListener*> listeners;
/**
* signal manager which convert gignal to io message.
*/
SrsSignalManager* signal_manager;
/**
* server total kbps.
*/
SrsKbps* kbps;
/**
* user send the signal, convert to variable.
*/
bool signal_reload;
bool signal_gmc_stop;
public:
SrsServer();
virtual ~SrsServer();
public:
/**
* the destroy is for gmc to analysis the memory leak,
* if not destroy global/static data, the gmc will warning memory leak.
* in service, server never destroy, directly exit when restart.
*/
virtual void destroy();
// server startup workflow, @see run_master()
public:
virtual int initialize();
virtual int initialize_signal();
... ... @@ -148,18 +176,58 @@ public:
virtual int register_signal();
virtual int ingest();
virtual int cycle();
// server utility
public:
/**
* callback for connection to remove itself.
* when connection thread cycle terminated, callback this to delete connection.
* @see SrsConnection.on_thread_stop().
*/
virtual void remove(SrsConnection* conn);
/**
* callback for signal manager got a signal.
* the signal manager convert signal to io message,
* whatever, we will got the signo like the orignal signal(int signo) handler.
* @remark, direclty exit for SIGTERM.
* @remark, do reload for SIGNAL_RELOAD.
* @remark, for SIGINT and SIGUSR2:
* no gmc, directly exit.
* for gmc, set the variable signal_gmc_stop, the cycle will return and cleanup for gmc.
*/
virtual void on_signal(int signo);
private:
/**
* the server thread main cycle,
* update the global static data, for instance, the current time,
* the cpu/mem/network statistic.
*/
virtual int do_cycle();
/**
* listen at specified protocol.
*/
virtual int listen_rtmp();
virtual int listen_http_api();
virtual int listen_http_stream();
/**
* close the listeners for specified type,
* remove the listen object from manager.
*/
virtual void close_listeners(SrsListenerType type);
// resample the server kbps. resample all when conn is NULL.
/**
* resample the server kbps.
* if conn is NULL, resample all connections delta, then calc the total kbps.
* @param conn, the connection to do resample the kbps. NULL to resample all connections.
* @param do_resample, whether resample the server kbps. always false when sample a connection.
*/
virtual void resample_kbps(SrsConnection* conn, bool do_resample = true);
// internal only
public:
/**
* when listener got a fd, notice server to accept it.
* @param type, the client type, used to create concrete connection,
* for instance RTMP connection to serve client.
* @param client_stfd, the client fd in st boxed, the underlayer fd.
*/
virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
// interface ISrsThreadHandler.
public:
... ...