wenjiegit

add bandcheck code and modified srs code for merge

... ... @@ -8,16 +8,18 @@ log_dir ./objs/logs;
# if exceed the max connections, server will drop the new connection.
# default: 2000
max_connections 2000;
# the default chunk size is 128, max is 65536,
# some client does not support chunk size change,
# however, most clients supports it and it can improve
# performance about 10%.
# if not specified, set to 4096.
# priority of chunk size in vhost > priority of chunk size in global.
chunk_size 65000;
# vhost list, the __defaultVhost__ is the default vhost
# for example, user use ip to access the stream: rtmp://192.168.1.2/live/livestream.
# for which cannot identify the required vhost.
# for default demo.
vhost __defaultVhost__ {
# the default chunk size is 128, max is 65536,
# some client does not support chunk size change,
# however, most clients supports it and it can improve
# performance about 10%.
# if not specified, set to 4096.
chunk_size 65000;
enabled on;
gop_cache on;
... ... @@ -157,9 +159,8 @@ vhost bandcheck.srs.com {
bandcheck{
enabled on;
key test kate;
interval 30;
max_play_kbps 45000;
max_pub_kbps 25000;
interval 5;
limit_kbps 4000;
}
}
... ... @@ -747,4 +748,3 @@ pithy_print {
hls 3000;
}
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ... @@ -78,8 +79,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define RTMP_SIG_SRS_WEB "http://blog.csdn.net/win_lin"
#define RTMP_SIG_SRS_EMAIL "winterserver@126.com"
#define RTMP_SIG_SRS_LICENSE "The MIT License (MIT)"
#define RTMP_SIG_SRS_COPYRIGHT "Copyright (c) 2013 winlin"
#define RTMP_SIG_SRS_CONTRIBUTOR "winlin"
#define RTMP_SIG_SRS_COPYRIGHT "Copyright (c) 2013 winlin,wenjiegit"
#define RTMP_SIG_SRS_CONTRIBUTOR "winlin,wenjiegit"
// compare
#define srs_min(a, b) (((a) < (b))? (a) : (b))
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ... @@ -153,32 +154,35 @@ int SrsClient::service_cycle()
}
srs_verbose("set peer bandwidth success");
if(config->get_bw_check_enabled(req->vhost, req->bw_key))
{
if (config->get_bw_check_enabled(req->vhost, req->bw_key)) {
static int64_t last_check_time_ms = srs_get_system_time_ms();
int64_t interval_ms = 0;
int play_kbps = 0;
int pub_kbps = 0;
config->get_bw_check_settings(req->vhost, interval_ms, play_kbps, pub_kbps);
int limit_kbps = 0;
config->get_bw_check_settings(req->vhost, interval_ms, limit_kbps);
if((srs_get_system_time_ms() - last_check_time_ms) < interval_ms
&& last_check_time_ms != srs_get_system_time_ms())
{
srs_trace("bandcheck interval less than limted interval. last time=%lld, current time=%lld"
, last_check_time_ms, srs_get_system_time_ms());
return rtmp->response_connect_reject(req, "your bandcheck frequency is too high!");
} else {
last_check_time_ms = srs_get_system_time_ms(); // update last check time
char* local_ip = 0;
if((ret = get_local_ip(local_ip)) != ERROR_SUCCESS){
if ((ret = get_local_ip(local_ip)) != ERROR_SUCCESS) {
srs_error("get local ip failed. ret = %d", ret);
return ret;
}
if ((ret = rtmp->response_connect_app(req, local_ip)) != ERROR_SUCCESS) {
srs_error("response connect app failed. ret=%d", ret);
return ret;
}
return rtmp->start_bandwidth_check(play_kbps, pub_kbps);
return rtmp->start_bandwidth_check(limit_kbps);
}
}
... ... @@ -203,7 +207,7 @@ int SrsClient::service_cycle()
req->strip();
srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str());
int chunk_size = config->get_chunk_size();
int chunk_size = config->get_chunk_size(req->vhost);
if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {
srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret);
return ret;
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ... @@ -1449,7 +1450,23 @@ int SrsConfig::get_chunk_size()
return SRS_CONF_DEFAULT_CHUNK_SIZE;
}
return ::atoi(conf->arg0().c_str());
return ::atoi(conf->arg0().c_str());
}
int SrsConfig::get_chunk_size(const std::string &vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return get_chunk_size();
}
SrsConfDirective* conf_vhost = conf->get("chunk_size");
if (!conf_vhost) {
return get_chunk_size();
}
return ::atoi(conf_vhost->arg0().c_str());
}
int SrsConfig::get_pithy_print_publish()
... ... @@ -1524,12 +1541,11 @@ bool SrsConfig::get_bw_check_enabled(const std::string &vhost, const std::string
return false;
}
void SrsConfig::get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &play_kbps, int &pub_kbps)
void SrsConfig::get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &limit_kbps)
{
// set default value;
interval_ms = 30 * 1000;
play_kbps = 45000;
pub_kbps = 25000;
limit_kbps = 32000;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
... ... @@ -1537,20 +1553,19 @@ void SrsConfig::get_bw_check_settings(const std::string &vhost, int64_t &interva
}
SrsConfDirective* bw_test = conf->get("bandcheck");
if(!bw_test)
if (!bw_test) {
return;
}
SrsConfDirective* interval_conf = bw_test->get("interval");
if(interval_conf)
if (interval_conf) {
interval_ms = ::atoll(interval_conf->arg0().c_str()) * 1000;
}
SrsConfDirective* play_conf = bw_test->get("max_play_kbps");
if(play_conf)
play_kbps = ::atoi(play_conf->arg0().c_str());
SrsConfDirective* pub_conf = bw_test->get("max_pub_kbps");
if(pub_conf)
pub_kbps = ::atoi(pub_conf->arg0().c_str());
SrsConfDirective* limit_kbps_conf = bw_test->get("limit_kbps");
if (limit_kbps_conf) {
limit_kbps = ::atoi(limit_kbps_conf->arg0().c_str());
}
}
int SrsConfig::get_pithy_print_encoder()
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ... @@ -164,13 +165,14 @@ public:
virtual SrsConfDirective* get_refer_publish(std::string vhost);
virtual SrsConfDirective* get_listen();
virtual int get_chunk_size();
virtual int get_chunk_size(const std::string& vhost);
virtual int get_pithy_print_publish();
virtual int get_pithy_print_forwarder();
virtual int get_pithy_print_encoder();
virtual int get_pithy_print_hls();
virtual int get_pithy_print_play();
virtual bool get_bw_check_enabled(const std::string &vhost, const std::string &key);
virtual void get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &play_kbps, int &pub_kbps);
virtual void get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &limit_kbps);
};
/**
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ... @@ -51,7 +52,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_SOCKET_WRITE 209
#define ERROR_SOCKET_WAIT 210
#define ERROR_SOCKET_TIMEOUT 211
#define ERROR_SOCKET_GET_LOCAL_IP 222
#define ERROR_SOCKET_GET_LOCAL_IP 212
#define ERROR_RTMP_PLAIN_REQUIRED 300
#define ERROR_RTMP_CHUNK_START 301
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ... @@ -1355,10 +1356,6 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
srs_verbose("start to decode set chunk size message.");
packet = new SrsSetChunkSizePacket();
return packet->decode(stream);
} else if(header.is_windows_ackledgement()) {
srs_verbose("start to decode AcknowledgementPacket message.");
packet = new SrsAcknowledgementPacket();
return packet->decode(stream);
} else {
// default packet to drop message.
srs_trace("drop the unknown message, type=%d", header.message_type);
... ...
... ... @@ -2,6 +2,7 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
... ... @@ -67,6 +68,25 @@ class ISrsMessage;
#define RTMP_MAX_FMT3_HEADER_SIZE 5
/**
* band width check method name, which will be invoked by client.
* band width check mothods use SrsOnStatusCallPacket as its internal packet type,
* so ensure you set command name when you use it.
*/
// for play
#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes"
#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes"
#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes"
#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes"
#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying"
// for publish
#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes"
#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes"
#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes"
#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished"
#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing"
/**
* the protocol provides the rtmp-message-protocol services,
* to recv RTMP message from RTMP chunk stream,
* and to send out RTMP message over RTMP chunk stream.
... ... @@ -802,26 +822,6 @@ protected:
virtual int encode_packet(SrsStream* stream);
};
/**
* band width check method name, which will be invoked by client.
* band width check mothods use SrsOnStatusCallPacket as its internal packet type,
* so ensure you set command name when you use it.
*/
// for play
#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes"
#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes"
#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes"
#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes"
#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying"
// for publish
#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes"
#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes"
#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes"
#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished"
#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing"
/**
* onStatus command, AMF0 Call
* @remark, user must set the stream_id by SrsMessage.set_packet().
... ...
... ... @@ -1106,7 +1106,7 @@ int SrsRtmp::start_flash_publish(int stream_id)
return ret;
}
int SrsRtmp::start_bandwidth_check(int max_play_kbps, int max_pub_kbps)
int SrsRtmp::start_bandwidth_check(int limit_kbps)
{
int ret = ERROR_SUCCESS;
... ... @@ -1121,11 +1121,11 @@ int SrsRtmp::start_bandwidth_check(int max_play_kbps, int max_pub_kbps)
int publish_bytes = 0;
int64_t start_time = srs_get_system_time_ms();
if((ret = bandwidth_check_play(play_duration_ms, play_interval_ms,
play_actual_duration_ms, play_bytes, max_play_kbps) != ERROR_SUCCESS)
if ((ret = bandwidth_check_play(play_duration_ms, play_interval_ms,
play_actual_duration_ms, play_bytes, limit_kbps) != ERROR_SUCCESS)
|| (ret = bandwidth_check_publish(publish_duration_ms, publish_interval_ms,
publish_actual_duration_ms, publish_bytes, max_pub_kbps)) != ERROR_SUCCESS)
{
publish_actual_duration_ms, publish_bytes, limit_kbps)) != ERROR_SUCCESS) {
srs_error("band width check failed. ret = %d", ret);
return ret;
... ... @@ -1272,8 +1272,7 @@ int SrsRtmp::bandwidth_check_play(int duration_ms, int interval_ms, int &actual_
// recv client's starting play response
while (true) {
SrsCommonMessage* msg = 0;
if( (ret = protocol->recv_message(&msg)) != ERROR_SUCCESS)
{
if ( (ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client's starting play response failed. ret= %d", ret);
return ret;
}
... ... @@ -1286,21 +1285,19 @@ int SrsRtmp::bandwidth_check_play(int duration_ms, int interval_ms, int &actual_
srs_trace("BW check recv play begin response.");
// send play data to client
int64_t current_Time = srs_get_system_time_ms();
int size = 1024*4; // 32KB
int64_t current_time = srs_get_system_time_ms();
int size = 1024;
char random_data[size];
memset(random_data, 0x01, size);
int64_t last_time = current_Time;
int interval = 0;
while ( (srs_get_system_time_ms() - current_Time) < duration_ms ){
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(interval);
SrsCommonMessage* msg = new SrsCommonMessage;
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket;
pkt->command_name = SRS_BW_CHECK_PLAYING;
for(int i = 0; i < 10; ++i)
{
for (int i = 0; i < 100; ++i) {
char buf[32];
sprintf(buf, "%d", i);
pkt->data->set(buf, new SrsAmf0String(random_data));
... ... @@ -1310,22 +1307,24 @@ int SrsRtmp::bandwidth_check_play(int duration_ms, int interval_ms, int &actual_
play_bytes += pkt->get_payload_length();
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check play messages failed. ret=%d", ret);
return ret;
}
if((srs_get_system_time_ms() - last_time) > 5){ // check kbps every 5 ms;
int kbps = play_bytes * 8 / (srs_get_system_time_ms() - current_Time);
if(kbps > max_play_kbps){
interval += 1000*3; // 2 ms
srs_error("send bandwidth check play messages failed. ret=%d", ret);
return ret;
}
// sleep while current kbps <= max_play_kbps
int kbps = 0;
while (true) {
if(srs_get_system_time_ms() - current_time != 0)
kbps = play_bytes * 8 / (srs_get_system_time_ms() - current_time);
if (kbps > max_play_kbps) {
st_usleep(500);
} else {
interval -= 1000*3;
if(interval < 0)
interval = 0;
break;
}
}
}
actual_duration_ms = srs_get_system_time_ms() - current_Time;
actual_duration_ms = srs_get_system_time_ms() - current_time;
srs_trace("BW check send play bytes over.");
// notify client to stop play
... ... @@ -1348,8 +1347,7 @@ int SrsRtmp::bandwidth_check_play(int duration_ms, int interval_ms, int &actual_
// recv client's stop play response.
while (true) {
SrsCommonMessage* msg = 0;
if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS)
{
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client's stop play response failed. ret = %d", ret);
return ret;
}
... ... @@ -1386,8 +1384,7 @@ int SrsRtmp::bandwidth_check_publish(int duration_ms, int interval_ms, int &actu
// read client's notification of starting publish
while (true) {
SrsCommonMessage* msg = 0;
if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS)
{
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client's notification of starting publish failed. ret = %d", ret);
return ret;
}
... ... @@ -1401,12 +1398,8 @@ int SrsRtmp::bandwidth_check_publish(int duration_ms, int interval_ms, int &actu
// recv publish msgs until @duration_ms ms
int64_t current_time = srs_get_system_time_ms();
int64_t last_time = current_time;
int interval = 0;
while( (srs_get_system_time_ms() - current_time) < duration_ms )
{
st_usleep(interval);
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(0);
SrsCommonMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv message failed. ret=%d", ret);
... ... @@ -1416,14 +1409,15 @@ int SrsRtmp::bandwidth_check_publish(int duration_ms, int interval_ms, int &actu
publish_bytes += msg->header.payload_length;
if((srs_get_system_time_ms() - last_time) > 5){ // check kbps every 5 ms;
int kbps = publish_bytes * 8 / (srs_get_system_time_ms() - current_time);
if(kbps > max_pub_kbps){
interval += 1000*3; // 2 ms
int kbps = 0;
while (true) {
if(srs_get_system_time_ms() - current_time != 0)
kbps = publish_bytes * 8 / (srs_get_system_time_ms() - current_time);
if (kbps > max_pub_kbps) {
st_usleep(500);
} else {
interval -= 1000*3;
if(interval < 0)
interval = 0;
break;
}
}
}
... ... @@ -1448,11 +1442,10 @@ int SrsRtmp::bandwidth_check_publish(int duration_ms, int interval_ms, int &actu
// recv left msg
while (true) {
if((ret = st_netfd_poll(stfd, POLLIN, 1000*500)) == ERROR_SUCCESS)
{
if((ret = st_netfd_poll(stfd, POLLIN, 1000*500)) == ERROR_SUCCESS) {
SrsCommonMessage* msg = 0;
if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS)
{
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client's left msg failed, ret = %d", ret);
return ret;
}
... ...
... ... @@ -225,7 +225,7 @@ public:
/**
* used to process band width check from client.
*/
virtual int start_bandwidth_check(int max_play_kbps, int max_pub_kbps);
virtual int start_bandwidth_check(int limit_kbps);
private:
virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
... ...