Toggle navigation
Toggle navigation
此项目
正在载入...
Sign in
胡斌
/
srs
转到一个项目
Toggle navigation
项目
群组
代码片段
帮助
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
winlin
2014-04-27 11:11:15 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
79c9c6dcb7ad99614c7b8d472f20e20723a81fc3
79c9c6dc
1 parent
ec960724
implements the proxy for edge publish mode
显示空白字符变更
内嵌
并排对比
正在显示
7 个修改的文件
包含
180 行增加
和
58 行删除
trunk/src/app/srs_app_edge.cpp
trunk/src/app/srs_app_edge.hpp
trunk/src/app/srs_app_rtmp_conn.cpp
trunk/src/app/srs_app_source.cpp
trunk/src/app/srs_app_source.hpp
trunk/src/rtmp/srs_protocol_rtmp_stack.cpp
trunk/src/rtmp/srs_protocol_rtmp_stack.hpp
trunk/src/app/srs_app_edge.cpp
查看文件 @
79c9c6d
...
...
@@ -41,12 +41,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_source.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_socket.hpp>
// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
// when edge timeout, retry next.
#define SRS_EDGE_TIMEOUT_US (int64_t)(3*1000*1000LL)
#define SRS_EDGE_
INGESTER_
TIMEOUT_US (int64_t)(3*1000*1000LL)
SrsEdgeIngester
::
SrsEdgeIngester
()
{
...
...
@@ -146,7 +147,7 @@ int SrsEdgeIngester::ingest()
{
int
ret
=
ERROR_SUCCESS
;
client
->
set_recv_timeout
(
SRS_EDGE_TIMEOUT_US
);
client
->
set_recv_timeout
(
SRS_EDGE_
INGESTER_
TIMEOUT_US
);
SrsPithyPrint
pithy_print
(
SRS_STAGE_EDGE
);
...
...
@@ -306,6 +307,21 @@ int SrsEdgeIngester::connect_server()
return
ret
;
}
SrsEdgeProxyContext
::
SrsEdgeProxyContext
()
{
edge_stream_id
=
0
;
edge_io
=
NULL
;
edge_rtmp
=
NULL
;
origin_stream_id
=
0
;
origin_io
=
NULL
;
origin_rtmp
=
NULL
;
}
SrsEdgeProxyContext
::~
SrsEdgeProxyContext
()
{
}
SrsEdgeForwarder
::
SrsEdgeForwarder
()
{
io
=
NULL
;
...
...
@@ -315,14 +331,11 @@ SrsEdgeForwarder::SrsEdgeForwarder()
origin_index
=
0
;
stream_id
=
0
;
stfd
=
NULL
;
pthread
=
new
SrsThread
(
this
,
SRS_EDGE_INGESTER_SLEEP_US
);
}
SrsEdgeForwarder
::~
SrsEdgeForwarder
()
{
stop
();
srs_freep
(
pthread
);
}
int
SrsEdgeForwarder
::
initialize
(
SrsSource
*
source
,
SrsPublishEdge
*
edge
,
SrsRequest
*
req
)
...
...
@@ -338,21 +351,6 @@ int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsReq
int
SrsEdgeForwarder
::
start
()
{
return
pthread
->
start
();
}
void
SrsEdgeForwarder
::
stop
()
{
pthread
->
stop
();
close_underlayer_socket
();
srs_freep
(
client
);
srs_freep
(
io
);
}
int
SrsEdgeForwarder
::
cycle
()
{
int
ret
=
ERROR_SUCCESS
;
if
((
ret
=
connect_server
())
!=
ERROR_SUCCESS
)
{
...
...
@@ -378,37 +376,36 @@ int SrsEdgeForwarder::cycle()
return
ret
;
}
if
((
ret
=
client
->
p
lay
(
req
->
stream
,
stream_id
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
client
->
p
ublish
(
req
->
stream
,
stream_id
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"connect with server failed, stream=%s, stream_id=%d. ret=%d"
,
req
->
stream
.
c_str
(),
stream_id
,
ret
);
return
ret
;
}
if
((
ret
=
_source
->
on_publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"edge ingester play stream then publish to edge failed. ret=%d"
,
ret
);
return
ret
;
}
if
((
ret
=
_edge
->
on_forward_publish
())
!=
ERROR_SUCCESS
)
{
return
ret
;
}
}
if
((
ret
=
forward
())
!=
ERROR_SUCCESS
)
{
return
ret
;
}
void
SrsEdgeForwarder
::
stop
()
{
close_underlayer_socket
();
return
ret
;
srs_freep
(
client
);
srs_freep
(
io
);
}
int
SrsEdgeForwarder
::
forward
(
)
int
SrsEdgeForwarder
::
proxy
(
SrsEdgeProxyContext
*
context
)
{
int
ret
=
ERROR_SUCCESS
;
client
->
set_recv_timeout
(
SRS_EDGE_TIMEOUT_US
);
context
->
origin_io
=
io
;
context
->
origin_rtmp
=
client
;
context
->
origin_stream_id
=
stream_id
;
client
->
set_recv_timeout
(
SRS_PULSE_TIMEOUT_US
);
SrsPithyPrint
pithy_print
(
SRS_STAGE_EDGE
);
while
(
pthread
->
can_loop
()
)
{
while
(
true
)
{
// switch to other st-threads.
st_usleep
(
0
);
...
...
@@ -420,16 +417,58 @@ int SrsEdgeForwarder::forward()
pithy_print
.
age
(),
client
->
get_send_bytes
(),
client
->
get_recv_bytes
(),
client
->
get_send_kbps
(),
client
->
get_recv_kbps
());
}
// read from client.
if
((
ret
=
proxy_message
(
context
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
}
return
ret
;
}
int
SrsEdgeForwarder
::
proxy_message
(
SrsEdgeProxyContext
*
context
)
{
int
ret
=
ERROR_SUCCESS
;
SrsCommonMessage
*
msg
=
NULL
;
if
((
ret
=
client
->
recv_message
(
&
msg
))
!=
ERROR_SUCCESS
)
{
// proxy origin message to client
msg
=
NULL
;
ret
=
context
->
origin_rtmp
->
recv_message
(
&
msg
);
if
(
ret
!=
ERROR_SUCCESS
&&
ret
!=
ERROR_SOCKET_TIMEOUT
)
{
srs_error
(
"recv origin server message failed. ret=%d"
,
ret
);
return
ret
;
}
srs_verbose
(
"edge loop recv message. ret=%d"
,
ret
);
srs_assert
(
msg
);
SrsAutoFree
(
SrsCommonMessage
,
msg
,
false
);
if
(
msg
)
{
if
(
msg
->
size
<=
0
)
{
srs_freep
(
msg
);
}
else
{
msg
->
header
.
stream_id
=
context
->
edge_stream_id
;
if
((
ret
=
context
->
edge_rtmp
->
send_message
(
msg
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"send origin message to client failed. ret=%d"
,
ret
);
return
ret
;
}
}
}
// proxy client message to origin
msg
=
NULL
;
ret
=
context
->
edge_rtmp
->
recv_message
(
&
msg
);
if
(
ret
!=
ERROR_SUCCESS
&&
ret
!=
ERROR_SOCKET_TIMEOUT
)
{
srs_error
(
"recv client message failed. ret=%d"
,
ret
);
return
ret
;
}
if
(
msg
)
{
if
(
msg
->
size
<=
0
)
{
srs_freep
(
msg
);
}
else
{
msg
->
header
.
stream_id
=
context
->
origin_stream_id
;
if
((
ret
=
context
->
origin_rtmp
->
send_message
(
msg
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"send client message to origin failed. ret=%d"
,
ret
);
return
ret
;
}
}
}
return
ret
;
...
...
@@ -620,21 +659,33 @@ int SrsPublishEdge::on_client_publish()
// error state.
if
(
user_state
!=
SrsEdgeUserStateInit
)
{
ret
=
ERROR_RTMP_EDGE_PUBLISH_STATE
;
srs_error
(
"invalid state for client to p
lay
stream on edge. "
srs_error
(
"invalid state for client to p
ublish
stream on edge. "
"state=%d, user_state=%d, ret=%d"
,
state
,
user_state
,
ret
);
return
ret
;
}
// start ingest when init state.
if
(
state
==
SrsEdgeStateInit
)
{
state
=
SrsEdgeStatePublish
;
// error when not init state.
if
(
state
!=
SrsEdgeStateInit
)
{
ret
=
ERROR_RTMP_EDGE_PUBLISH_STATE
;
srs_error
(
"invalid state for client to publish stream on edge. "
"state=%d, user_state=%d, ret=%d"
,
state
,
user_state
,
ret
);
return
ret
;
}
return
ret
;
SrsEdgeState
pstate
=
state
;
state
=
SrsEdgeStatePublish
;
srs_trace
(
"edge change from %d to state %d (forward publish)."
,
pstate
,
state
);
return
forwarder
->
start
();
}
int
SrsPublishEdge
::
on_
forward_publish
(
)
int
SrsPublishEdge
::
on_
proxy_publish
(
SrsEdgeProxyContext
*
context
)
{
int
ret
=
ERROR_SUCCESS
;
int
ret
=
forwarder
->
proxy
(
context
);
SrsEdgeState
pstate
=
state
;
state
=
SrsEdgeStateInit
;
srs_trace
(
"edge change from %d to state %d (init)."
,
pstate
,
state
);
return
ret
;
}
...
...
trunk/src/app/srs_app_edge.hpp
查看文件 @
79c9c6d
...
...
@@ -33,6 +33,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_st.hpp>
#include <srs_app_thread.hpp>
class
SrsSocket
;
class
SrsRtmpServer
;
class
SrsSource
;
class
SrsRequest
;
class
SrsPlayEdge
;
...
...
@@ -55,8 +57,6 @@ enum SrsEdgeState
// for publish edge
SrsEdgeStatePublish
=
200
,
// publish stream to edge, forward to origin
SrsEdgeStateForwardConnected
,
};
/**
...
...
@@ -101,10 +101,25 @@ private:
virtual
int
process_publish_message
(
SrsCommonMessage
*
msg
);
};
class
SrsEdgeProxyContext
{
public
:
int
edge_stream_id
;
ISrsProtocolReaderWriter
*
edge_io
;
SrsRtmpServer
*
edge_rtmp
;
public
:
int
origin_stream_id
;
ISrsProtocolReaderWriter
*
origin_io
;
SrsRtmpClient
*
origin_rtmp
;
public
:
SrsEdgeProxyContext
();
virtual
~
SrsEdgeProxyContext
();
};
/**
* edge used to forward stream to origin.
*/
class
SrsEdgeForwarder
:
public
ISrsThreadHandler
class
SrsEdgeForwarder
{
private
:
int
stream_id
;
...
...
@@ -112,7 +127,6 @@ private:
SrsSource
*
_source
;
SrsPublishEdge
*
_edge
;
SrsRequest
*
_req
;
SrsThread
*
pthread
;
st_netfd_t
stfd
;
ISrsProtocolReaderWriter
*
io
;
SrsRtmpClient
*
client
;
...
...
@@ -124,11 +138,10 @@ public:
virtual
int
initialize
(
SrsSource
*
source
,
SrsPublishEdge
*
edge
,
SrsRequest
*
req
);
virtual
int
start
();
virtual
void
stop
();
// interface ISrsThreadHandler
public
:
virtual
int
cycle
(
);
virtual
int
proxy
(
SrsEdgeProxyContext
*
context
);
private
:
virtual
int
forward
(
);
virtual
int
proxy_message
(
SrsEdgeProxyContext
*
context
);
virtual
void
close_underlayer_socket
();
virtual
int
connect_server
();
};
...
...
@@ -182,11 +195,10 @@ public:
* when client publish stream on edge.
*/
virtual
int
on_client_publish
();
public
:
/**
*
when forwarder start to publish stream.
*
proxy publish stream to edge
*/
virtual
int
on_
forward_publish
(
);
virtual
int
on_
proxy_publish
(
SrsEdgeProxyContext
*
context
);
};
#endif
...
...
trunk/src/app/srs_app_rtmp_conn.cpp
查看文件 @
79c9c6d
...
...
@@ -42,6 +42,7 @@ using namespace std;
#include <srs_app_bandwidth.hpp>
#include <srs_app_socket.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_edge.hpp>
// when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish,
...
...
@@ -332,6 +333,15 @@ int SrsRtmpConn::stream_service_cycle()
srs_error
(
"start to publish stream failed. ret=%d"
,
ret
);
return
ret
;
}
SrsEdgeProxyContext
context
;
context
.
edge_io
=
skt
;
context
.
edge_stream_id
=
res
->
stream_id
;
context
.
edge_rtmp
=
rtmp
;
if
(
vhost_is_edge
)
{
return
source
->
on_edge_proxy_publish
(
&
context
);
}
if
((
ret
=
on_publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"http hook on_publish failed. ret=%d"
,
ret
);
return
ret
;
...
...
@@ -345,10 +355,26 @@ int SrsRtmpConn::stream_service_cycle()
case
SrsRtmpConnFlashPublish
:
{
srs_verbose
(
"flash start to publish stream %s."
,
req
->
stream
.
c_str
());
if
(
vhost_is_edge
)
{
if
((
ret
=
source
->
on_edge_start_publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"notice edge start publish stream failed. ret=%d"
,
ret
);
return
ret
;
}
}
if
((
ret
=
rtmp
->
start_flash_publish
(
res
->
stream_id
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"flash start to publish stream failed. ret=%d"
,
ret
);
return
ret
;
}
SrsEdgeProxyContext
context
;
context
.
edge_io
=
skt
;
context
.
edge_stream_id
=
res
->
stream_id
;
context
.
edge_rtmp
=
rtmp
;
if
(
vhost_is_edge
)
{
return
source
->
on_edge_proxy_publish
(
&
context
);
}
if
((
ret
=
on_publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"http hook on_publish failed. ret=%d"
,
ret
);
return
ret
;
...
...
trunk/src/app/srs_app_source.cpp
查看文件 @
79c9c6d
...
...
@@ -1199,6 +1199,11 @@ int SrsSource::on_edge_start_publish()
return
publish_edge
->
on_client_publish
();
}
int
SrsSource
::
on_edge_proxy_publish
(
SrsEdgeProxyContext
*
context
)
{
return
publish_edge
->
on_proxy_publish
(
context
);
}
int
SrsSource
::
create_forwarders
()
{
int
ret
=
ERROR_SUCCESS
;
...
...
trunk/src/app/srs_app_source.hpp
查看文件 @
79c9c6d
...
...
@@ -45,6 +45,9 @@ class SrsOnMetaDataPacket;
class
SrsSharedPtrMessage
;
class
SrsForwarder
;
class
SrsRequest
;
class
SrsSocket
;
class
SrsRtmpServer
;
class
SrsEdgeProxyContext
;
#ifdef SRS_AUTO_HLS
class
SrsHls
;
#endif
...
...
@@ -318,6 +321,8 @@ public:
virtual
int
on_edge_start_play
();
// for edge, when publish edge stream, check the state
virtual
int
on_edge_start_publish
();
// for edge, proxy the publish
virtual
int
on_edge_proxy_publish
(
SrsEdgeProxyContext
*
context
);
private
:
virtual
int
create_forwarders
();
virtual
void
destroy_forwarders
();
...
...
trunk/src/rtmp/srs_protocol_rtmp_stack.cpp
查看文件 @
79c9c6d
...
...
@@ -652,11 +652,17 @@ int SrsProtocol::on_send_message(ISrsMessage* msg)
}
SrsCommonMessage
*
common_msg
=
dynamic_cast
<
SrsCommonMessage
*>
(
msg
);
if
(
!
msg
)
{
if
(
!
common_
msg
)
{
srs_verbose
(
"ignore the shared ptr message."
);
return
ret
;
}
// for proxy, the common msg is not decoded, ignore.
if
(
!
common_msg
->
has_packet
())
{
srs_verbose
(
"ignore the proxy common message."
);
return
ret
;
}
srs_assert
(
common_msg
!=
NULL
);
switch
(
common_msg
->
header
.
message_type
)
{
...
...
@@ -1459,6 +1465,11 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
return
ret
;
}
bool
SrsCommonMessage
::
has_packet
()
{
return
packet
!=
NULL
;
}
SrsPacket
*
SrsCommonMessage
::
get_packet
()
{
if
(
!
packet
)
{
...
...
@@ -1501,6 +1512,14 @@ int SrsCommonMessage::encode_packet()
{
int
ret
=
ERROR_SUCCESS
;
// sometimes, for example, the edge proxy,
// the payload is not decoded, so directly sent out.
if
(
payload
!=
NULL
)
{
header
.
payload_length
=
size
;
return
ret
;
}
// encode packet to payload and size.
if
(
packet
==
NULL
)
{
srs_warn
(
"packet is empty, send out empty message."
);
return
ret
;
...
...
trunk/src/rtmp/srs_protocol_rtmp_stack.hpp
查看文件 @
79c9c6d
...
...
@@ -352,6 +352,10 @@ public:
// TODO: use protocol to decode it.
virtual
int
decode_packet
(
SrsProtocol
*
protocol
);
/**
* whether msg has decoded packet.
*/
virtual
bool
has_packet
();
/**
* get the decoded packet which decoded by decode_packet().
* @remark, user never free the pkt, the message will auto free it.
*/
...
...
请
注册
或
登录
后发表评论