winlin

support pithy print log message specified by stage.

... ... @@ -46,6 +46,7 @@ url: rtmp://127.0.0.1:1935/live/livestream
* nginx v1.5.0: 139524 lines <br/>
### History
* v0.3, 2013-10-29, support pithy print log message specified by stage.
* v0.3, 2013-10-28, support librtmp without extended-timestamp in 0xCX chunk packet.
* v0.3, 2013-10-27, support cache last gop for client fast startup.
* v0.2, 2013-10-25, v0.2 released. 10125 lines.
... ...
... ... @@ -87,7 +87,7 @@ MODULE_FILES=("srs_core" "srs_core_log" "srs_core_server"
"srs_core_rtmp" "srs_core_socket" "srs_core_buffer"
"srs_core_auto_free" "srs_core_protocol" "srs_core_amf0"
"srs_core_stream" "srs_core_source" "srs_core_codec"
"srs_core_complex_handshake")
"srs_core_complex_handshake" "srs_core_pithy_print")
MODULE_DIR="src/core" . auto/modules.sh
CORE_OBJS="${MODULE_OBJS[@]}"
... ...
... ... @@ -22,3 +22,24 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_core.hpp>
#include <sys/time.h>
static int64_t _srs_system_time_us_cache = 0;
int64_t srs_get_system_time_ms()
{
return _srs_system_time_us_cache / 1000;
}
void srs_update_system_time_ms()
{
timeval now;
gettimeofday(&now, NULL);
// we must convert the tv_sec/tv_usec to int64_t.
_srs_system_time_us_cache = now.tv_sec * 1000 * 1000 + now.tv_usec;
_srs_system_time_us_cache = srs_max(0, _srs_system_time_us_cache);
}
... ...
... ... @@ -79,4 +79,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define srs_min(a, b) ((a < b)? a : b)
#define srs_max(a, b) ((a < b)? b : a)
// get current system time in ms, use cache to avoid performance problem
extern int64_t srs_get_system_time_ms();
// the deamon st-thread will update it.
extern void srs_update_system_time_ms();
#endif
\ No newline at end of file
... ...
... ... @@ -32,9 +32,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_auto_free.hpp>
#include <srs_core_source.hpp>
#include <srs_core_server.hpp>
#include <srs_core_pithy_print.hpp>
#define SRS_PULSE_TIMEOUT_MS 100
#define SRS_SEND_TIMEOUT_MS 5000
#define SRS_RECV_TIMEOUT_MS SRS_SEND_TIMEOUT_MS
SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
: SrsConnection(srs_server, client_stfd)
... ... @@ -61,9 +63,10 @@ int SrsClient::do_cycle()
srs_error("get peer ip failed. ret=%d", ret);
return ret;
}
srs_verbose("get peer ip success. ip=%s", ip);
srs_trace("get peer ip success. ip=%s, send_to=%d, recv_to=%d",
ip, SRS_SEND_TIMEOUT_MS, SRS_RECV_TIMEOUT_MS);
rtmp->set_recv_timeout(SRS_SEND_TIMEOUT_MS * 1000);
rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_MS * 1000);
rtmp->set_send_timeout(SRS_SEND_TIMEOUT_MS * 1000);
if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
... ... @@ -135,7 +138,7 @@ int SrsClient::do_cycle()
return ret;
}
srs_info("start to play stream %s success", req->stream.c_str());
return streaming_play(source);
return playing(source);
}
case SrsClientFMLEPublish: {
srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
... ... @@ -145,7 +148,7 @@ int SrsClient::do_cycle()
return ret;
}
srs_info("start to publish stream %s success", req->stream.c_str());
ret = streaming_publish(source, true);
ret = publish(source, true);
source->on_unpublish();
return ret;
}
... ... @@ -157,7 +160,7 @@ int SrsClient::do_cycle()
return ret;
}
srs_info("flash start to publish stream %s success", req->stream.c_str());
ret = streaming_publish(source, false);
ret = publish(source, false);
source->on_unpublish();
return ret;
}
... ... @@ -171,7 +174,7 @@ int SrsClient::do_cycle()
return ret;
}
int SrsClient::streaming_play(SrsSource* source)
int SrsClient::playing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
... ... @@ -187,11 +190,10 @@ int SrsClient::streaming_play(SrsSource* source)
rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);
int64_t report_time = 0;
int64_t reported_time = 0;
SrsPithyPrint pithy_print(SRS_STAGE_PLAY_USER);
while (true) {
report_time += SRS_PULSE_TIMEOUT_MS;
pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
// switch to other st-threads.
st_usleep(0);
... ... @@ -223,8 +225,9 @@ int SrsClient::streaming_play(SrsSource* source)
}
// reportable
if (server->can_report(reported_time, report_time)) {
srs_trace("play report, time=%"PRId64", ctl_msg_ret=%d, msgs=%d", report_time, ctl_msg_ret, count);
if (pithy_print.can_print()) {
srs_trace("-> clock=%u, time=%"PRId64", cmr=%d, msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
(int)srs_get_system_time_ms(), pithy_print.get_age(), ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
}
if (count <= 0) {
... ... @@ -251,10 +254,12 @@ int SrsClient::streaming_play(SrsSource* source)
return ret;
}
int SrsClient::streaming_publish(SrsSource* source, bool is_fmle)
int SrsClient::publish(SrsSource* source, bool is_fmle)
{
int ret = ERROR_SUCCESS;
SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
while (true) {
// switch to other st-threads.
st_usleep(0);
... ... @@ -267,6 +272,14 @@ int SrsClient::streaming_publish(SrsSource* source, bool is_fmle)
SrsAutoFree(SrsCommonMessage, msg, false);
pithy_print.set_age(msg->header.timestamp);
// reportable
if (pithy_print.can_print()) {
srs_trace("<- clock=%u, time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
(int)srs_get_system_time_ms(), pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
}
// process audio packet
if (msg->header.is_audio() && ((ret = source->on_audio(msg)) != ERROR_SUCCESS)) {
srs_error("process audio message failed. ret=%d", ret);
... ... @@ -357,7 +370,7 @@ int SrsClient::get_peer_ip()
ip = new char[strlen(buf) + 1];
strcpy(ip, buf);
srs_trace("get peer ip success. ip=%s, fd=%d", ip, fd);
srs_verbose("get peer ip success. ip=%s, fd=%d", ip, fd);
return ret;
}
... ...
... ... @@ -53,8 +53,8 @@ public:
protected:
virtual int do_cycle();
private:
virtual int streaming_play(SrsSource* source);
virtual int streaming_publish(SrsSource* source, bool is_fmle);
virtual int playing(SrsSource* source);
virtual int publish(SrsSource* source, bool is_fmle);
virtual int get_peer_ip();
};
... ...
/*
The MIT License (MIT)
Copyright (c) 2013 winlin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_core_pithy_print.hpp>
#include <map>
#include <srs_core_log.hpp>
#define SRS_STAGE_DEFAULT_INTERVAL_MS 1200
#define SRS_STAGE_PLAY_USER_INTERVAL_MS 1300
#define SRS_STAGE_PUBLISH_USER_INTERVAL_MS 1100
struct SrsStageInfo
{
int stage_id;
int pithy_print_time_ms;
int nb_clients;
SrsStageInfo(int _stage_id)
{
stage_id = _stage_id;
switch (_stage_id) {
case SRS_STAGE_PLAY_USER:
pithy_print_time_ms = SRS_STAGE_PLAY_USER_INTERVAL_MS;
case SRS_STAGE_PUBLISH_USER:
pithy_print_time_ms = SRS_STAGE_PUBLISH_USER_INTERVAL_MS;
break;
default:
pithy_print_time_ms = SRS_STAGE_DEFAULT_INTERVAL_MS;
break;
}
nb_clients = 0;
}
};
static std::map<int, SrsStageInfo*> _srs_stages;
SrsPithyPrint::SrsPithyPrint(int _stage_id)
{
stage_id = _stage_id;
client_id = enter_stage();
printed_age = age = 0;
}
SrsPithyPrint::~SrsPithyPrint()
{
leave_stage();
}
int SrsPithyPrint::enter_stage()
{
SrsStageInfo* stage = NULL;
std::map<int, SrsStageInfo*>::iterator it = _srs_stages.find(stage_id);
if (it == _srs_stages.end()) {
stage = _srs_stages[stage_id] = new SrsStageInfo(stage_id);
} else {
stage = it->second;
}
srs_assert(stage != NULL);
client_id = stage->nb_clients++;
srs_verbose("enter stage, stage_id=%d, client_id=%d, nb_clients=%d, time_ms=%d",
stage->stage_id, client_id, stage->nb_clients, stage->pithy_print_time_ms);
return client_id;
}
void SrsPithyPrint::leave_stage()
{
SrsStageInfo* stage = _srs_stages[stage_id];
srs_assert(stage != NULL);
stage->nb_clients--;
srs_verbose("leave stage, stage_id=%d, client_id=%d, nb_clients=%d, time_ms=%d",
stage->stage_id, client_id, stage->nb_clients, stage->pithy_print_time_ms);
}
void SrsPithyPrint::elapse(int64_t time_ms)
{
age += time_ms;
}
bool SrsPithyPrint::can_print()
{
SrsStageInfo* stage = _srs_stages[stage_id];
srs_assert(stage != NULL);
int64_t alive_age = age - printed_age;
int64_t can_print_age = stage->nb_clients * stage->pithy_print_time_ms;
bool can_print = alive_age >= can_print_age;
if (can_print) {
printed_age = age;
}
return can_print;
}
int64_t SrsPithyPrint::get_age()
{
return age;
}
void SrsPithyPrint::set_age(int64_t _age)
{
age = _age;
}
... ...
/*
The MIT License (MIT)
Copyright (c) 2013 winlin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_CORE_PITHY_PRINT_HPP
#define SRS_CORE_PITHY_PRINT_HPP
/*
#include <srs_core_pithy_print.hpp>
*/
#include <srs_core.hpp>
// the pithy stage for all play clients.
#define SRS_STAGE_PLAY_USER 1
// the pithy stage for all publish clients.
#define SRS_STAGE_PUBLISH_USER 2
/**
* the stage is used for a collection of object to do print,
* the print time in a stage is constant and not changed.
* for example, stage #1 for all play clients, print time is 3s,
* if there is 10clients, then all clients should print in 10*3s.
*/
class SrsPithyPrint
{
private:
int client_id;
int stage_id;
int64_t age;
int64_t printed_age;
public:
/**
* @param _stage_id defined in SRS_STAGE_xxx, eg. SRS_STAGE_PLAY_USER.
*/
SrsPithyPrint(int _stage_id);
virtual ~SrsPithyPrint();
private:
/**
* enter the specified stage, return the client id.
*/
virtual int enter_stage();
/**
* leave the specified stage, release the client id.
*/
virtual void leave_stage();
public:
/**
* specified client elapse some time.
*/
virtual void elapse(int64_t time_ms);
/**
* whether current client can print.
*/
virtual bool can_print();
/**
* get the elapsed time in ms.
*/
virtual int64_t get_age();
virtual void set_age(int64_t _age);
};
#endif
\ No newline at end of file
... ...
... ... @@ -296,6 +296,26 @@ void SrsProtocol::set_send_timeout(int64_t timeout_us)
return skt->set_send_timeout(timeout_us);
}
int64_t SrsProtocol::get_recv_bytes()
{
return skt->get_recv_bytes();
}
int64_t SrsProtocol::get_send_bytes()
{
return skt->get_send_bytes();
}
int SrsProtocol::get_recv_kbps()
{
return skt->get_recv_kbps();
}
int SrsProtocol::get_send_kbps()
{
return skt->get_send_kbps();
}
int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
*pmsg = NULL;
... ...
... ... @@ -110,6 +110,10 @@ public:
virtual void set_recv_timeout(int64_t timeout_us);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
/**
* recv a message with raw/undecoded payload from peer.
* the payload is not decoded, use srs_rtmp_expect_message<T> if requires
... ...
... ... @@ -164,6 +164,26 @@ void SrsRtmp::set_send_timeout(int64_t timeout_us)
return protocol->set_send_timeout(timeout_us);
}
int64_t SrsRtmp::get_recv_bytes()
{
return protocol->get_recv_bytes();
}
int64_t SrsRtmp::get_send_bytes()
{
return protocol->get_send_bytes();
}
int SrsRtmp::get_recv_kbps()
{
return protocol->get_recv_kbps();
}
int SrsRtmp::get_send_kbps()
{
return protocol->get_send_kbps();
}
int SrsRtmp::recv_message(SrsCommonMessage** pmsg)
{
return protocol->recv_message(pmsg);
... ...
... ... @@ -108,6 +108,10 @@ public:
virtual void set_recv_timeout(int64_t timeout_us);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int send_message(ISrsMessage* msg);
public:
... ...
... ... @@ -36,14 +36,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_client.hpp>
#define SERVER_LISTEN_BACKLOG 10
// global value, ensure the report interval,
// it will be changed when clients increase.
#define SRS_CONST_REPORT_INTERVAL_MS 3000
#define SRS_TIME_RESOLUTION_MS 1000
SrsServer::SrsServer()
{
srs_report_interval_ms = SRS_CONST_REPORT_INTERVAL_MS;
}
SrsServer::~SrsServer()
... ... @@ -140,8 +136,13 @@ int SrsServer::listen(int port)
int SrsServer::cycle()
{
int ret = ERROR_SUCCESS;
// TODO: canbe a api thread.
st_thread_exit(NULL);
// the deamon thread, update the time cache
while (true) {
st_usleep(SRS_TIME_RESOLUTION_MS * 1000);
srs_update_system_time_ms();
}
return ret;
}
... ... @@ -160,20 +161,6 @@ void SrsServer::remove(SrsConnection* conn)
srs_freep(conn);
}
bool SrsServer::can_report(int64_t& reported, int64_t time)
{
if (srs_report_interval_ms <= 0) {
return false;
}
if (time - reported < srs_report_interval_ms) {
return false;
}
reported = time;
return true;
}
int SrsServer::accept_client(st_netfd_t client_stfd)
{
int ret = ERROR_SUCCESS;
... ... @@ -183,9 +170,6 @@ int SrsServer::accept_client(st_netfd_t client_stfd)
// directly enqueue, the cycle thread will remove the client.
conns.push_back(conn);
srs_verbose("add conn to vector. conns=%d", (int)conns.size());
// ensure the report interval is consts
srs_report_interval_ms = SRS_CONST_REPORT_INTERVAL_MS * (int)conns.size();
// cycle will start process thread and when finished remove the client.
if ((ret = conn->start()) != ERROR_SUCCESS) {
... ...
... ... @@ -41,7 +41,6 @@ private:
int fd;
st_netfd_t stfd;
std::vector<SrsConnection*> conns;
int srs_report_interval_ms;
public:
SrsServer();
virtual ~SrsServer();
... ... @@ -50,7 +49,6 @@ public:
virtual int listen(int port);
virtual int cycle();
virtual void remove(SrsConnection* conn);
virtual bool can_report(int64_t& reported, int64_t time);
private:
virtual int accept_client(st_netfd_t client_stfd);
virtual void listen_cycle();
... ...
... ... @@ -30,6 +30,7 @@ SrsSocket::SrsSocket(st_netfd_t client_stfd)
stfd = client_stfd;
send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT;
recv_bytes = send_bytes = 0;
start_time_ms = srs_get_system_time_ms();
}
SrsSocket::~SrsSocket()
... ... @@ -61,6 +62,28 @@ int64_t SrsSocket::get_send_bytes()
return send_bytes;
}
int SrsSocket::get_recv_kbps()
{
int64_t diff_ms = srs_get_system_time_ms() - start_time_ms;
if (diff_ms <= 0) {
return 0;
}
return recv_bytes * 8 / diff_ms;
}
int SrsSocket::get_send_kbps()
{
int64_t diff_ms = srs_get_system_time_ms() - start_time_ms;
if (diff_ms <= 0) {
return 0;
}
return send_bytes * 8 / diff_ms;
}
int SrsSocket::read(const void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -43,6 +43,7 @@ private:
int64_t send_timeout;
int64_t recv_bytes;
int64_t send_bytes;
int64_t start_time_ms;
st_netfd_t stfd;
public:
SrsSocket(st_netfd_t client_stfd);
... ... @@ -53,6 +54,8 @@ public:
virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
public:
virtual int read(const void* buf, size_t size, ssize_t* nread);
virtual int read_fully(const void* buf, size_t size, ssize_t* nread);
... ...
... ... @@ -32,6 +32,8 @@ file
..\core\srs_core_socket.cpp,
..\core\srs_core_buffer.hpp,
..\core\srs_core_buffer.cpp,
..\core\srs_core_pithy_print.hpp,
..\core\srs_core_pithy_print.cpp,
..\core\srs_core_log.hpp,
..\core\srs_core_log.cpp;
mainconfig
... ...