winlin

fix #316, http api provides stream/vhost/srs/server bytes, codec and count. 2.0.136

... ... @@ -550,6 +550,7 @@ Supported operating systems and hardware:
### SRS 2.0 history
* v2.0, 2015-03-08, fix [#316](https://github.com/winlinvip/simple-rtmp-server/issues/316), http api provides stream/vhost/srs/server bytes, codec and count. 2.0.136.
* v2.0, 2015-03-08, fix [#310](https://github.com/winlinvip/simple-rtmp-server/issues/310), refine aac LC, support aac HE/HEv2. 2.0.134.
* v2.0, 2015-03-06, for [#322](https://github.com/winlinvip/simple-rtmp-server/issues/322), fix http-flv stream bug, support multiple streams. 2.0.133.
* v2.0, 2015-03-06, refine http request parse. 2.0.132.
... ...
... ... @@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd)
{
id = 0;
server = srs_server;
stfd = client_stfd;
... ... @@ -55,6 +56,8 @@ int SrsConnection::cycle()
int ret = ERROR_SUCCESS;
_srs_context->generate_id();
id = _srs_context->get_id();
ip = srs_get_peer_ip(st_netfd_fileno(stfd));
ret = do_cycle();
... ... @@ -86,6 +89,11 @@ void SrsConnection::on_thread_stop()
server->remove(this);
}
int SrsConnection::srs_id()
{
return id;
}
void SrsConnection::stop()
{
srs_close_stfd(stfd);
... ...
... ... @@ -51,6 +51,10 @@ private:
* when thread stop, the connection will be delete by server.
*/
SrsThread* pthread;
/**
* the id of connection.
*/
int id;
protected:
/**
* the server object to manage the connection.
... ... @@ -92,14 +96,9 @@ public:
virtual void on_thread_stop();
public:
/**
* reset and start sample of bytes.
* when server to get the kbps of connection,
* it cannot wait the connection terminated then get the kbps,
* it must sample the kbps every some interval, for instance, 9s to sample all connections kbps,
* all connections will extends from IKbpsDelta which provides the bytes delta,
* while the delta must be update by the sample which invoke by the kbps_resample().
* get the srs id which identify the client.
*/
virtual void kbps_resample() = 0;
virtual int srs_id();
protected:
/**
* for concrete connection to do the cycle.
... ...
... ... @@ -486,7 +486,7 @@ SrsHttpApi::~SrsHttpApi()
srs_freep(parser);
}
void SrsHttpApi::kbps_resample()
void SrsHttpApi::resample()
{
// TODO: FIXME: implements it
}
... ... @@ -503,6 +503,11 @@ int64_t SrsHttpApi::get_recv_bytes_delta()
return 0;
}
void SrsHttpApi::cleanup()
{
// TODO: FIXME: implements it
}
int SrsHttpApi::do_cycle()
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -168,12 +168,12 @@ private:
public:
SrsHttpApi(SrsServer* svr, st_netfd_t fd, SrsHttpServeMux* m);
virtual ~SrsHttpApi();
public:
virtual void kbps_resample();
// interface IKbpsDelta
public:
virtual void resample();
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
virtual void cleanup();
protected:
virtual int do_cycle();
private:
... ...
... ... @@ -1193,7 +1193,7 @@ SrsHttpConn::~SrsHttpConn()
srs_freep(parser);
}
void SrsHttpConn::kbps_resample()
void SrsHttpConn::resample()
{
// TODO: FIXME: implements it
}
... ... @@ -1210,6 +1210,11 @@ int64_t SrsHttpConn::get_recv_bytes_delta()
return 0;
}
void SrsHttpConn::cleanup()
{
// TODO: FIXME: implements it
}
int SrsHttpConn::do_cycle()
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -360,12 +360,12 @@ private:
public:
SrsHttpConn(SrsServer* svr, st_netfd_t fd, SrsHttpServer* m);
virtual ~SrsHttpConn();
public:
virtual void kbps_resample();
// interface IKbpsDelta
public:
virtual void resample();
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
virtual void cleanup();
protected:
virtual int do_cycle();
private:
... ...
... ... @@ -203,20 +203,29 @@ int64_t SrsKbps::get_recv_bytes()
return is.get_total_bytes();
}
void SrsKbps::resample()
{
sample();
}
int64_t SrsKbps::get_send_bytes_delta()
{
int64_t delta = os.get_total_bytes() - os.delta_bytes;
os.delta_bytes = os.get_total_bytes();
return delta;
}
int64_t SrsKbps::get_recv_bytes_delta()
{
int64_t delta = is.get_total_bytes() - is.delta_bytes;
is.delta_bytes = is.get_total_bytes();
return delta;
}
void SrsKbps::cleanup()
{
os.delta_bytes = os.get_total_bytes();
is.delta_bytes = is.get_total_bytes();
}
void SrsKbps::add_delta(IKbpsDelta* delta)
{
srs_assert(delta);
... ...
... ... @@ -107,6 +107,11 @@ public:
/**
* the interface which provices delta of bytes.
* for a delta, for example, a live stream connection, we can got the delta by:
* IKbpsDelta* delta = ...;
* delta->resample();
* kbps->add_delta(delta);
* delta->cleanup();
*/
class IKbpsDelta
{
... ... @@ -114,8 +119,19 @@ public:
IKbpsDelta();
virtual ~IKbpsDelta();
public:
/**
* resample to generate the value of delta bytes.
*/
virtual void resample() = 0;
/**
* get the send or recv bytes delta.
*/
virtual int64_t get_send_bytes_delta() = 0;
virtual int64_t get_recv_bytes_delta() = 0;
/**
* cleanup the value of delta bytes.
*/
virtual void cleanup() = 0;
};
/**
... ... @@ -123,16 +139,21 @@ public:
* itself can be a statistic source, for example, used for SRS bytes stat.
* there are two usage scenarios:
* 1. connections to calc kbps by sample():
* set_io(in, out)
* sample()
* get_xxx_kbps().
* SrsKbps* kbps = ...;
* kbps->set_io(in, out)
* kbps->sample()
* kbps->get_xxx_kbps().
* the connections know how many bytes already send/recv.
* 2. server to calc kbps by add_delta():
* set_io(NULL, NULL)
* SrsKbps* kbps = ...;
* kbps->set_io(NULL, NULL)
* for each connection in connections:
* add_delta(connections) // where connection is a IKbpsDelta*
* sample()
* get_xxx_kbps().
* IKbpsDelta* delta = connection; // where connection implements IKbpsDelta
* delta->resample()
* kbps->add_delta(delta)
* delta->cleanup()
* kbps->sample()
* kbps->get_xxx_kbps().
* the server never know how many bytes already send/recv, for the connection maybe closed.
*/
class SrsKbps : public virtual ISrsProtocolStatistic, public virtual IKbpsDelta
... ... @@ -174,18 +195,26 @@ public:
*/
virtual int64_t get_send_bytes();
virtual int64_t get_recv_bytes();
public:
/**
* resample to get the delta.
*/
virtual void resample();
/**
* get the delta of send/recv bytes.
* @remark, used for add_delta to calc the total system bytes/kbps.
*/
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
/**
* cleanup the delta.
*/
virtual void cleanup();
public:
/**
* add delta to kbps clac mechenism.
* we donot know the total bytes, but know the delta, for instance,
* for rtmp server to calc total bytes and kbps.
* @remark user must invoke sample() when invoke this method.
* @remark user must invoke sample() to calc result after invoke this method.
* @param delta, assert should never be NULL.
*/
virtual void add_delta(IKbpsDelta* delta);
... ...
... ... @@ -110,11 +110,6 @@ SrsRtmpConn::~SrsRtmpConn()
srs_freep(kbps);
}
void SrsRtmpConn::kbps_resample()
{
kbps->sample();
}
// TODO: return detail message when error for client.
int SrsRtmpConn::do_cycle()
{
... ... @@ -255,6 +250,11 @@ int SrsRtmpConn::on_reload_vhost_realtime(string vhost)
return ret;
}
void SrsRtmpConn::resample()
{
kbps->resample();
}
int64_t SrsRtmpConn::get_send_bytes_delta()
{
return kbps->get_send_bytes_delta();
... ... @@ -265,6 +265,11 @@ int64_t SrsRtmpConn::get_recv_bytes_delta()
return kbps->get_recv_bytes_delta();
}
void SrsRtmpConn::cleanup()
{
kbps->cleanup();
}
int SrsRtmpConn::service_cycle()
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -83,8 +83,6 @@ private:
public:
SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd);
virtual ~SrsRtmpConn();
public:
virtual void kbps_resample();
protected:
virtual int do_cycle();
// interface ISrsReloadHandler
... ... @@ -94,8 +92,10 @@ public:
virtual int on_reload_vhost_realtime(std::string vhost);
// interface IKbpsDelta
public:
virtual void resample();
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
virtual void cleanup();
private:
// when valid and connected to vhost/app, service the client.
virtual int service_cycle();
... ...
... ... @@ -45,6 +45,7 @@ using namespace std;
#include <srs_app_heartbeat.hpp>
#include <srs_app_mpegts_udp.hpp>
#include <srs_app_rtsp.hpp>
#include <srs_app_statistic.hpp>
// signal defines.
#define SIGNAL_RELOAD SIGHUP
... ... @@ -392,7 +393,6 @@ SrsServer::SrsServer()
pid_fd = -1;
signal_manager = NULL;
kbps = NULL;
// donot new object in constructor,
// for some global instance is not ready now,
... ... @@ -452,7 +452,6 @@ void SrsServer::destroy()
}
srs_freep(signal_manager);
srs_freep(kbps);
// @remark never destroy the connections,
// for it's still alive.
... ... @@ -478,10 +477,6 @@ int SrsServer::initialize()
srs_assert(!signal_manager);
signal_manager = new SrsSignalManager(this);
srs_assert(!kbps);
kbps = new SrsKbps();
kbps->set_io(NULL, NULL);
#ifdef SRS_AUTO_HTTP_API
if ((ret = http_api_mux->initialize()) != ERROR_SUCCESS) {
return ret;
... ... @@ -745,12 +740,8 @@ void SrsServer::remove(SrsConnection* conn)
srs_info("conn removed. conns=%d", (int)conns.size());
// resample the kbps to collect the delta.
conn->kbps_resample();
// add delta of connection to server kbps.,
// for next sample() of server kbps can get the stat.
kbps->add_delta(conn);
SrsStatistic* stat = SrsStatistic::instance();
stat->kbps_add_delta(conn);
// all connections are created by server,
// so we free it here.
... ... @@ -868,7 +859,6 @@ int SrsServer::do_cycle()
if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
srs_info("update network server kbps info.");
resample_kbps();
srs_update_rtmp_server((int)conns.size(), kbps);
}
#ifdef SRS_AUTO_HTTP_PARSER
if (_srs_config->get_heartbeat_enabled()) {
... ... @@ -1019,22 +1009,23 @@ void SrsServer::close_listeners(SrsListenerType type)
void SrsServer::resample_kbps()
{
SrsStatistic* stat = SrsStatistic::instance();
// collect delta from all clients.
for (std::vector<SrsConnection*>::iterator it = conns.begin(); it != conns.end(); ++it) {
SrsConnection* conn = *it;
// resample the kbps to collect the delta.
conn->kbps_resample();
// add delta of connection to server kbps.,
// for next sample() of server kbps can get the stat.
kbps->add_delta(conn);
stat->kbps_add_delta(conn);
}
// TODO: FXME: support all other connections.
// sample the kbps, get the stat.
kbps->sample();
SrsKbps* kbps = stat->kbps_sample();
srs_update_rtmp_server((int)conns.size(), kbps);
}
int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
... ...
... ... @@ -210,10 +210,6 @@ private:
*/
SrsSignalManager* signal_manager;
/**
* server total kbps.
*/
SrsKbps* kbps;
/**
* user send the signal, convert to variable.
*/
bool signal_reload;
... ...
... ... @@ -29,6 +29,8 @@ using namespace std;
#include <srs_rtmp_sdk.hpp>
#include <srs_app_json.hpp>
#include <srs_app_kbps.hpp>
#include <srs_app_conn.hpp>
int64_t __srs_gvid = getpid();
... ... @@ -40,10 +42,14 @@ int64_t __srs_generate_id()
SrsStatisticVhost::SrsStatisticVhost()
{
id = __srs_generate_id();
kbps = new SrsKbps();
kbps->set_io(NULL, NULL);
}
SrsStatisticVhost::~SrsStatisticVhost()
{
srs_freep(kbps);
}
SrsStatisticStream::SrsStatisticStream()
... ... @@ -61,10 +67,14 @@ SrsStatisticStream::SrsStatisticStream()
asample_rate = SrsCodecAudioSampleRateReserved;
asound_type = SrsCodecAudioSoundTypeReserved;
aac_object = SrsAacObjectTypeReserved;
kbps = new SrsKbps();
kbps->set_io(NULL, NULL);
}
SrsStatisticStream::~SrsStatisticStream()
{
srs_freep(kbps);
}
void SrsStatisticStream::close()
... ... @@ -78,10 +88,15 @@ SrsStatistic* SrsStatistic::_instance = new SrsStatistic();
SrsStatistic::SrsStatistic()
{
_server_id = __srs_generate_id();
kbps = new SrsKbps();
kbps->set_io(NULL, NULL);
}
SrsStatistic::~SrsStatistic()
{
srs_freep(kbps);
if (true) {
std::map<std::string, SrsStatisticVhost*>::iterator it;
for (it = vhosts.begin(); it != vhosts.end(); it++) {
... ... @@ -183,6 +198,49 @@ void SrsStatistic::on_disconnect(int id)
}
}
void SrsStatistic::kbps_add_delta(SrsConnection* conn)
{
int id = conn->srs_id();
if (clients.find(id) == clients.end()) {
return;
}
SrsStatisticClient* client = clients[id];
// resample the kbps to collect the delta.
conn->resample();
// add delta of connection to kbps.
// for next sample() of server kbps can get the stat.
kbps->add_delta(conn);
client->stream->kbps->add_delta(conn);
client->stream->vhost->kbps->add_delta(conn);
// cleanup the delta.
conn->cleanup();
}
SrsKbps* SrsStatistic::kbps_sample()
{
kbps->sample();
if (true) {
std::map<std::string, SrsStatisticVhost*>::iterator it;
for (it = vhosts.begin(); it != vhosts.end(); it++) {
SrsStatisticVhost* vhost = it->second;
vhost->kbps->sample();
}
}
if (true) {
std::map<std::string, SrsStatisticStream*>::iterator it;
for (it = streams.begin(); it != streams.end(); it++) {
SrsStatisticStream* stream = it->second;
stream->kbps->sample();
}
}
return kbps;
}
int64_t SrsStatistic::server_id()
{
return _server_id;
... ... @@ -202,7 +260,9 @@ int SrsStatistic::dumps_vhosts(stringstream& ss)
ss << __SRS_JOBJECT_START
<< __SRS_JFIELD_ORG("id", vhost->id) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("name", vhost->vhost)
<< __SRS_JFIELD_STR("name", vhost->vhost) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_ORG("send_bytes", vhost->kbps->get_send_bytes()) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_ORG("recv_bytes", vhost->kbps->get_recv_bytes())
<< __SRS_JOBJECT_END;
}
ss << __SRS_JARRAY_END;
... ... @@ -235,7 +295,9 @@ int SrsStatistic::dumps_streams(stringstream& ss)
<< __SRS_JFIELD_ORG("id", stream->id) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("name", stream->stream) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_ORG("vhost", stream->vhost->id) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_ORG("clients", client_num) << __SRS_JFIELD_CONT;
<< __SRS_JFIELD_ORG("clients", client_num) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_ORG("send_bytes", stream->kbps->get_send_bytes()) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_ORG("recv_bytes", stream->kbps->get_recv_bytes()) << __SRS_JFIELD_CONT;
if (!stream->has_video) {
ss << __SRS_JFIELD_NULL("video") << __SRS_JFIELD_CONT;
... ...
... ... @@ -35,7 +35,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_codec.hpp>
class SrsKbps;
class SrsRequest;
class SrsConnection;
struct SrsStatisticVhost
{
... ... @@ -43,6 +45,11 @@ public:
int64_t id;
std::string vhost;
public:
/**
* vhost total kbps.
*/
SrsKbps* kbps;
public:
SrsStatisticVhost();
virtual ~SrsStatisticVhost();
};
... ... @@ -56,6 +63,11 @@ public:
std::string stream;
std::string url;
public:
/**
* stream total kbps.
*/
SrsKbps* kbps;
public:
bool has_video;
SrsCodecVideo vcodec;
// profile_idc, H.264-AVC-ISO_IEC_14496-10.pdf, page 45.
... ... @@ -103,6 +115,8 @@ private:
std::map<std::string, SrsStatisticStream*> streams;
// key: client id, value: stream object.
std::map<int, SrsStatisticClient*> clients;
// server total kbps.
SrsKbps* kbps;
private:
SrsStatistic();
virtual ~SrsStatistic();
... ... @@ -137,6 +151,16 @@ public:
* client disconnect
*/
virtual void on_disconnect(int id);
/**
* sample the kbps, add delta bytes of conn.
* use kbps_sample() to get all result of kbps stat.
*/
virtual void kbps_add_delta(SrsConnection* conn);
/**
* calc the result for all kbps.
* @return the server kbps.
*/
virtual SrsKbps* kbps_sample();
public:
/**
* get the server id, used to identify the server.
... ...
... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 135
#define VERSION_REVISION 136
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
... ...
... ... @@ -39,7 +39,7 @@ string srs_codec_video2str(SrsCodecVideo codec)
return "H264";
case SrsCodecVideoOn2VP6:
case SrsCodecVideoOn2VP6WithAlphaChannel:
return "H264";
return "VP6";
case SrsCodecVideoReserved:
case SrsCodecVideoReserved1:
case SrsCodecVideoReserved2:
... ...