winlin

refine kbps, provides 30s,1m,5m,60m kbps. 0.9.97

... ... @@ -172,9 +172,12 @@ int SrsEdgeIngester::ingest()
// pithy print
if (pithy_print.can_print()) {
kbps->sample();
srs_trace("<- "SRS_LOG_ID_EDGE_PLAY
" time=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps());
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(),
kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium());
}
// read from client.
... ... @@ -464,9 +467,12 @@ int SrsEdgeForwarder::cycle()
// pithy print
if (pithy_print.can_print()) {
kbps->sample();
srs_trace("-> "SRS_LOG_ID_EDGE_PUBLISH
" time=%"PRId64", msgs=%d, okbps=%d, ikbps=%d",
pithy_print.age(), count, kbps->get_send_kbps(), kbps->get_recv_kbps());
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(), count,
kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium());
}
// ignore when no messages.
... ...
... ... @@ -342,9 +342,12 @@ int SrsForwarder::forward()
// pithy print
if (pithy_print.can_print()) {
kbps->sample();
srs_trace("-> "SRS_LOG_ID_FOWARDER
" time=%"PRId64", msgs=%d, okbps=%d, ikbps=%d",
pithy_print.age(), count, kbps->get_send_kbps(), kbps->get_recv_kbps());
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(), count,
kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium());
}
// ignore when no messages.
... ...
... ... @@ -28,6 +28,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_protocol_io.hpp>
#include <srs_kernel_utility.hpp>
SrsKbpsSample::SrsKbpsSample()
{
bytes = time = 0;
kbps = 0;
}
SrsKbpsSlice::SrsKbpsSlice()
{
io.in = NULL;
... ... @@ -39,6 +45,59 @@ SrsKbpsSlice::~SrsKbpsSlice()
{
}
int64_t SrsKbpsSlice::get_total_bytes()
{
return bytes + last_bytes - io_bytes_base;
}
void SrsKbpsSlice::sample()
{
int64_t now = srs_get_system_time_ms();
int64_t total_bytes = get_total_bytes();
if (sample_30s.time <= 0) {
sample_30s.kbps = 0;
sample_30s.time = now;
sample_30s.bytes = total_bytes;
}
if (sample_1m.time <= 0) {
sample_1m.kbps = 0;
sample_1m.time = now;
sample_1m.bytes = total_bytes;
}
if (sample_5m.time <= 0) {
sample_5m.kbps = 0;
sample_5m.time = now;
sample_5m.bytes = total_bytes;
}
if (sample_60m.time <= 0) {
sample_60m.kbps = 0;
sample_60m.time = now;
sample_60m.bytes = total_bytes;
}
if (now - sample_30s.time > 30 * 1000) {
sample_30s.kbps = (total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time);
sample_30s.time = now;
sample_30s.bytes = total_bytes;
}
if (now - sample_1m.time > 60 * 1000) {
sample_1m.kbps = (total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time);
sample_1m.time = now;
sample_1m.bytes = total_bytes;
}
if (now - sample_5m.time > 300 * 1000) {
sample_5m.kbps = (total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time);
sample_5m.time = now;
sample_5m.bytes = total_bytes;
}
if (now - sample_60m.time > 3600 * 1000) {
sample_60m.kbps = (total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time);
sample_60m.time = now;
sample_60m.bytes = total_bytes;
}
}
SrsKbps::SrsKbps()
{
}
... ... @@ -64,6 +123,8 @@ void SrsKbps::set_io(ISrsProtocolReader* in, ISrsProtocolWriter* out)
if (in) {
is.last_bytes = is.io_bytes_base = in->get_recv_bytes();
}
// resample
is.sample();
// set output stream
// now, set start time.
... ... @@ -80,41 +141,74 @@ void SrsKbps::set_io(ISrsProtocolReader* in, ISrsProtocolWriter* out)
if (out) {
os.last_bytes = os.io_bytes_base = out->get_send_bytes();
}
// resample
os.sample();
}
int SrsKbps::get_send_kbps()
{
int64_t duration = srs_get_system_time_ms() - is.starttime;
int64_t bytes = get_send_bytes();
if (duration <= 0) {
return 0;
}
int64_t bytes = get_send_bytes();
return bytes * 8 / duration;
}
int SrsKbps::get_recv_kbps()
{
int64_t duration = srs_get_system_time_ms() - os.starttime;
int64_t bytes = get_recv_bytes();
if (duration <= 0) {
return 0;
}
int64_t bytes = get_recv_bytes();
return bytes * 8 / duration;
}
int SrsKbps::get_send_kbps_sample_high()
{
return os.sample_30s.kbps;
}
int SrsKbps::get_recv_kbps_sample_high()
{
return is.sample_30s.kbps;
}
int SrsKbps::get_send_kbps_sample_medium()
{
return os.sample_5m.kbps;
}
int SrsKbps::get_recv_kbps_sample_medium()
{
return is.sample_5m.kbps;
}
int64_t SrsKbps::get_send_bytes()
{
if (os.io.out) {
os.last_bytes = os.io.out->get_send_bytes();
}
return os.bytes + os.last_bytes - os.io_bytes_base;
return os.get_total_bytes();
}
int64_t SrsKbps::get_recv_bytes()
{
return is.get_total_bytes();
}
void SrsKbps::sample()
{
if (os.io.out) {
os.last_bytes = os.io.out->get_send_bytes();
}
// resample
os.sample();
if (is.io.in) {
is.last_bytes = is.io.in->get_recv_bytes();
}
return is.bytes + is.last_bytes - is.io_bytes_base;
// resample
is.sample();
}
... ...
... ... @@ -34,7 +34,32 @@ class ISrsProtocolReader;
class ISrsProtocolWriter;
/**
* a kbps sample, for example, 1minute kbps,
* 10minute kbps sample.
*/
class SrsKbpsSample
{
public:
int64_t bytes;
int64_t time;
int kbps;
public:
SrsKbpsSample();
};
/**
* a slice of kbps statistic, for input or output.
* a slice contains a set of sessions, which has a base offset of bytes,
* where a slice is:
* starttime(oldest session startup time)
* bytes(total bytes of previous sessions)
* io_bytes_base(bytes offset of current session)
* last_bytes(bytes of current session)
* so, the total send bytes now is:
* send_bytes = bytes + last_bytes - io_bytes_base
* so, the bytes sent duration current session is:
* send_bytes = last_bytes - io_bytes_base
* @remark user use set_io to start new session.
*/
class SrsKbpsSlice
{
... ... @@ -45,17 +70,34 @@ private:
};
public:
slice_io io;
// session startup bytes
// @remark, use total_bytes() to get the total bytes of slice.
int64_t bytes;
// slice starttime, the first time to record bytes.
int64_t starttime;
// startup bytes number for io when set it,
// session startup bytes number for io when set it,
// the base offset of bytes for io.
int64_t io_bytes_base;
// last updated bytes number,
// cache for io maybe freed.
int64_t last_bytes;
// samples
SrsKbpsSample sample_30s;
SrsKbpsSample sample_1m;
SrsKbpsSample sample_5m;
SrsKbpsSample sample_60m;
public:
SrsKbpsSlice();
virtual ~SrsKbpsSlice();
public:
/**
* get current total bytes.
*/
virtual int64_t get_total_bytes();
/**
* resample all samples.
*/
virtual void sample();
};
/**
... ... @@ -71,6 +113,7 @@ public:
virtual ~SrsKbps();
public:
/**
* set io to start new session.
* set the underlayer reader/writer,
* if the io destroied, for instance, the forwarder reconnect,
* user must set the io of SrsKbps to NULL to continue to use the kbps object.
... ... @@ -82,15 +125,28 @@ public:
public:
/**
* get total kbps, duration is from the startup of io.
* @remark, use sample() to update data.
*/
virtual int get_send_kbps();
virtual int get_recv_kbps();
// 30s
virtual int get_send_kbps_sample_high();
virtual int get_recv_kbps_sample_high();
// 5m
virtual int get_send_kbps_sample_medium();
virtual int get_recv_kbps_sample_medium();
public:
/**
* get the total send/recv bytes, from the startup of the oldest io.
* @remark, use sample() to update data.
*/
virtual int64_t get_send_bytes();
virtual int64_t get_recv_bytes();
public:
/**
* resample all samples.
*/
virtual void sample();
};
#endif
\ No newline at end of file
... ...
... ... @@ -504,9 +504,12 @@ int SrsRtmpConn::playing(SrsSource* source)
// reportable
if (pithy_print.can_print()) {
kbps->sample();
srs_trace("-> "SRS_LOG_ID_PLAY
" time=%"PRId64", duration=%"PRId64", msgs=%d, okbps=%d, ikbps=%d",
pithy_print.age(), duration, count, kbps->get_send_kbps(), kbps->get_recv_kbps());
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(), count,
kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium());
}
if (count <= 0) {
... ... @@ -590,9 +593,11 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
// reportable
if (pithy_print.can_print()) {
kbps->sample();
srs_trace("<- "SRS_LOG_ID_CLIENT_PUBLISH
" time=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps());
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(),
kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium());
}
// process UnPublish event.
... ... @@ -666,9 +671,12 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
// reportable
if (pithy_print.can_print()) {
kbps->sample();
srs_trace("<- "SRS_LOG_ID_WEB_PUBLISH
" time=%"PRId64", okbps=%d, ikbps=%d",
pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps());
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(),
kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium());
}
// process UnPublish event.
... ...
... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR "0"
#define VERSION_MINOR "9"
#define VERSION_REVISION "96"
#define VERSION_REVISION "97"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info.
#define RTMP_SIG_SRS_KEY "srs"
... ...