Toggle navigation
Toggle navigation
此项目
正在载入...
Sign in
胡斌
/
srs
转到一个项目
Toggle navigation
项目
群组
代码片段
帮助
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
winlin
2015-10-14 16:06:45 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
a9fdb630d958e05a515a3a6d20d9964f05e57681
a9fdb630
1 parent
a08d8f83
refine code, replace all rtmp connect by simple rtmp client.
显示空白字符变更
内嵌
并排对比
正在显示
3 个修改的文件
包含
33 行增加
和
202 行删除
trunk/src/app/srs_app_rtsp.cpp
trunk/src/app/srs_app_rtsp.hpp
trunk/src/main/srs_main_ingest_hls.cpp
trunk/src/app/srs_app_rtsp.cpp
查看文件 @
a9fdb63
...
...
@@ -42,6 +42,7 @@ using namespace std;
#include <srs_raw_avc.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtmp_conn.hpp>
#ifdef SRS_AUTO_STREAM_CASTER
...
...
@@ -167,7 +168,7 @@ int SrsRtspJitter::correct(int64_t& ts)
previous_timestamp
=
ts
;
}
delta
=
srs_max
(
0
,
ts
-
previous_timestamp
);
delta
=
srs_max
(
0
,
(
int
)(
ts
-
previous_timestamp
)
);
if
(
delta
>
90000
)
{
delta
=
0
;
}
...
...
@@ -195,9 +196,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
trd
=
new
SrsOneCycleThread
(
"rtsp"
,
this
);
req
=
NULL
;
client
=
NULL
;
transport
=
new
SrsTcpClient
();
stream_id
=
0
;
sdk
=
new
SrsSimpleRtmpClient
();
vjitter
=
new
SrsRtspJitter
();
ajitter
=
new
SrsRtspJitter
();
...
...
@@ -218,8 +217,7 @@ SrsRtspConn::~SrsRtspConn()
srs_freep
(
skt
);
srs_freep
(
rtsp
);
srs_freep
(
client
);
srs_freep
(
transport
);
srs_freep
(
sdk
);
srs_freep
(
req
);
srs_freep
(
vjitter
);
...
...
@@ -626,14 +624,14 @@ int SrsRtspConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, i
SrsSharedPtrMessage
*
msg
=
NULL
;
if
((
ret
=
s
rs_rtmp_create_msg
(
type
,
timestamp
,
data
,
size
,
stream_id
,
&
msg
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
s
dk
->
rtmp_create_msg
(
type
,
timestamp
,
data
,
size
,
&
msg
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: create shared ptr msg failed. ret=%d"
,
ret
);
return
ret
;
}
srs_assert
(
msg
);
// send out encoded msg.
if
((
ret
=
client
->
send_and_free_message
(
msg
,
stream_id
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
sdk
->
send_and_free_message
(
msg
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
...
...
@@ -647,11 +645,12 @@ int SrsRtspConn::connect()
// when ok, ignore.
// TODO: FIXME: support reconnect.
if
(
transport
->
connected
())
{
if
(
sdk
->
connected
())
{
return
ret
;
}
// parse uri
// generate rtmp url to connect to.
std
::
string
url
;
if
(
!
req
)
{
std
::
string
schema
,
host
,
vhost
,
app
,
param
;
int
port
;
...
...
@@ -661,99 +660,25 @@ int SrsRtspConn::connect()
std
::
string
output
=
output_template
;
output
=
srs_string_replace
(
output
,
"[app]"
,
app
);
output
=
srs_string_replace
(
output
,
"[stream]"
,
rtsp_stream
);
req
=
new
SrsRequest
();
srs_parse_rtmp_url
(
output
,
req
->
tcUrl
,
req
->
stream
);
srs_discovery_tc_url
(
req
->
tcUrl
,
req
->
schema
,
req
->
host
,
req
->
vhost
,
req
->
app
,
req
->
port
,
req
->
param
);
}
// connect host.
if
((
ret
=
transport
->
connect
(
req
->
host
,
req
->
port
,
ST_UTIME_NO_TIMEOUT
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: connect server %s:%d failed. ret=%d"
,
req
->
host
.
c_str
(),
req
->
port
,
ret
);
return
ret
;
}
srs_freep
(
client
);
client
=
new
SrsRtmpClient
(
transport
);
client
->
set_recv_timeout
(
SRS_CONSTS_RTMP_TIMEOUT_US
);
client
->
set_send_timeout
(
SRS_CONSTS_RTMP_TIMEOUT_US
);
// connect to vhost/app
if
((
ret
=
client
->
handshake
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: handshake with server failed. ret=%d"
,
ret
);
return
ret
;
}
if
((
ret
=
connect_app
(
req
->
host
,
req
->
port
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: connect with server failed. ret=%d"
,
ret
);
return
ret
;
}
if
((
ret
=
client
->
create_stream
(
stream_id
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: connect with server failed, stream_id=%d. ret=%d"
,
stream_id
,
ret
);
int64_t
cto
=
SRS_CONSTS_RTMP_TIMEOUT_US
;
int64_t
sto
=
SRS_CONSTS_RTMP_PULSE_TIMEOUT_US
;
if
((
ret
=
sdk
->
connect
(
url
,
cto
,
sto
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: connect %s failed, cto=%"
PRId64
", sto=%"
PRId64
". ret=%d"
,
url
.
c_str
(),
cto
,
sto
,
ret
);
return
ret
;
}
// publish.
if
((
ret
=
client
->
publish
(
req
->
stream
,
stream_id
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: publish failed, stream=%s, stream_id=%d. ret=%d"
,
req
->
stream
.
c_str
(),
stream_id
,
ret
);
if
((
ret
=
sdk
->
publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: publish %s failed. ret=%d"
,
url
.
c_str
(),
ret
);
return
ret
;
}
return
write_sequence_header
();
}
// TODO: FIXME: refine the connect_app.
int
SrsRtspConn
::
connect_app
(
string
ep_server
,
int
ep_port
)
{
int
ret
=
ERROR_SUCCESS
;
// args of request takes the srs info.
if
(
req
->
args
==
NULL
)
{
req
->
args
=
SrsAmf0Any
::
object
();
}
// notify server the edge identity,
// @see https://github.com/simple-rtmp-server/srs/issues/147
SrsAmf0Object
*
data
=
req
->
args
;
data
->
set
(
"srs_sig"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_KEY
));
data
->
set
(
"srs_server"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_KEY
" "
RTMP_SIG_SRS_VERSION
" ("
RTMP_SIG_SRS_URL_SHORT
")"
));
data
->
set
(
"srs_license"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_LICENSE
));
data
->
set
(
"srs_role"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_ROLE
));
data
->
set
(
"srs_url"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_URL
));
data
->
set
(
"srs_version"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_VERSION
));
data
->
set
(
"srs_site"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_WEB
));
data
->
set
(
"srs_email"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_EMAIL
));
data
->
set
(
"srs_copyright"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_COPYRIGHT
));
data
->
set
(
"srs_primary"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_PRIMARY
));
data
->
set
(
"srs_authors"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_AUTHROS
));
// for edge to directly get the id of client.
data
->
set
(
"srs_pid"
,
SrsAmf0Any
::
number
(
getpid
()));
data
->
set
(
"srs_id"
,
SrsAmf0Any
::
number
(
_srs_context
->
get_id
()));
// local ip of edge
std
::
vector
<
std
::
string
>
ips
=
srs_get_local_ipv4_ips
();
assert
(
_srs_config
->
get_stats_network
()
<
(
int
)
ips
.
size
());
std
::
string
local_ip
=
ips
[
_srs_config
->
get_stats_network
()];
data
->
set
(
"srs_server_ip"
,
SrsAmf0Any
::
str
(
local_ip
.
c_str
()));
// generate the tcUrl
std
::
string
param
=
""
;
std
::
string
tc_url
=
srs_generate_tc_url
(
ep_server
,
req
->
vhost
,
req
->
app
,
ep_port
,
param
);
// upnode server identity will show in the connect_app of client.
// @see https://github.com/simple-rtmp-server/srs/issues/160
// the debug_srs_upnode is config in vhost and default to true.
bool
debug_srs_upnode
=
_srs_config
->
get_debug_srs_upnode
(
req
->
vhost
);
if
((
ret
=
client
->
connect_app
(
req
->
app
,
tc_url
,
req
,
debug_srs_upnode
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"rtsp: connect with server failed, tcUrl=%s, dsu=%d. ret=%d"
,
tc_url
.
c_str
(),
debug_srs_upnode
,
ret
);
return
ret
;
}
return
ret
;
}
SrsRtspCaster
::
SrsRtspCaster
(
SrsConfDirective
*
c
)
{
// TODO: FIXME: support reload.
...
...
trunk/src/app/srs_app_rtsp.hpp
查看文件 @
a9fdb63
...
...
@@ -56,6 +56,7 @@ class SrsSharedPtrMessage;
class
SrsCodecSample
;
class
SrsSimpleStream
;
class
SrsPithyPrint
;
class
SrsSimpleRtmpClient
;
/**
* a rtp connection which transport a stream.
...
...
@@ -139,11 +140,9 @@ private:
SrsOneCycleThread
*
trd
;
private
:
SrsRequest
*
req
;
SrsTcpClient
*
transport
;
SrsRtmpClient
*
client
;
SrsSimpleRtmpClient
*
sdk
;
SrsRtspJitter
*
vjitter
;
SrsRtspJitter
*
ajitter
;
int
stream_id
;
private
:
SrsRawH264Stream
*
avc
;
std
::
string
h264_sps
;
...
...
@@ -181,7 +180,6 @@ private:
// connect to rtmp output url.
// @remark ignore when not connected, reconnect when disconnected.
virtual
int
connect
();
virtual
int
connect_app
(
std
::
string
ep_server
,
int
ep_port
);
};
/**
...
...
trunk/src/main/srs_main_ingest_hls.cpp
查看文件 @
a9fdb63
...
...
@@ -47,6 +47,7 @@ using namespace std;
#include <srs_protocol_amf0.hpp>
#include <srs_raw_avc.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_rtmp_conn.hpp>
// pre-declare
int
proxy_hls2rtmp
(
std
::
string
hls
,
std
::
string
rtmp
);
...
...
@@ -637,9 +638,7 @@ private:
int64_t
raw_aac_dts
;
private
:
SrsRequest
*
req
;
SrsTcpClient
*
transport
;
SrsRtmpClient
*
client
;
int
stream_id
;
SrsSimpleRtmpClient
*
sdk
;
private
:
SrsRawH264Stream
*
avc
;
std
::
string
h264_sps
;
...
...
@@ -657,9 +656,7 @@ public:
raw_aac_dts
=
srs_update_system_time_ms
();
req
=
NULL
;
client
=
NULL
;
transport
=
new
SrsTcpClient
();
stream_id
=
0
;
sdk
=
new
SrsSimpleRtmpClient
();
avc
=
new
SrsRawH264Stream
();
aac
=
new
SrsRawAacStream
();
...
...
@@ -670,7 +667,7 @@ public:
virtual
~
SrsIngestSrsOutput
()
{
close
();
srs_freep
(
transport
);
srs_freep
(
sdk
);
srs_freep
(
avc
);
srs_freep
(
aac
);
...
...
@@ -707,7 +704,6 @@ public:
*/
virtual
int
flush_message_queue
();
private
:
virtual
int
connect_app
(
std
::
string
ep_server
,
int
ep_port
);
// close the connected io and rtmp to ready to be re-connect.
virtual
void
close
();
};
...
...
@@ -1187,7 +1183,7 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char*
SrsSharedPtrMessage
*
msg
=
NULL
;
if
((
ret
=
s
rs_rtmp_create_msg
(
type
,
timestamp
,
data
,
size
,
stream_id
,
&
msg
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
s
dk
->
rtmp_create_msg
(
type
,
timestamp
,
data
,
size
,
&
msg
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: create shared ptr msg failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -1196,7 +1192,7 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char*
srs_info
(
"RTMP type=%d, dts=%d, size=%d"
,
type
,
timestamp
,
size
);
// send out encoded msg.
if
((
ret
=
client
->
send_and_free_message
(
msg
,
stream_id
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
sdk
->
send_and_free_message
(
msg
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"send RTMP type=%d, dts=%d, size=%d failed. ret=%d"
,
type
,
timestamp
,
size
,
ret
);
return
ret
;
}
...
...
@@ -1210,110 +1206,24 @@ int SrsIngestSrsOutput::connect()
// when ok, ignore.
// TODO: FIXME: should reconnect when disconnected.
if
(
transport
->
connected
())
{
if
(
sdk
->
connected
())
{
return
ret
;
}
srs_trace
(
"connect output=%s"
,
out_rtmp
->
get_url
().
c_str
());
// parse uri
if
(
!
req
)
{
req
=
new
SrsRequest
();
string
uri
=
req
->
tcUrl
=
out_rtmp
->
get_url
();
// tcUrl, stream
if
(
srs_string_contains
(
uri
,
"/"
))
{
req
->
stream
=
srs_path_basename
(
uri
);
req
->
tcUrl
=
uri
=
srs_path_dirname
(
uri
);
}
srs_discovery_tc_url
(
req
->
tcUrl
,
req
->
schema
,
req
->
host
,
req
->
vhost
,
req
->
app
,
req
->
port
,
req
->
param
);
}
std
::
string
url
=
out_rtmp
->
get_url
();
srs_trace
(
"connect output=%s"
,
url
.
c_str
());
// connect host.
if
((
ret
=
transport
->
connect
(
req
->
host
,
req
->
port
,
ST_UTIME_NO_TIMEOUT
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: connect server %s:%d failed. ret=%d"
,
req
->
host
.
c_str
(),
req
->
port
,
ret
);
return
ret
;
}
srs_freep
(
client
);
client
=
new
SrsRtmpClient
(
transport
);
client
->
set_recv_timeout
(
SRS_CONSTS_RTMP_TIMEOUT_US
);
client
->
set_send_timeout
(
SRS_CONSTS_RTMP_TIMEOUT_US
);
// connect to vhost/app
if
((
ret
=
client
->
handshake
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: handshake with server failed. ret=%d"
,
ret
);
return
ret
;
}
if
((
ret
=
connect_app
(
req
->
host
,
req
->
port
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: connect with server failed. ret=%d"
,
ret
);
return
ret
;
}
if
((
ret
=
client
->
create_stream
(
stream_id
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: connect with server failed, stream_id=%d. ret=%d"
,
stream_id
,
ret
);
int64_t
cto
=
SRS_CONSTS_RTMP_TIMEOUT_US
;
int64_t
sto
=
SRS_CONSTS_RTMP_PULSE_TIMEOUT_US
;
if
((
ret
=
sdk
->
connect
(
url
,
cto
,
sto
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: connect %s failed, cto=%"
PRId64
", sto=%"
PRId64
". ret=%d"
,
url
.
c_str
(),
cto
,
sto
,
ret
);
return
ret
;
}
// publish.
if
((
ret
=
client
->
publish
(
req
->
stream
,
stream_id
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: publish failed, stream=%s, stream_id=%d. ret=%d"
,
req
->
stream
.
c_str
(),
stream_id
,
ret
);
return
ret
;
}
return
ret
;
}
// TODO: FIXME: refine the connect_app.
int
SrsIngestSrsOutput
::
connect_app
(
string
ep_server
,
int
ep_port
)
{
int
ret
=
ERROR_SUCCESS
;
// args of request takes the srs info.
if
(
req
->
args
==
NULL
)
{
req
->
args
=
SrsAmf0Any
::
object
();
}
// notify server the edge identity,
// @see https://github.com/simple-rtmp-server/srs/issues/147
SrsAmf0Object
*
data
=
req
->
args
;
data
->
set
(
"srs_sig"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_KEY
));
data
->
set
(
"srs_server"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_KEY
" "
RTMP_SIG_SRS_VERSION
" ("
RTMP_SIG_SRS_URL_SHORT
")"
));
data
->
set
(
"srs_license"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_LICENSE
));
data
->
set
(
"srs_role"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_ROLE
));
data
->
set
(
"srs_url"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_URL
));
data
->
set
(
"srs_version"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_VERSION
));
data
->
set
(
"srs_site"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_WEB
));
data
->
set
(
"srs_email"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_EMAIL
));
data
->
set
(
"srs_copyright"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_COPYRIGHT
));
data
->
set
(
"srs_primary"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_PRIMARY
));
data
->
set
(
"srs_authors"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_AUTHROS
));
// for edge to directly get the id of client.
data
->
set
(
"srs_pid"
,
SrsAmf0Any
::
number
(
getpid
()));
data
->
set
(
"srs_id"
,
SrsAmf0Any
::
number
(
_srs_context
->
get_id
()));
// local ip of edge
std
::
vector
<
std
::
string
>
ips
=
srs_get_local_ipv4_ips
();
assert
(
0
<
(
int
)
ips
.
size
());
std
::
string
local_ip
=
ips
[
0
];
data
->
set
(
"srs_server_ip"
,
SrsAmf0Any
::
str
(
local_ip
.
c_str
()));
// generate the tcUrl
std
::
string
param
=
""
;
std
::
string
tc_url
=
srs_generate_tc_url
(
ep_server
,
req
->
vhost
,
req
->
app
,
ep_port
,
param
);
// upnode server identity will show in the connect_app of client.
// @see https://github.com/simple-rtmp-server/srs/issues/160
// the debug_srs_upnode is config in vhost and default to true.
bool
debug_srs_upnode
=
true
;
if
((
ret
=
client
->
connect_app
(
req
->
app
,
tc_url
,
req
,
debug_srs_upnode
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d"
,
tc_url
.
c_str
(),
debug_srs_upnode
,
ret
);
if
((
ret
=
sdk
->
publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"mpegts: publish %s failed. ret=%d"
,
url
.
c_str
(),
ret
);
return
ret
;
}
...
...
@@ -1325,10 +1235,8 @@ void SrsIngestSrsOutput::close()
srs_trace
(
"close output=%s"
,
out_rtmp
->
get_url
().
c_str
());
h264_sps_pps_sent
=
false
;
srs_freep
(
client
);
srs_freep
(
req
);
transport
->
close
();
sdk
->
close
();
}
// the context for ingest hls stream.
...
...
请
注册
或
登录
后发表评论