Toggle navigation
Toggle navigation
此项目
正在载入...
Sign in
胡斌
/
srs
转到一个项目
Toggle navigation
项目
群组
代码片段
帮助
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
winlin
2014-08-19 11:54:33 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
ab965655aa33a5cd0d33983bd0c6b46b0157ff75
ab965655
1 parent
e9e0cd75
refine forwarder, add tracable debug info. 0.9.202
隐藏空白字符变更
内嵌
并排对比
正在显示
9 个修改的文件
包含
146 行增加
和
55 行删除
trunk/src/app/srs_app_edge.cpp
trunk/src/app/srs_app_forward.cpp
trunk/src/app/srs_app_forward.hpp
trunk/src/app/srs_app_rtmp_conn.cpp
trunk/src/app/srs_app_source.cpp
trunk/src/core/srs_core.hpp
trunk/src/libs/srs_librtmp.cpp
trunk/src/rtmp/srs_protocol_rtmp.cpp
trunk/src/rtmp/srs_protocol_rtmp.hpp
trunk/src/app/srs_app_edge.cpp
查看文件 @
ab96565
...
...
@@ -212,6 +212,7 @@ int SrsEdgeIngester::ingest()
return
ret
;
}
// TODO: FIXME: refine the connect_app.
int
SrsEdgeIngester
::
connect_app
(
string
ep_server
,
string
ep_port
)
{
int
ret
=
ERROR_SUCCESS
;
...
...
@@ -641,6 +642,7 @@ int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port)
return
ret
;
}
// TODO: FIXME: refine the connect_app.
int
SrsEdgeForwarder
::
connect_app
(
string
ep_server
,
string
ep_port
)
{
int
ret
=
ERROR_SUCCESS
;
...
...
trunk/src/app/srs_app_forward.cpp
查看文件 @
ab96565
...
...
@@ -28,6 +28,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <netinet/in.h>
#include <arpa/inet.h>
using
namespace
std
;
#include <srs_app_source.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_st_socket.hpp>
...
...
@@ -43,6 +45,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_utility.hpp>
#include <srs_protocol_msg_array.hpp>
#include <srs_app_utility.hpp>
#include <srs_protocol_amf0.hpp>
// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
...
...
@@ -51,6 +54,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
{
source
=
_source
;
_req
=
NULL
;
io
=
NULL
;
client
=
NULL
;
stfd
=
NULL
;
...
...
@@ -72,37 +76,34 @@ SrsForwarder::~SrsForwarder()
srs_freep
(
kbps
);
}
int
SrsForwarder
::
initialize
(
SrsRequest
*
req
,
string
ep_forward
)
{
int
ret
=
ERROR_SUCCESS
;
// it's ok to use the request object,
// SrsSource already copy it and never delete it.
_req
=
req
;
// the ep(endpoint) to forward to
_ep_forward
=
ep_forward
;
return
ret
;
}
void
SrsForwarder
::
set_queue_size
(
double
queue_size
)
{
queue
->
set_queue_size
(
queue_size
);
}
int
SrsForwarder
::
on_publish
(
SrsRequest
*
req
,
std
::
string
forward_server
)
int
SrsForwarder
::
on_publish
()
{
int
ret
=
ERROR_SUCCESS
;
// TODO: FIXME: directly use the req object.
// forward app
app
=
req
->
app
;
vhost
=
req
->
vhost
;
SrsRequest
*
req
=
_req
;
stream_name
=
req
->
stream
;
server
=
forward_server
;
std
::
string
s_port
=
SRS_CONSTS_RTMP_DEFAULT_PORT
;
port
=
::
atoi
(
SRS_CONSTS_RTMP_DEFAULT_PORT
);
// TODO: FIXME: parse complex params
size_t
pos
=
forward_server
.
find
(
":"
);
if
(
pos
!=
std
::
string
::
npos
)
{
s_port
=
forward_server
.
substr
(
pos
+
1
);
server
=
forward_server
.
substr
(
0
,
pos
);
}
// discovery vhost
std
::
string
vhost
=
req
->
vhost
;
port
=
::
atoi
(
s_port
.
c_str
());
// generate tcUrl
tc_url
=
srs_generate_tc_url
(
forward_server
,
vhost
,
req
->
app
,
s_port
,
req
->
param
);
// discovery the server port and tcUrl from req and ep_forward.
std
::
string
server
,
port
,
tc_url
;
discovery_ep
(
server
,
port
,
tc_url
);
// dead loop check
std
::
string
source_ep
=
"rtmp://"
;
...
...
@@ -113,15 +114,15 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
source_ep
+=
req
->
vhost
;
std
::
string
dest_ep
=
"rtmp://"
;
if
(
forward_server
==
SRS_CONSTS_LOCALHOST
)
{
if
(
_ep_forward
==
SRS_CONSTS_LOCALHOST
)
{
dest_ep
+=
req
->
host
;
}
else
{
dest_ep
+=
forward_server
;
dest_ep
+=
_ep_forward
;
}
dest_ep
+=
":"
;
dest_ep
+=
s_
port
;
dest_ep
+=
port
;
dest_ep
+=
"?vhost="
;
dest_ep
+=
vhost
;
dest_ep
+=
req
->
vhost
;
if
(
source_ep
==
dest_ep
)
{
ret
=
ERROR_SYSTEM_FORWARD_LOOP
;
...
...
@@ -131,7 +132,7 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
}
srs_trace
(
"start forward %s to %s, tcUrl=%s, stream=%s"
,
source_ep
.
c_str
(),
dest_ep
.
c_str
(),
tc_url
.
c_str
(),
stream_name
.
c_str
());
req
->
stream
.
c_str
());
if
((
ret
=
pthread
->
start
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"start srs thread failed. ret=%d"
,
ret
);
...
...
@@ -205,7 +206,8 @@ int SrsForwarder::cycle()
{
int
ret
=
ERROR_SUCCESS
;
if
((
ret
=
connect_server
())
!=
ERROR_SUCCESS
)
{
std
::
string
ep_server
,
ep_port
;
if
((
ret
=
connect_server
(
ep_server
,
ep_port
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
srs_assert
(
client
);
...
...
@@ -217,12 +219,8 @@ int SrsForwarder::cycle()
srs_error
(
"handshake with server failed. ret=%d"
,
ret
);
return
ret
;
}
// TODO: FIXME: take debug info for srs, @see SrsEdgeForwarder.connect_server.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/160
// the debug_srs_upnode is config in vhost and default to true.
bool
debug_srs_upnode
=
_srs_config
->
get_debug_srs_upnode
(
vhost
);
if
((
ret
=
client
->
connect_app
(
app
,
tc_url
,
NULL
,
debug_srs_upnode
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"connect with server failed, tcUrl=%s. ret=%d"
,
tc_url
.
c_str
(),
ret
);
if
((
ret
=
connect_app
(
ep_server
,
ep_port
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"connect with server failed. ret=%d"
,
ret
);
return
ret
;
}
if
((
ret
=
client
->
create_stream
(
stream_id
))
!=
ERROR_SUCCESS
)
{
...
...
@@ -230,9 +228,9 @@ int SrsForwarder::cycle()
return
ret
;
}
if
((
ret
=
client
->
publish
(
stream_name
,
stream_id
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
client
->
publish
(
_req
->
stream
,
stream_id
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"connect with server failed, stream_name=%s, stream_id=%d. ret=%d"
,
stream_name
.
c_str
(),
stream_id
,
ret
);
_req
->
stream
.
c_str
(),
stream_id
,
ret
);
return
ret
;
}
...
...
@@ -253,18 +251,45 @@ void SrsForwarder::close_underlayer_socket()
srs_close_stfd
(
stfd
);
}
int
SrsForwarder
::
connect_server
()
void
SrsForwarder
::
discovery_ep
(
string
&
server
,
string
&
port
,
string
&
tc_url
)
{
SrsRequest
*
req
=
_req
;
server
=
_ep_forward
;
port
=
SRS_CONSTS_RTMP_DEFAULT_PORT
;
// TODO: FIXME: parse complex params
size_t
pos
=
_ep_forward
.
find
(
":"
);
if
(
pos
!=
std
::
string
::
npos
)
{
port
=
_ep_forward
.
substr
(
pos
+
1
);
server
=
_ep_forward
.
substr
(
0
,
pos
);
}
// generate tcUrl
tc_url
=
srs_generate_tc_url
(
server
,
req
->
vhost
,
req
->
app
,
port
,
req
->
param
);
}
int
SrsForwarder
::
connect_server
(
string
&
ep_server
,
string
&
ep_port
)
{
int
ret
=
ERROR_SUCCESS
;
// reopen
close_underlayer_socket
();
// discovery the server port and tcUrl from req and ep_forward.
std
::
string
server
,
s_port
,
tc_url
;
discovery_ep
(
server
,
s_port
,
tc_url
);
int
port
=
::
atoi
(
s_port
.
c_str
());
// output the connected server and port.
ep_server
=
server
;
ep_port
=
s_port
;
// open socket.
int64_t
timeout
=
SRS_FORWARDER_SLEEP_US
;
if
((
ret
=
srs_socket_connect
(
server
,
port
,
timeout
,
&
stfd
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
srs_socket_connect
(
ep_
server
,
port
,
timeout
,
&
stfd
))
!=
ERROR_SUCCESS
)
{
srs_warn
(
"forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"
PRId64
", ret=%d"
,
stream_name
.
c_str
(),
tc_u
rl
.
c_str
(),
server
.
c_str
(),
port
,
timeout
,
ret
);
_req
->
stream
.
c_str
(),
_req
->
tcU
rl
.
c_str
(),
server
.
c_str
(),
port
,
timeout
,
ret
);
return
ret
;
}
...
...
@@ -278,7 +303,58 @@ int SrsForwarder::connect_server()
kbps
->
set_io
(
io
,
io
);
srs_trace
(
"forward connected, stream=%s, tcUrl=%s to server=%s, port=%d"
,
stream_name
.
c_str
(),
tc_url
.
c_str
(),
server
.
c_str
(),
port
);
_req
->
stream
.
c_str
(),
_req
->
tcUrl
.
c_str
(),
server
.
c_str
(),
port
);
return
ret
;
}
// TODO: FIXME: refine the connect_app.
int
SrsForwarder
::
connect_app
(
string
ep_server
,
string
ep_port
)
{
int
ret
=
ERROR_SUCCESS
;
SrsRequest
*
req
=
_req
;
// args of request takes the srs info.
if
(
req
->
args
==
NULL
)
{
req
->
args
=
SrsAmf0Any
::
object
();
}
// notify server the edge identity,
// @see https://github.com/winlinvip/simple-rtmp-server/issues/147
SrsAmf0Object
*
data
=
req
->
args
;
data
->
set
(
"srs_sig"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_KEY
));
data
->
set
(
"srs_server"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_KEY
" "
RTMP_SIG_SRS_VERSION
" ("
RTMP_SIG_SRS_URL_SHORT
")"
));
data
->
set
(
"srs_license"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_LICENSE
));
data
->
set
(
"srs_role"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_ROLE
));
data
->
set
(
"srs_url"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_URL
));
data
->
set
(
"srs_version"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_VERSION
));
data
->
set
(
"srs_site"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_WEB
));
data
->
set
(
"srs_email"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_EMAIL
));
data
->
set
(
"srs_copyright"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_COPYRIGHT
));
data
->
set
(
"srs_primary_authors"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_PRIMARY_AUTHROS
));
// for edge to directly get the id of client.
data
->
set
(
"srs_pid"
,
SrsAmf0Any
::
number
(
getpid
()));
data
->
set
(
"srs_id"
,
SrsAmf0Any
::
number
(
_srs_context
->
get_id
()));
// local ip of edge
std
::
vector
<
std
::
string
>
ips
=
srs_get_local_ipv4_ips
();
assert
(
_srs_config
->
get_stats_network
()
<
(
int
)
ips
.
size
());
std
::
string
local_ip
=
ips
[
_srs_config
->
get_stats_network
()];
data
->
set
(
"srs_server_ip"
,
SrsAmf0Any
::
str
(
local_ip
.
c_str
()));
// generate the tcUrl
std
::
string
param
=
""
;
std
::
string
tc_url
=
srs_generate_tc_url
(
ep_server
,
req
->
vhost
,
req
->
app
,
ep_port
,
param
);
// upnode server identity will show in the connect_app of client.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/160
// the debug_srs_upnode is config in vhost and default to true.
bool
debug_srs_upnode
=
_srs_config
->
get_debug_srs_upnode
(
req
->
vhost
);
if
((
ret
=
client
->
connect_app
(
req
->
app
,
tc_url
,
req
,
debug_srs_upnode
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"connect with server failed, tcUrl=%s. ret=%d"
,
tc_url
.
c_str
(),
ret
);
return
ret
;
}
return
ret
;
}
...
...
trunk/src/app/srs_app_forward.hpp
查看文件 @
ab96565
...
...
@@ -51,13 +51,10 @@ class SrsKbps;
class
SrsForwarder
:
public
ISrsThreadHandler
{
private
:
std
::
string
app
;
std
::
string
tc_url
;
std
::
string
vhost
;
std
::
string
stream_name
;
// the ep to forward, server[:port].
std
::
string
_ep_forward
;
SrsRequest
*
_req
;
int
stream_id
;
std
::
string
server
;
int
port
;
private
:
st_netfd_t
stfd
;
SrsThread
*
pthread
;
...
...
@@ -72,9 +69,10 @@ public:
SrsForwarder
(
SrsSource
*
_source
);
virtual
~
SrsForwarder
();
public
:
virtual
int
initialize
(
SrsRequest
*
req
,
std
::
string
ep_forward
);
virtual
void
set_queue_size
(
double
queue_size
);
public
:
virtual
int
on_publish
(
SrsRequest
*
req
,
std
::
string
forward_server
);
virtual
int
on_publish
();
virtual
void
on_unpublish
();
virtual
int
on_meta_data
(
SrsSharedPtrMessage
*
metadata
);
virtual
int
on_audio
(
SrsSharedPtrMessage
*
msg
);
...
...
@@ -84,8 +82,9 @@ public:
virtual
int
cycle
();
private
:
virtual
void
close_underlayer_socket
();
// TODO: FIXME: take debug info for srs, @see SrsEdgeForwarder.connect_server.
virtual
int
connect_server
();
virtual
void
discovery_ep
(
std
::
string
&
server
,
std
::
string
&
port
,
std
::
string
&
tc_url
);
virtual
int
connect_server
(
std
::
string
&
ep_server
,
std
::
string
&
ep_port
);
virtual
int
connect_app
(
std
::
string
ep_server
,
std
::
string
ep_port
);
virtual
int
forward
();
};
...
...
trunk/src/app/srs_app_rtmp_conn.cpp
查看文件 @
ab96565
...
...
@@ -1045,7 +1045,9 @@ int SrsRtmpConn::do_token_traverse_auth(SrsStSocket* io, SrsRtmpClient* client)
srs_error
(
"handshake with server failed. ret=%d"
,
ret
);
return
ret
;
}
if
((
ret
=
client
->
connect_app
(
req
->
app
,
req
->
tcUrl
,
req
))
!=
ERROR_SUCCESS
)
{
// for token tranverse, always take the debug info(which carries token).
if
((
ret
=
client
->
connect_app
(
req
->
app
,
req
->
tcUrl
,
req
,
true
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"connect with server failed, tcUrl=%s. ret=%d"
,
req
->
tcUrl
.
c_str
(),
ret
);
return
ret
;
}
...
...
trunk/src/app/srs_app_source.cpp
查看文件 @
ab96565
...
...
@@ -1386,6 +1386,7 @@ int SrsSource::on_publish()
return
ret
;
}
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_TRANSCODE
if
((
ret
=
encoder
->
on_publish
(
_req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start encoder failed. ret=%d"
,
ret
);
...
...
@@ -1393,6 +1394,7 @@ int SrsSource::on_publish()
}
#endif
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_HLS
if
((
ret
=
hls
->
on_publish
(
_req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start hls failed. ret=%d"
,
ret
);
...
...
@@ -1400,6 +1402,7 @@ int SrsSource::on_publish()
}
#endif
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_DVR
if
((
ret
=
dvr
->
on_publish
(
_req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start dvr failed. ret=%d"
,
ret
);
...
...
@@ -1548,11 +1551,16 @@ int SrsSource::create_forwarders()
SrsForwarder
*
forwarder
=
new
SrsForwarder
(
this
);
forwarders
.
push_back
(
forwarder
);
// initialize the forwarder with request.
if
((
ret
=
forwarder
->
initialize
(
_req
,
forward_server
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
double
queue_size
=
_srs_config
->
get_queue_length
(
_req
->
vhost
);
forwarder
->
set_queue_size
(
queue_size
);
if
((
ret
=
forwarder
->
on_publish
(
_req
,
forward_server
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
forwarder
->
on_publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"start forwarder failed. "
"vhost=%s, app=%s, stream=%s, forward-to=%s"
,
_req
->
vhost
.
c_str
(),
_req
->
app
.
c_str
(),
_req
->
stream
.
c_str
(),
...
...
trunk/src/core/srs_core.hpp
查看文件 @
ab96565
...
...
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR "0"
#define VERSION_MINOR "9"
#define VERSION_REVISION "20
1
"
#define VERSION_REVISION "20
2
"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
...
...
trunk/src/libs/srs_librtmp.cpp
查看文件 @
ab96565
...
...
@@ -253,7 +253,10 @@ int srs_connect_app(srs_rtmp_t rtmp)
context
->
ip
,
context
->
vhost
,
context
->
app
,
context
->
port
,
context
->
param
);
if
((
ret
=
context
->
rtmp
->
connect_app
(
context
->
app
,
tcUrl
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
context
->
rtmp
->
connect_app
(
context
->
app
,
tcUrl
,
NULL
,
true
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
...
...
trunk/src/rtmp/srs_protocol_rtmp.cpp
查看文件 @
ab96565
...
...
@@ -434,7 +434,8 @@ int SrsRtmpClient::complex_handshake()
return
ret
;
}
int
SrsRtmpClient
::
connect_app
(
string
app
,
string
tc_url
,
SrsRequest
*
req
,
bool
debug_srs_upnode
)
int
SrsRtmpClient
::
connect_app
(
string
app
,
string
tc_url
,
SrsRequest
*
req
,
bool
debug_srs_upnode
)
{
std
::
string
srs_server_ip
;
std
::
string
srs_server
;
...
...
trunk/src/rtmp/srs_protocol_rtmp.hpp
查看文件 @
ab96565
...
...
@@ -248,7 +248,7 @@ public:
* args for edge to origin traverse auth, @see SrsRequest.args
*/
virtual
int
connect_app
(
std
::
string
app
,
std
::
string
tc_url
,
SrsRequest
*
req
=
NULL
,
bool
debug_srs_upnode
=
tru
e
);
SrsRequest
*
req
,
bool
debug_srs_upnod
e
);
/**
* connect to server, get the debug srs info.
*
...
...
请
注册
或
登录
后发表评论