Toggle navigation
Toggle navigation
此项目
正在载入...
Sign in
胡斌
/
srs
转到一个项目
Toggle navigation
项目
群组
代码片段
帮助
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
winlin
2014-11-22 15:53:05 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
622218c4ddf02d52b6f999c61fe210fe3b000bba
622218c4
1 parent
d3c770d2
for bug #217, use isolate thread to improve 17% performance.
显示空白字符变更
内嵌
并排对比
正在显示
7 个修改的文件
包含
279 行增加
和
55 行删除
trunk/src/app/srs_app_rtmp_conn.cpp
trunk/src/app/srs_app_rtmp_conn.hpp
trunk/src/app/srs_app_thread.cpp
trunk/src/rtmp/srs_protocol_rtmp.cpp
trunk/src/rtmp/srs_protocol_rtmp.hpp
trunk/src/rtmp/srs_protocol_stack.cpp
trunk/src/rtmp/srs_protocol_stack.hpp
trunk/src/app/srs_app_rtmp_conn.cpp
查看文件 @
622218c
...
...
@@ -493,10 +493,117 @@ int SrsRtmpConn::check_vhost()
return
ret
;
}
class
IsolateRecvThread
:
public
ISrsThreadHandler
{
private
:
SrsThread
*
trd
;
SrsRtmpServer
*
rtmp
;
std
::
vector
<
SrsMessage
*>
queue
;
public
:
IsolateRecvThread
(
SrsRtmpServer
*
rtmp_sdk
)
{
rtmp
=
rtmp_sdk
;
trd
=
new
SrsThread
(
this
,
0
,
true
);
}
virtual
~
IsolateRecvThread
()
{
// stop recv thread.
stop
();
// destroy the thread.
srs_freep
(
trd
);
// clear all messages.
std
::
vector
<
SrsMessage
*>::
iterator
it
;
for
(
it
=
queue
.
begin
();
it
!=
queue
.
end
();
++
it
)
{
SrsMessage
*
msg
=
*
it
;
srs_freep
(
msg
);
}
queue
.
clear
();
}
public
:
virtual
bool
empty
()
{
return
queue
.
empty
();
}
virtual
SrsMessage
*
pump
()
{
SrsMessage
*
msg
=
*
queue
.
begin
();
queue
.
erase
(
queue
.
begin
());
return
msg
;
}
public
:
virtual
int
start
()
{
return
trd
->
start
();
}
virtual
void
stop
()
{
trd
->
stop
();
}
virtual
int
cycle
()
{
int
ret
=
ERROR_SUCCESS
;
SrsMessage
*
msg
=
NULL
;
if
((
ret
=
rtmp
->
recv_message
(
&
msg
))
!=
ERROR_SUCCESS
)
{
if
(
!
srs_is_client_gracefully_close
(
ret
))
{
srs_error
(
"recv client control message failed. ret=%d"
,
ret
);
}
// we use no timeout to recv, should never got any error.
trd
->
stop_loop
();
return
ret
;
}
srs_verbose
(
"play loop recv message. ret=%d"
,
ret
);
return
ret
;
}
};
int
SrsRtmpConn
::
playing
(
SrsSource
*
source
)
{
int
ret
=
ERROR_SUCCESS
;
// the multiple messages writev improve performance large,
// but the timeout recv will cause 33% sys call performance,
// to use isolate thread to recv, can improve about 33% performance.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/194
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
//rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
rtmp
->
set_recv_timeout
(
ST_UTIME_NO_TIMEOUT
);
// disable the protocol auto response,
// for the isolate recv thread should never send any messages.
rtmp
->
set_auto_response
(
false
);
// use isolate thread to recv,
// start isolate recv thread.
IsolateRecvThread
trd
(
rtmp
);
if
((
ret
=
trd
.
start
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"start isolate recv thread failed. ret=%d"
,
ret
);
return
ret
;
}
// delivery messages for clients playing stream.
ret
=
do_playing
(
source
,
&
trd
);
// stop isolate recv thread
trd
.
stop
();
// enable the protocol auto response,
// for the isolate recv thread terminated.
rtmp
->
set_auto_response
(
true
);
return
ret
;
}
int
SrsRtmpConn
::
do_playing
(
SrsSource
*
source
,
IsolateRecvThread
*
trd
)
{
int
ret
=
ERROR_SUCCESS
;
if
((
ret
=
refer
->
check
(
req
->
pageUrl
,
_srs_config
->
get_refer_play
(
req
->
vhost
)))
!=
ERROR_SUCCESS
)
{
srs_error
(
"check play_refer failed. ret=%d"
,
ret
);
return
ret
;
...
...
@@ -519,32 +626,14 @@ int SrsRtmpConn::playing(SrsSource* source)
bool
user_specified_duration_to_stop
=
(
req
->
duration
>
0
);
int64_t
starttime
=
-
1
;
// TODO: use isolate thread to recv,
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/196
// the performance bottleneck not in the timeout recv, but
// in the multiple messages send, so it's ok for timeout recv,
// @see https://github.com/winlinvip/simple-rtmp-server/issues/194
rtmp
->
set_recv_timeout
(
SRS_CONSTS_RTMP_PULSE_TIMEOUT_US
);
while
(
true
)
{
//
TODO: to use isolate thread to recv, can improve about 5
% performance.
//
to use isolate thread to recv, can improve about 33
% performance.
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/196
// read from client.
if
(
true
)
{
SrsMessage
*
msg
=
NULL
;
ret
=
rtmp
->
recv_message
(
&
msg
);
srs_verbose
(
"play loop recv message. ret=%d"
,
ret
);
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
while
(
!
trd
->
empty
())
{
SrsMessage
*
msg
=
trd
->
pump
();
srs_warn
(
"pump client message to process."
);
if
(
ret
==
ERROR_SOCKET_TIMEOUT
)
{
// it's ok, do nothing.
ret
=
ERROR_SUCCESS
;
srs_verbose
(
"recv timeout, ignore. ret=%d"
,
ret
);
}
else
if
(
ret
!=
ERROR_SUCCESS
)
{
if
(
!
srs_is_client_gracefully_close
(
ret
))
{
srs_error
(
"recv client control message failed. ret=%d"
,
ret
);
}
return
ret
;
}
else
{
if
((
ret
=
process_play_control_msg
(
consumer
,
msg
))
!=
ERROR_SUCCESS
)
{
if
(
!
srs_is_system_control_error
(
ret
))
{
srs_error
(
"process play control message failed. ret=%d"
,
ret
);
...
...
@@ -552,7 +641,6 @@ int SrsRtmpConn::playing(SrsSource* source)
return
ret
;
}
}
}
// collect elapse for pithy print.
pithy_print
.
elapse
();
...
...
@@ -565,6 +653,12 @@ int SrsRtmpConn::playing(SrsSource* source)
return
ret
;
}
// no message to send, sleep a while.
if
(
count
<=
0
)
{
srs_verbose
(
"sleep for no messages to send"
);
st_usleep
(
SRS_CONSTS_RTMP_PULSE_TIMEOUT_US
);
}
// reportable
if
(
pithy_print
.
can_print
())
{
kbps
->
sample
();
...
...
@@ -596,7 +690,9 @@ int SrsRtmpConn::playing(SrsSource* source)
if
(
count
>
0
)
{
// no need to assert msg, for the rtmp will assert it.
if
((
ret
=
rtmp
->
send_and_free_messages
(
msgs
.
msgs
,
count
,
res
->
stream_id
))
!=
ERROR_SUCCESS
)
{
if
(
!
srs_is_client_gracefully_close
(
ret
))
{
srs_error
(
"send messages to client failed. ret=%d"
,
ret
);
}
return
ret
;
}
}
...
...
trunk/src/app/srs_app_rtmp_conn.hpp
查看文件 @
622218c
...
...
@@ -49,6 +49,7 @@ class SrsBandwidth;
class
SrsKbps
;
class
SrsRtmpClient
;
class
SrsSharedPtrMessage
;
class
IsolateRecvThread
;
/**
* the client provides the main logic control for RTMP clients.
...
...
@@ -88,6 +89,7 @@ private:
virtual
int
stream_service_cycle
();
virtual
int
check_vhost
();
virtual
int
playing
(
SrsSource
*
source
);
virtual
int
do_playing
(
SrsSource
*
source
,
IsolateRecvThread
*
trd
);
virtual
int
fmle_publishing
(
SrsSource
*
source
);
virtual
int
do_fmle_publishing
(
SrsSource
*
source
);
virtual
int
flash_publishing
(
SrsSource
*
source
);
...
...
trunk/src/app/srs_app_thread.cpp
查看文件 @
622218c
...
...
@@ -161,7 +161,9 @@ void SrsThread::thread_cycle()
srs_info
(
"thread on before cycle success"
);
if
((
ret
=
handler
->
cycle
())
!=
ERROR_SUCCESS
)
{
if
(
!
srs_is_client_gracefully_close
(
ret
))
{
srs_warn
(
"thread cycle failed, ignored and retry, ret=%d"
,
ret
);
}
goto
failed
;
}
srs_info
(
"thread cycle success"
);
...
...
trunk/src/rtmp/srs_protocol_rtmp.cpp
查看文件 @
622218c
...
...
@@ -735,6 +735,11 @@ SrsRtmpServer::~SrsRtmpServer()
srs_freep
(
hs_bytes
);
}
void
SrsRtmpServer
::
set_auto_response
(
bool
v
)
{
protocol
->
set_auto_response
(
v
);
}
void
SrsRtmpServer
::
set_recv_timeout
(
int64_t
timeout_us
)
{
protocol
->
set_recv_timeout
(
timeout_us
);
...
...
trunk/src/rtmp/srs_protocol_rtmp.hpp
查看文件 @
622218c
...
...
@@ -335,6 +335,12 @@ public:
// protocol methods proxy
public:
/**
* set the auto response message when recv for protocol stack.
* @param v, whether auto response message when recv message.
* @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
*/
virtual
void
set_auto_response
(
bool
v
);
/**
* set/get the recv timeout in us.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
...
...
trunk/src/rtmp/srs_protocol_stack.cpp
查看文件 @
622218c
...
...
@@ -415,6 +415,7 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
srs_assert
(
nb_out_iovs
>=
2
);
warned_c0c3_cache_dry
=
false
;
auto_response_when_recv
=
true
;
}
SrsProtocol
::~
SrsProtocol
()
...
...
@@ -430,6 +431,15 @@ SrsProtocol::~SrsProtocol()
chunk_streams
.
clear
();
}
if
(
true
)
{
std
::
vector
<
SrsPacket
*>::
iterator
it
;
for
(
it
=
manual_response_queue
.
begin
();
it
!=
manual_response_queue
.
end
();
++
it
)
{
SrsPacket
*
pkt
=
*
it
;
srs_freep
(
pkt
);
}
manual_response_queue
.
clear
();
}
srs_freep
(
in_buffer
);
// alloc by malloc, use free directly.
...
...
@@ -439,6 +449,35 @@ SrsProtocol::~SrsProtocol()
}
}
void
SrsProtocol
::
set_auto_response
(
bool
v
)
{
auto_response_when_recv
=
v
;
}
int
SrsProtocol
::
manual_response_flush
()
{
int
ret
=
ERROR_SUCCESS
;
if
(
manual_response_queue
.
empty
())
{
return
ret
;
}
std
::
vector
<
SrsPacket
*>::
iterator
it
;
for
(
it
=
manual_response_queue
.
begin
();
it
!=
manual_response_queue
.
end
();)
{
SrsPacket
*
pkt
=
*
it
;
// erase this packet, the send api always free it.
it
=
manual_response_queue
.
erase
(
it
);
// use underlayer api to send, donot flush again.
if
((
ret
=
do_send_and_free_packet
(
pkt
,
0
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
}
return
ret
;
}
void
SrsProtocol
::
set_recv_timeout
(
int64_t
timeout_us
)
{
return
skt
->
set_recv_timeout
(
timeout_us
);
...
...
@@ -638,7 +677,9 @@ int SrsProtocol::do_send_messages(SrsMessage** msgs, int nb_msgs)
// when c0c3 cache dry,
// sendout all messages and reset the cache, then send again.
if
((
ret
=
skt
->
writev
(
out_iovs
,
iov_index
,
NULL
))
!=
ERROR_SUCCESS
)
{
if
(
!
srs_is_client_gracefully_close
(
ret
))
{
srs_error
(
"send with writev failed. ret=%d"
,
ret
);
}
return
ret
;
}
...
...
@@ -663,13 +704,57 @@ int SrsProtocol::do_send_messages(SrsMessage** msgs, int nb_msgs)
// sendout header and payload by writev.
// decrease the sys invoke count to get higher performance.
if
((
ret
=
skt
->
writev
(
out_iovs
,
iov_index
,
NULL
))
!=
ERROR_SUCCESS
)
{
if
(
!
srs_is_client_gracefully_close
(
ret
))
{
srs_error
(
"send with writev failed. ret=%d"
,
ret
);
}
return
ret
;
}
return
ret
;
}
int
SrsProtocol
::
do_send_and_free_packet
(
SrsPacket
*
packet
,
int
stream_id
)
{
int
ret
=
ERROR_SUCCESS
;
srs_assert
(
packet
);
SrsAutoFree
(
SrsPacket
,
packet
);
int
size
=
0
;
char
*
payload
=
NULL
;
if
((
ret
=
packet
->
encode
(
size
,
payload
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"encode RTMP packet to bytes oriented RTMP message failed. ret=%d"
,
ret
);
return
ret
;
}
// encode packet to payload and size.
if
(
size
<=
0
||
payload
==
NULL
)
{
srs_warn
(
"packet is empty, ignore empty message."
);
return
ret
;
}
// to message
SrsMessage
*
msg
=
new
SrsCommonMessage
();
msg
->
payload
=
payload
;
msg
->
size
=
size
;
msg
->
header
.
payload_length
=
size
;
msg
->
header
.
message_type
=
packet
->
get_message_type
();
msg
->
header
.
stream_id
=
stream_id
;
msg
->
header
.
perfer_cid
=
packet
->
get_prefer_cid
();
// donot use the auto free to free the msg,
// for performance issue.
ret
=
do_send_messages
(
&
msg
,
1
);
if
(
ret
==
ERROR_SUCCESS
)
{
ret
=
on_send_packet
(
msg
,
packet
);
}
srs_freep
(
msg
);
return
ret
;
}
void
SrsProtocol
::
generate_chunk_header
(
char
*
cache
,
SrsMessageHeader
*
mh
,
bool
c0
,
int
*
pnbh
,
char
**
ph
)
{
// to directly set the field.
...
...
@@ -948,6 +1033,16 @@ int SrsProtocol::send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stre
srs_freep
(
msg
);
}
// donot flush when send failed
if
(
ret
!=
ERROR_SUCCESS
)
{
return
ret
;
}
// flush messages in manual queue
if
((
ret
=
manual_response_flush
())
!=
ERROR_SUCCESS
)
{
return
ret
;
}
return
ret
;
}
...
...
@@ -955,41 +1050,15 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
{
int
ret
=
ERROR_SUCCESS
;
srs_assert
(
packet
);
SrsAutoFree
(
SrsPacket
,
packet
);
int
size
=
0
;
char
*
payload
=
NULL
;
if
((
ret
=
packet
->
encode
(
size
,
payload
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"encode RTMP packet to bytes oriented RTMP message failed. ret=%d"
,
ret
);
if
((
ret
=
do_send_and_free_packet
(
packet
,
stream_id
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
// encode packet to payload and size.
if
(
size
<=
0
||
payload
==
NULL
)
{
srs_warn
(
"packet is empty, ignore empty message."
);
// flush messages in manual queue
if
((
ret
=
manual_response_flush
())
!=
ERROR_SUCCESS
)
{
return
ret
;
}
// to message
SrsMessage
*
msg
=
new
SrsCommonMessage
();
msg
->
payload
=
payload
;
msg
->
size
=
size
;
msg
->
header
.
payload_length
=
size
;
msg
->
header
.
message_type
=
packet
->
get_message_type
();
msg
->
header
.
stream_id
=
stream_id
;
msg
->
header
.
perfer_cid
=
packet
->
get_prefer_cid
();
// donot use the auto free to free the msg,
// for performance issue.
ret
=
do_send_messages
(
&
msg
,
1
);
if
(
ret
==
ERROR_SUCCESS
)
{
ret
=
on_send_packet
(
msg
,
packet
);
}
srs_freep
(
msg
);
return
ret
;
}
...
...
@@ -1698,7 +1767,15 @@ int SrsProtocol::response_acknowledgement_message()
SrsAcknowledgementPacket
*
pkt
=
new
SrsAcknowledgementPacket
();
in_ack_size
.
acked_size
=
skt
->
get_recv_bytes
();
pkt
->
sequence_number
=
(
int32_t
)
in_ack_size
.
acked_size
;
if
((
ret
=
send_and_free_packet
(
pkt
,
0
))
!=
ERROR_SUCCESS
)
{
// cache the message and use flush to send.
if
(
!
auto_response_when_recv
)
{
manual_response_queue
.
push_back
(
pkt
);
return
ret
;
}
// use underlayer api to send, donot flush again.
if
((
ret
=
do_send_and_free_packet
(
pkt
,
0
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"send acknowledgement failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -1718,7 +1795,14 @@ int SrsProtocol::response_ping_message(int32_t timestamp)
pkt
->
event_type
=
SrcPCUCPingResponse
;
pkt
->
event_data
=
timestamp
;
if
((
ret
=
send_and_free_packet
(
pkt
,
0
))
!=
ERROR_SUCCESS
)
{
// cache the message and use flush to send.
if
(
!
auto_response_when_recv
)
{
manual_response_queue
.
push_back
(
pkt
);
return
ret
;
}
// use underlayer api to send, donot flush again.
if
((
ret
=
do_send_and_free_packet
(
pkt
,
0
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"send ping response failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
trunk/src/rtmp/srs_protocol_stack.hpp
查看文件 @
622218c
...
...
@@ -31,6 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <map>
#include <vector>
#include <string>
// for srs-librtmp, @see https://github.com/winlinvip/simple-rtmp-server/issues/213
...
...
@@ -213,6 +214,16 @@ private:
* input ack size, when to send the acked packet.
*/
AckWindowSize
in_ack_size
;
/**
* whether auto response when recv messages.
* default to true for it's very easy to use the protocol stack.
* @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
*/
bool
auto_response_when_recv
;
/**
* when not auto response message, manual flush the messages in queue.
*/
std
::
vector
<
SrsPacket
*>
manual_response_queue
;
// peer out
private:
/**
...
...
@@ -246,6 +257,20 @@ public:
virtual
~
SrsProtocol
();
public
:
/**
* set the auto response message when recv for protocol stack.
* @param v, whether auto response message when recv message.
* @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
*/
virtual
void
set_auto_response
(
bool
v
);
/**
* flush for manual response when the auto response is disabled
* by set_auto_response(false), we default use auto response, so donot
* need to call this api(the protocol sdk will auto send message).
* @see the auto_response_when_recv and manual_response_queue.
*/
virtual
int
manual_response_flush
();
public
:
/**
* set/get the recv timeout in us.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
...
...
@@ -371,6 +396,10 @@ private:
*/
virtual
int
do_send_messages
(
SrsMessage
**
msgs
,
int
nb_msgs
);
/**
* underlayer api for send and free packet.
*/
virtual
int
do_send_and_free_packet
(
SrsPacket
*
packet
,
int
stream_id
);
/**
* generate the chunk header for msg.
* @param mh, the header of msg to send.
* @param c0, whether the first chunk, the c0 chunk.
...
...
请
注册
或
登录
后发表评论