winlin

for bug #251, support mic(message iovs cache). 2.0.61

... ... @@ -634,6 +634,10 @@ SrsSource::SrsSource(SrsRequest* req)
_srs_config->subscribe(this);
atc = _srs_config->get_atc(_req->vhost);
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
chunk_size = 0;
#endif
}
SrsSource::~SrsSource()
... ... @@ -860,6 +864,26 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
return ret;
}
int SrsSource::on_reload_vhost_chunk_size(string vhost)
{
int ret = ERROR_SUCCESS;
if (_req->vhost != vhost) {
return ret;
}
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
int size = _srs_config->get_chunk_size(_req->vhost);
if (chunk_size != size) {
srs_warn("connected clients will error for mic chunk_size changed %d=>%d",
chunk_size, size);
}
chunk_size = size;
#endif
return ret;
}
int SrsSource::on_reload_vhost_transcode(string vhost)
{
int ret = ERROR_SUCCESS;
... ... @@ -1089,6 +1113,14 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
}
srs_verbose("initialize shared ptr metadata success.");
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
if ((ret = cache_metadata->mic_evaluate(chunk_size)) != ERROR_SUCCESS) {
srs_error("mic metadata iovs failed, chunk_size=%d. ret=%d", chunk_size, ret);
return ret;
}
srs_info("mic metadata iovs ok, chunk_size=%d", chunk_size);
#endif
// copy to all consumer
if (true) {
std::vector<SrsConsumer*>::iterator it;
... ... @@ -1130,6 +1162,14 @@ int SrsSource::on_audio(SrsCommonMessage* __audio)
}
srs_verbose("initialize shared ptr audio success.");
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
if ((ret = msg.mic_evaluate(chunk_size)) != ERROR_SUCCESS) {
srs_error("mic audio iovs failed, chunk_size=%d. ret=%d", chunk_size, ret);
return ret;
}
srs_info("mic audio iovs ok, chunk_size=%d", chunk_size);
#endif
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_audio(&msg)) != ERROR_SUCCESS) {
srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret);
... ... @@ -1240,6 +1280,14 @@ int SrsSource::on_video(SrsCommonMessage* __video)
}
srs_verbose("initialize shared ptr video success.");
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
if ((ret = msg.mic_evaluate(chunk_size)) != ERROR_SUCCESS) {
srs_error("mic video iovs failed, chunk_size=%d. ret=%d", chunk_size, ret);
return ret;
}
srs_info("mic video iovs ok, chunk_size=%d", chunk_size);
#endif
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_video(&msg)) != ERROR_SUCCESS) {
srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);
... ... @@ -1492,6 +1540,11 @@ int SrsSource::on_publish()
}
#endif
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
chunk_size = _srs_config->get_chunk_size(_req->vhost);
srs_trace("mic use chunk_size=%d to send msgs", chunk_size);
#endif
return ret;
}
... ...
... ... @@ -352,6 +352,11 @@ private:
std::vector<SrsForwarder*> forwarders;
// for aggregate message
SrsStream* aggregate_stream;
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
// the chunk size for mic,
// update when publish stream.
int chunk_size;
#endif
private:
/**
* the sample rate of audio in metadata.
... ... @@ -396,6 +401,7 @@ public:
virtual int on_reload_vhost_forward(std::string vhost);
virtual int on_reload_vhost_hls(std::string vhost);
virtual int on_reload_vhost_dvr(std::string vhost);
virtual int on_reload_vhost_chunk_size(std::string vhost);
virtual int on_reload_vhost_transcode(std::string vhost);
// for the tools callback
public:
... ...
... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 60
#define VERSION_REVISION 61
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server"
... ...
... ... @@ -104,6 +104,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#define SRS_PERF_MW_MSGS 128
/**
* use iovs cache in each msg,
* for the shared ptr message, we calc once and used for every copy.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
* @remark if enable this, donot use protocol iovs cache.
* @remark when reload change the chunk size, previous clients error.
*/
#undef SRS_PERF_MW_MSG_IOVS_CACHE
/**
* whether set the socket send buffer size.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
*/
... ...
... ... @@ -134,6 +134,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_OpenSslSha256DigestSize 2037
#define ERROR_OpenSslGetPeerPublicKey 2038
#define ERROR_OpenSslComputeSharedKey 2039
#define ERROR_RTMP_MIC_CHUNKSIZE_CHANGED 2040
//
// system control message,
// not an error, but special control logic.
... ...
... ... @@ -149,6 +149,11 @@ void show_macro_features()
#endif
srs_trace("system default latency in ms: mw(0-%d) + mr(0-%d) + play-queue(0-%d)",
SRS_PERF_MW_SLEEP, possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000);
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
srs_warn("MIC(message iovs cache) enabled, the connected clients will be"
"disconneted when reload changed the chunk_size.");
#endif
}
void check_macro_features()
... ...
... ... @@ -387,16 +387,110 @@ SrsSharedPtrMessage::__SrsSharedPtr::__SrsSharedPtr()
payload = NULL;
size = 0;
shared_count = 0;
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
nb_iovs = 0;
iovs = NULL;
chunk_size = 0;
#endif
}
SrsSharedPtrMessage::__SrsSharedPtr::~__SrsSharedPtr()
{
srs_freep(payload);
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
srs_freep(iovs);
#endif
}
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
int SrsSharedPtrMessage::__SrsSharedPtr::mic_evaluate(
SrsMessageHeader* mh, int chunk_size
) {
int ret = ERROR_SUCCESS;
// use the chunk size, shuold not be changed.
this->chunk_size = chunk_size;
// ignore size
srs_chunk_header(mic_c0, mh, true);
mic_c3 = 0xC0 | (mh->perfer_cid & 0x3F);
// calc number of iovs
nb_chunks = mh->payload_length / chunk_size;
if (mh->payload_length % chunk_size) {
nb_chunks++;
}
nb_iovs = 1/*cid*/ + 1/*size*/ + 1 /*type*/+ 1/*chunk*/;
// left chunks, always cid+chunk.
if (nb_chunks > 0) {
nb_iovs += (nb_chunks - 1) * 2;
}
// create iovs
srs_freep(iovs);
iovs = new iovec[nb_iovs];
// for payload chunks.
char* p = payload;
char* end = p + size;
iovec* iov = iovs + 0;
while (p < end) {
// size of payload.
int payload_size = srs_min(chunk_size, end - p);
// header, c0 or c3
if (p == payload) {
// c0, cid+size+type
// cid, 1B
iov[0].iov_base = mic_c0;
iov[0].iov_len = 1;
// size(payload length), 3B
iov[1].iov_base = mic_c0 + 4;
iov[1].iov_len = 3;
// type(message type)
iov[2].iov_base = mic_c0 + 7;
iov[2].iov_len = 1;
// chunk
iov[3].iov_base = p;
iov[3].iov_len = payload_size;
// move to next iovs.
iov += 4;
} else {
// c3
iov[0].iov_base = &mic_c3;
iov[0].iov_len = 1;
// chunk
iov[1].iov_base = p;
iov[1].iov_len = payload_size;
// move to next iovs.
iov += 2;
}
// to next chunk
p += payload_size;
}
return ret;
}
#endif
SrsSharedPtrMessage::SrsSharedPtrMessage()
{
ptr = NULL;
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
mic_etime_present = false;
iovs = NULL;
nb_iovs = 0;
#endif
}
SrsSharedPtrMessage::~SrsSharedPtrMessage()
... ... @@ -408,6 +502,10 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage()
ptr->shared_count--;
}
}
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
srs_freep(iovs);
#endif
}
int SrsSharedPtrMessage::create(SrsCommonMessage* msg)
... ... @@ -479,6 +577,153 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
return copy;
}
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
int SrsSharedPtrMessage::mic_evaluate(int chunk_size)
{
int ret = ERROR_SUCCESS;
// when chunk size changed, error to disconnect the client..
if (ptr->chunk_size > 0 && chunk_size != ptr->chunk_size) {
ret = ERROR_RTMP_MIC_CHUNKSIZE_CHANGED;
srs_warn("mic chunk size changed %d=>%d, ret=%d",
ptr->chunk_size, chunk_size, ret);
return ret;
}
// calc the shared ptr iovs at the first time.
if (ptr->chunk_size <= 0) {
if ((ret = ptr->mic_evaluate(&header, chunk_size)) != ERROR_SUCCESS) {
srs_warn("mic evaluate source iovs failed. ret=%d", ret);
return ret;
}
}
// calc the private iovs
char* pp = NULL;
// timestamp for c0/c3
u_int32_t timestamp = (u_int32_t)header.timestamp;
mic_etime_present = timestamp >= RTMP_EXTENDED_TIMESTAMP;
// chunk message header, 11 bytes
// timestamp, 3bytes, big-endian
char* p = mic_c0_time;
if (!mic_etime_present) {
pp = (char*)&timestamp;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
} else {
*p++ = 0xFF;
*p++ = 0xFF;
*p++ = 0xFF;
}
// stream_id, 4bytes, little-endian
p = mic_c0_sid;
pp = (char*)&header.stream_id;
*p++ = pp[0];
*p++ = pp[1];
*p++ = pp[2];
*p++ = pp[3];
// for c0
// chunk extended timestamp header, 0 or 4 bytes, big-endian
//
// for c3:
// chunk extended timestamp header, 0 or 4 bytes, big-endian
// 6.1.3. Extended Timestamp
// This field is transmitted only when the normal time stamp in the
// chunk message header is set to 0x00ffffff. If normal time stamp is
// set to any value less than 0x00ffffff, this field MUST NOT be
// present. This field MUST NOT be present if the timestamp field is not
// present. Type 3 chunks MUST NOT have this field.
// adobe changed for Type3 chunk:
// FMLE always sendout the extended-timestamp,
// must send the extended-timestamp to FMS,
// must send the extended-timestamp to flash-player.
// @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699
// TODO: FIXME: extract to outer.
p = mic_etime;
if (mic_etime_present) {
pp = (char*)&timestamp;
*p++ = pp[3];
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
}
// calc number of iovs.
nb_iovs = 1/*time*/ + 1/*sid*/;
// insert etime before all chunks.
if (mic_etime_present) {
nb_iovs += ptr->nb_chunks;
}
// create iovs
srs_freep(iovs);
iovs = new iovec[nb_iovs];
// time, 3B
iovs[0].iov_base = mic_c0_time;
iovs[0].iov_len = 3;
// sid, 4B
iovs[1].iov_base = mic_c0_sid;
iovs[1].iov_len = 4;
// etime, 4B for each chunks.
for (int i = 2; i < nb_iovs; i++) {
iovs[i].iov_base = mic_etime;
iovs[i].iov_len = 4;
}
return ret;
}
int SrsSharedPtrMessage::mic_iovs_count()
{
return nb_iovs + ptr->nb_iovs;
}
int SrsSharedPtrMessage::mic_iovs_dump(iovec* _iovs, int _nb_iovs)
{
int shared_index = 0;
int private_index = 0;
int index = 0;
// dumps all.
srs_assert(nb_iovs + ptr->nb_iovs <= _nb_iovs);
// dump the c0 chunk
_iovs[index++] = ptr->iovs[shared_index++]; // cid
_iovs[index++] = iovs[private_index++]; // time
_iovs[index++] = ptr->iovs[shared_index++]; // size
_iovs[index++] = ptr->iovs[shared_index++]; // type
_iovs[index++] = iovs[private_index++]; // sid
if (mic_etime_present) {
_iovs[index++] = iovs[private_index++]; // etime
}
_iovs[index++] = ptr->iovs[shared_index++]; // chunk
// dump left c3 chunks
for (int i = 1; i < ptr->nb_chunks; i++) {
_iovs[index++] = ptr->iovs[shared_index++]; // cid
if (mic_etime_present) {
_iovs[index++] = iovs[private_index++]; // etime
}
_iovs[index++] = ptr->iovs[shared_index++]; // chunk
}
srs_assert(index == private_index + shared_index);
srs_assert(index == nb_iovs + ptr->nb_iovs);
srs_assert(index <= _nb_iovs);
return nb_iovs + ptr->nb_iovs;
}
#endif
SrsProtocol::AckWindowSize::AckWindowSize()
{
ack_window_size = 0;
... ... @@ -498,8 +743,10 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
// each chunk consumers atleast 2 iovs
srs_assert(nb_out_iovs >= 2);
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
warned_c0c3_cache_dry = false;
auto_response_when_recv = true;
#endif
cs_cache = NULL;
if (SRS_PERF_CHUNK_STREAM_CACHE > 0) {
... ... @@ -707,6 +954,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
{
int ret = ERROR_SUCCESS;
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
// TODO: FIXME: use cache system instead.
int iov_index = 0;
iovec* iov = out_iovs + iov_index;
... ... @@ -811,6 +1059,40 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
}
}
}
#else
// send all iovs for all msgs.
int total_iovs = 0;
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
if ((ret = msg->mic_evaluate(out_chunk_size)) != ERROR_SUCCESS) {
srs_error("mic evaluate failed, chunk=%d. ret=%d", out_chunk_size, ret);
return ret;
}
total_iovs += msg->mic_iovs_count();
}
srs_verbose("mic nb_iovs=%d, max=%d", total_iovs, nb_out_iovs);
// realloc the iovs if exceed,
// for we donot know how many messges maybe to send entirely,
// we just alloc the iovs, it's ok.
if (total_iovs > nb_out_iovs) {
srs_warn("resize iovs %d => %d, msgs=%d, max_msgs=%d",
nb_out_iovs, total_iovs, nb_msgs, SRS_PERF_MW_MSGS);
nb_out_iovs = total_iovs;
int realloc_size = sizeof(iovec) * nb_out_iovs;
out_iovs = (iovec*)realloc(out_iovs, realloc_size);
}
// dumps iovs
int iov_index = 0;
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
iov_index += msg->mic_iovs_dump(
out_iovs + iov_index, nb_out_iovs - iov_index
);
}
#endif
// maybe the iovs already sendout when c0c3 cache dry,
// so just ignore when no iovs to send.
... ... @@ -824,11 +1106,21 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
iovec* iov = out_iovs + i;
nb_bytes += iov->iov_len;
}
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
srs_info("mw %d msgs %dB in %d iovs, max_msgs=%d, nb_out_iovs=%d",
nb_msgs, nb_bytes, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
#else
srs_info("mic nb_iovs=%d, max=%d, msgs=%d %dB",
total_iovs, nb_out_iovs, nb_msgs, nb_bytes);
#endif
#else
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d",
nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
#else
srs_info("mic nb_iovs=%d, max=%d, msgs=%d",
total_iovs, nb_out_iovs, nb_msgs);
#endif
#endif
// the limits of writev iovs.
... ...
... ... @@ -214,14 +214,82 @@ private:
class __SrsSharedPtr
{
public:
// actual shared payload.
char* payload;
// size of payload.
int size;
// the reference count
int shared_count;
public:
// the iovs cache in shared ptr message.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
/**
* the mic(msg iovs cache).
* why share the cache in msg?
* all msgs of a source are same for:
* 1. cid, all use the same cid, copy from src msg.
* 2. size, all msg size never changed.
* 3. type, type never changed.
* 4. chunk size, all connections in a vhost use the same chunk size.
* the different:
* 1. time and etime, almost different.
* 2. stream id, maybe different, but almost the same.
* @remark, when reload change the chunk size, clients will be disconnected.
*/
// the c0 shared section for all msgs
// 1. cid, 1B, same.
// 2. [*]time, 3B, not same.
// 3. size, 3B, same.
// 4. type, 1B, same.
// 5. [*]stream id, 4B, not same, little-endian.
// 6. [*]etime, 4B, not same.
// the stared field must be calced in each msg.
char mic_c0[16];
// the c3 headers.
char mic_c3;
// the calced iovs for all msg,
// we assumpt that the chunk size is not changed for a vhost,
// if do changed, the client will got an error msg and disconnect.
iovec* iovs;
int nb_iovs;
// the msgs source chunk size,
// which is evaluated the iovs first time,
// this cannot be changed.
int chunk_size;
// the number of chunks.
int nb_chunks;
#endif
public:
__SrsSharedPtr();
virtual ~__SrsSharedPtr();
public:
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
/**
* for iovs msg cache, calc the iovs.
* @param chunk_size use the specified chunk size to evaluate the iovs.
*/
virtual int mic_evaluate(SrsMessageHeader* mh, int chunk_size);
#endif
};
__SrsSharedPtr* ptr;
private:
// msgs level cache.
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
// the c0 private section for this
// 1. time, 3B, not same, not used.
// 2. stream id, 4B, almost the same, little-endian.
// 3. etime, 4B, optional, always same for all chunk when present.
// the stared field must be calced in each msg.
char mic_c0_time[3];
char mic_c0_sid[4];
char mic_etime[4];
// whether etime present.
bool mic_etime_present;
// the calced private iovs for this msg
iovec* iovs;
int nb_iovs;
#endif
public:
SrsSharedPtrMessage();
virtual ~SrsSharedPtrMessage();
... ... @@ -253,6 +321,23 @@ public:
* @remark, assert object is created.
*/
virtual SrsSharedPtrMessage* copy();
public:
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
/**
* for iovs msg cache, calc the iovs.
* @param chunk_size use the specified chunk size to evaluate the iovs.
*/
virtual int mic_evaluate(int chunk_size);
/**
* count the total iovs needed.
*/
virtual int mic_iovs_count();
/**
* dump all iovs, the _nb_iovs must equals to mic_iovs_count().
* @return the dumped count.
*/
virtual int mic_iovs_dump(iovec* _iovs, int _nb_iovs);
#endif
};
/**
... ... @@ -326,6 +411,9 @@ private:
*/
iovec* out_iovs;
int nb_out_iovs;
// if use iovs cache in each msg,
// donot use protocol level c0c3 cache.
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
/**
* output header cache.
* used for type0, 11bytes(or 15bytes with extended timestamp) header.
... ... @@ -337,6 +425,7 @@ private:
char out_c0c3_caches[SRS_CONSTS_C0C3_HEADERS_MAX];
// whether warned user to increase the c0c3 header cache.
bool warned_c0c3_cache_dry;
#endif
/**
* output chunk size, default to 128, set by config.
*/
... ...