winlin

for bug #251, refine the mic algorithm. 2.0.63

... ... @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 61
#define VERSION_REVISION 63
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server"
... ...
... ... @@ -97,13 +97,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// the default config of mw.
#define SRS_PERF_MW_SLEEP 350
/**
* how many msgs can be send entirely.
* for play clients to get msgs then totally send out.
* for the mw sleep set to 1800, the msgs is about 133.
* @remark, recomment to 128.
*/
#define SRS_PERF_MW_MSGS 128
/**
* use iovs cache in each msg,
* for the shared ptr message, we calc once and used for every copy.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/251
... ... @@ -111,6 +104,18 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* @remark when reload change the chunk size, previous clients error.
*/
#undef SRS_PERF_MW_MSG_IOVS_CACHE
/**
* how many msgs can be send entirely.
* for play clients to get msgs then totally send out.
* for the mw sleep set to 1800, the msgs is about 133.
* @remark, recomment to 128.
* @remark, when mic enabled, use larger iovs cache, to 512.
*/
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
#define SRS_PERF_MW_MSGS 128
#else
#define SRS_PERF_MW_MSGS 512
#endif
/**
* whether set the socket send buffer size.
... ...
... ... @@ -135,6 +135,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_OpenSslGetPeerPublicKey 2038
#define ERROR_OpenSslComputeSharedKey 2039
#define ERROR_RTMP_MIC_CHUNKSIZE_CHANGED 2040
#define ERROR_RTMP_MIC_CACHE_OVERFLOW 2041
//
// system control message,
// not an error, but special control logic.
... ...
... ... @@ -422,7 +422,7 @@ int SrsSharedPtrMessage::__SrsSharedPtr::mic_evaluate(
if (mh->payload_length % chunk_size) {
nb_chunks++;
}
nb_iovs = 1/*cid*/ + 1/*size*/ + 1 /*type*/+ 1/*chunk*/;
nb_iovs = 1/*cid*/ + 1/*size*//*type*/+ 1/*chunk*/;
// left chunks, always cid+chunk.
if (nb_chunks > 0) {
nb_iovs += (nb_chunks - 1) * 2;
... ... @@ -448,19 +448,16 @@ int SrsSharedPtrMessage::__SrsSharedPtr::mic_evaluate(
iov[0].iov_len = 1;
// size(payload length), 3B
// type(message type), 1B
iov[1].iov_base = mic_c0 + 4;
iov[1].iov_len = 3;
// type(message type)
iov[2].iov_base = mic_c0 + 7;
iov[2].iov_len = 1;
iov[1].iov_len = 4;
// chunk
iov[3].iov_base = p;
iov[3].iov_len = payload_size;
iov[2].iov_base = p;
iov[2].iov_len = payload_size;
// move to next iovs.
iov += 4;
iov += 3;
} else {
// c3
iov[0].iov_base = &mic_c3;
... ... @@ -488,8 +485,6 @@ SrsSharedPtrMessage::SrsSharedPtrMessage()
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
mic_etime_present = false;
iovs = NULL;
nb_iovs = 0;
#endif
}
... ... @@ -502,10 +497,6 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage()
ptr->shared_count--;
}
}
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE
srs_freep(iovs);
#endif
}
int SrsSharedPtrMessage::create(SrsCommonMessage* msg)
... ... @@ -598,9 +589,26 @@ int SrsSharedPtrMessage::mic_evaluate(int chunk_size)
}
}
return ret;
}
int SrsSharedPtrMessage::mic_iovs_dump(iovec* iovs, int max_nb_iovs)
{
// calc the private iovs
char* pp = NULL;
// calc number of iovs.
int nb_iovs = 1/*time*/ + 1/*sid*/;
// insert etime before all chunks.
if (mic_etime_present) {
nb_iovs += ptr->nb_chunks;
}
// not enough, return nagetive to try another loop.
if (max_nb_iovs < nb_iovs + ptr->nb_iovs) {
return -1;
}
// timestamp for c0/c3
u_int32_t timestamp = (u_int32_t)header.timestamp;
mic_etime_present = timestamp >= RTMP_EXTENDED_TIMESTAMP;
... ... @@ -654,72 +662,57 @@ int SrsSharedPtrMessage::mic_evaluate(int chunk_size)
*p++ = pp[0];
}
// calc number of iovs.
nb_iovs = 1/*time*/ + 1/*sid*/;
// insert etime before all chunks.
if (mic_etime_present) {
nb_iovs += ptr->nb_chunks;
}
// create iovs
srs_freep(iovs);
iovs = new iovec[nb_iovs];
// dumps all ovs
iovec* shared = ptr->iovs;
iovec* iov = iovs;
// dump the c0 chunk
// cid
iov->iov_len = shared->iov_len;
iov->iov_base = shared->iov_base;
iov++; shared++;
// time, 3B
iovs[0].iov_base = mic_c0_time;
iovs[0].iov_len = 3;
iov->iov_base = mic_c0_time;
iov->iov_len = 3;
iov++;
// size, type
iov->iov_len = shared->iov_len;
iov->iov_base = shared->iov_base;
iov++; shared++;
// sid, 4B
iovs[1].iov_base = mic_c0_sid;
iovs[1].iov_len = 4;
// etime, 4B for each chunks.
for (int i = 2; i < nb_iovs; i++) {
iovs[i].iov_base = mic_etime;
iovs[i].iov_len = 4;
}
return ret;
}
int SrsSharedPtrMessage::mic_iovs_count()
{
return nb_iovs + ptr->nb_iovs;
}
int SrsSharedPtrMessage::mic_iovs_dump(iovec* _iovs, int _nb_iovs)
{
int shared_index = 0;
int private_index = 0;
int index = 0;
// dumps all.
srs_assert(nb_iovs + ptr->nb_iovs <= _nb_iovs);
// dump the c0 chunk
_iovs[index++] = ptr->iovs[shared_index++]; // cid
_iovs[index++] = iovs[private_index++]; // time
_iovs[index++] = ptr->iovs[shared_index++]; // size
_iovs[index++] = ptr->iovs[shared_index++]; // type
_iovs[index++] = iovs[private_index++]; // sid
iov->iov_base = mic_c0_sid;
iov->iov_len = 4;
iov++;
// etime, 4B
if (mic_etime_present) {
_iovs[index++] = iovs[private_index++]; // etime
// etime
iov->iov_base = mic_etime;
iov->iov_len = 4;
iov++;
}
_iovs[index++] = ptr->iovs[shared_index++]; // chunk
// chunk
iov->iov_len = shared->iov_len;
iov->iov_base = shared->iov_base;
iov++; shared++;
// dump left c3 chunks
for (int i = 1; i < ptr->nb_chunks; i++) {
_iovs[index++] = ptr->iovs[shared_index++]; // cid
// cid
iov->iov_len = shared->iov_len;
iov->iov_base = shared->iov_base;
iov++; shared++;
if (mic_etime_present) {
_iovs[index++] = iovs[private_index++]; // etime
// etime
iov->iov_base = mic_etime;
iov->iov_len = 4;
iov++;
}
_iovs[index++] = ptr->iovs[shared_index++]; // chunk
// chunk
iov->iov_len = shared->iov_len;
iov->iov_base = shared->iov_base;
iov++; shared++;
}
srs_assert(index == private_index + shared_index);
srs_assert(index == nb_iovs + ptr->nb_iovs);
srs_assert(index <= _nb_iovs);
return nb_iovs + ptr->nb_iovs;
}
#endif
... ... @@ -955,7 +948,6 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
int ret = ERROR_SUCCESS;
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
// TODO: FIXME: use cache system instead.
int iov_index = 0;
iovec* iov = out_iovs + iov_index;
... ... @@ -1042,10 +1034,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
// when c0c3 cache dry,
// sendout all messages and reset the cache, then send again.
if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send msgs with writev failed. ret=%d", ret);
}
if ((ret = do_iovs_send(out_iovs, iov_index)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -1059,76 +1048,73 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
}
}
}
// maybe the iovs already sendout when c0c3 cache dry,
// so just ignore when no iovs to send.
if (iov_index <= 0) {
return ret;
}
srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d",
nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
return do_iovs_send(out_iovs, iov_index);
#else
// send all iovs for all msgs.
int total_iovs = 0;
for (int i = 0; i < nb_msgs; i++) {
int msg_sent = 0;
while (msg_sent < nb_msgs) {
int iov_index = 0;
for (int i = msg_sent; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
// evaluate
if ((ret = msg->mic_evaluate(out_chunk_size)) != ERROR_SUCCESS) {
srs_error("mic evaluate failed, chunk=%d. ret=%d", out_chunk_size, ret);
return ret;
}
total_iovs += msg->mic_iovs_count();
}
srs_verbose("mic nb_iovs=%d, max=%d", total_iovs, nb_out_iovs);
// realloc the iovs if exceed,
// for we donot know how many messges maybe to send entirely,
// we just alloc the iovs, it's ok.
if (total_iovs > nb_out_iovs) {
srs_warn("resize iovs %d => %d, msgs=%d, max_msgs=%d",
nb_out_iovs, total_iovs, nb_msgs, SRS_PERF_MW_MSGS);
nb_out_iovs = total_iovs;
int realloc_size = sizeof(iovec) * nb_out_iovs;
out_iovs = (iovec*)realloc(out_iovs, realloc_size);
}
// dumps iovs
int iov_index = 0;
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
iov_index += msg->mic_iovs_dump(
// dump msg to iovec.
int ok_iovs = msg->mic_iovs_dump(
out_iovs + iov_index, nb_out_iovs - iov_index
);
// protocol iovs cache exceed.
if (ok_iovs <= 0) {
break;
}
#endif
// maybe the iovs already sendout when c0c3 cache dry,
// so just ignore when no iovs to send.
// ok, dump next.
msg_sent++;
iov_index += ok_iovs;
}
srs_info("mic nb_iovs=%d, msgs=%d, msg_sent=%d, iovs_sent=%d",
nb_out_iovs, nb_msgs, msg_sent, iov_index);
// cache not enough.
if (iov_index <= 0) {
ret = ERROR_RTMP_MIC_CACHE_OVERFLOW;
srs_warn("mic iovs overflow, nb_iovs=%d, msgs=%d, msg_sent=%d, iovs_sent=%d, ret=%d",
nb_out_iovs, nb_msgs, msg_sent, iov_index, ret);
return ret;
}
#if 0
// calc the bytes of iovs, for debug.
int nb_bytes = 0;
for (int i = 0; i < iov_index; i++) {
iovec* iov = out_iovs + i;
nb_bytes += iov->iov_len;
}
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
srs_info("mw %d msgs %dB in %d iovs, max_msgs=%d, nb_out_iovs=%d",
nb_msgs, nb_bytes, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
#else
srs_info("mic nb_iovs=%d, max=%d, msgs=%d %dB",
total_iovs, nb_out_iovs, nb_msgs, nb_bytes);
#endif
#else
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d",
nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
#else
srs_info("mic nb_iovs=%d, max=%d, msgs=%d",
total_iovs, nb_out_iovs, nb_msgs);
#endif
// send out these iovs.
if ((ret = do_iovs_send(out_iovs, iov_index)) != ERROR_SUCCESS) {
return ret;
}
}
return ret;
#endif
}
int SrsProtocol::do_iovs_send(iovec* iovs, int size)
{
int ret = ERROR_SUCCESS;
// the limits of writev iovs.
static int limits = sysconf(_SC_IOV_MAX);
// send in a time.
if (iov_index < limits) {
if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
if (size < limits) {
if ((ret = skt->writev(iovs, size, NULL)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send with writev failed. ret=%d", ret);
}
... ... @@ -1139,9 +1125,9 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
// send in multiple times.
int cur_iov = 0;
while (cur_iov < iov_index) {
int cur_count = srs_min(limits, iov_index - cur_iov);
if ((ret = skt->writev(out_iovs + cur_iov, cur_count, NULL)) != ERROR_SUCCESS) {
while (cur_iov < size) {
int cur_count = srs_min(limits, size - cur_iov);
if ((ret = skt->writev(iovs + cur_iov, cur_count, NULL)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send with writev failed. ret=%d", ret);
}
... ...
... ... @@ -286,9 +286,6 @@ private:
char mic_etime[4];
// whether etime present.
bool mic_etime_present;
// the calced private iovs for this msg
iovec* iovs;
int nb_iovs;
#endif
public:
SrsSharedPtrMessage();
... ... @@ -329,14 +326,10 @@ public:
*/
virtual int mic_evaluate(int chunk_size);
/**
* count the total iovs needed.
*/
virtual int mic_iovs_count();
/**
* dump all iovs, the _nb_iovs must equals to mic_iovs_count().
* @return the dumped count.
* @return the dumped count. -1 if not enough iovs.
*/
virtual int mic_iovs_dump(iovec* _iovs, int _nb_iovs);
virtual int mic_iovs_dump(iovec* iovs, int max_nb_iovs);
#endif
};
... ... @@ -594,6 +587,10 @@ private:
*/
virtual int do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
/**
* send iovs. send multiple times if exceed limits.
*/
virtual int do_iovs_send(iovec* iovs, int size);
/**
* underlayer api for send and free packet.
*/
virtual int do_send_and_free_packet(SrsPacket* packet, int stream_id);
... ...