Toggle navigation
Toggle navigation
此项目
正在载入...
Sign in
胡斌
/
srs
转到一个项目
Toggle navigation
项目
群组
代码片段
帮助
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
winlin
2014-04-27 14:57:28 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
bc7648724d56cd8ffa56edbeac0b8e3d8ef6dd2e
bc764872
1 parent
2295ebb3
use directly send publish edge mode.
隐藏空白字符变更
内嵌
并排对比
正在显示
6 个修改的文件
包含
89 行增加
和
199 行删除
trunk/src/app/srs_app_edge.cpp
trunk/src/app/srs_app_edge.hpp
trunk/src/app/srs_app_rtmp_conn.cpp
trunk/src/app/srs_app_rtmp_conn.hpp
trunk/src/app/srs_app_source.cpp
trunk/src/app/srs_app_source.hpp
trunk/src/app/srs_app_edge.cpp
查看文件 @
bc76487
...
...
@@ -307,23 +307,6 @@ int SrsEdgeIngester::connect_server()
return
ret
;
}
SrsEdgeProxyContext
::
SrsEdgeProxyContext
()
{
edge_stream_id
=
0
;
edge_io
=
NULL
;
edge_rtmp
=
NULL
;
edge_stfd
=
NULL
;
origin_stream_id
=
0
;
origin_io
=
NULL
;
origin_rtmp
=
NULL
;
origin_stfd
=
NULL
;
}
SrsEdgeProxyContext
::~
SrsEdgeProxyContext
()
{
}
SrsEdgeForwarder
::
SrsEdgeForwarder
()
{
io
=
NULL
;
...
...
@@ -395,129 +378,30 @@ void SrsEdgeForwarder::stop()
srs_freep
(
io
);
}
int
SrsEdgeForwarder
::
proxy
(
Srs
EdgeProxyContext
*
context
)
int
SrsEdgeForwarder
::
proxy
(
Srs
CommonMessage
*
msg
)
{
int
ret
=
ERROR_SUCCESS
;
context
->
origin_io
=
io
;
context
->
origin_rtmp
=
client
;
context
->
origin_stream_id
=
stream_id
;
context
->
origin_stfd
=
stfd
;
context
->
origin_rtmp
->
set_recv_timeout
(
SRS_RECV_TIMEOUT_US
);
context
->
edge_rtmp
->
set_recv_timeout
(
SRS_RECV_TIMEOUT_US
);
SrsPithyPrint
pithy_print
(
SRS_STAGE_EDGE
);
pollfd
fds
[
2
];
fds
[
0
].
fd
=
st_netfd_fileno
(
context
->
edge_stfd
);
fds
[
0
].
events
=
POLLIN
;
fds
[
1
].
fd
=
st_netfd_fileno
(
context
->
origin_stfd
);
fds
[
1
].
events
=
POLLIN
;
while
(
true
)
{
// switch to other st-threads.
st_usleep
(
0
);
pithy_print
.
elapse
();
// pithy print
if
(
pithy_print
.
can_print
())
{
srs_trace
(
"<- time=%"
PRId64
", obytes=%"
PRId64
", ibytes=%"
PRId64
", okbps=%d, ikbps=%d"
,
pithy_print
.
age
(),
client
->
get_send_bytes
(),
client
->
get_recv_bytes
(),
client
->
get_send_kbps
(),
client
->
get_recv_kbps
());
}
fds
[
0
].
revents
=
0
;
fds
[
1
].
revents
=
0
;
// Upon successful completion, a non-negative value is returned.
// A positive value indicates the total number of OS file descriptors in pds that have events.
// A value of 0 indicates that the call timed out.
// Upon failure, a value of -1 is returned and errno is set to indicate the error
if
(
st_poll
(
fds
,
2
,
ST_UTIME_NO_TIMEOUT
)
<=
0
){
ret
=
ERROR_RTMP_EDGE_PROXY_PULL
;
srs_error
(
"edge wait for st_poll error. ret=%d"
,
ret
);
return
ret
;
}
// edge active
if
(
fds
[
0
].
revents
&
POLLIN
){
if
((
ret
=
proxy_edge_message
(
context
))
!=
ERROR_SUCCESS
){
return
ret
;
}
}
// origin active
if
(
fds
[
1
].
revents
&
POLLIN
){
if
((
ret
=
proxy_origin_message
(
context
))
!=
ERROR_SUCCESS
){
return
ret
;
}
}
}
return
ret
;
}
int
SrsEdgeForwarder
::
proxy_origin_message
(
SrsEdgeProxyContext
*
context
)
{
int
ret
=
ERROR_SUCCESS
;
SrsCommonMessage
*
msg
=
NULL
;
// process origin message.
ret
=
context
->
origin_rtmp
->
recv_message
(
&
msg
);
if
(
ret
!=
ERROR_SUCCESS
&&
ret
!=
ERROR_SOCKET_TIMEOUT
)
{
srs_error
(
"forward recv origin server message failed. ret=%d"
,
ret
);
return
ret
;
}
srs_assert
(
msg
);
if
(
msg
->
size
<=
0
// the msg is auto free by source,
// so we just ignore, or copy then send it.
if
(
msg
->
size
<=
0
||
msg
->
header
.
is_set_chunk_size
()
||
msg
->
header
.
is_window_ackledgement_size
()
||
msg
->
header
.
is_ackledgement
()
)
{
srs_freep
(
msg
);
return
ret
;
}
msg
->
header
.
stream_id
=
context
->
edge_stream_id
;
if
((
ret
=
context
->
edge_rtmp
->
send_message
(
msg
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"send origin message to client failed. ret=%d"
,
ret
);
return
ret
;
}
return
ret
;
}
int
SrsEdgeForwarder
::
proxy_edge_message
(
SrsEdgeProxyContext
*
context
)
{
int
ret
=
ERROR_SUCCESS
;
SrsCommonMessage
*
msg
=
NULL
;
// proxy client message to origin
ret
=
context
->
edge_rtmp
->
recv_message
(
&
msg
);
if
(
ret
!=
ERROR_SUCCESS
&&
ret
!=
ERROR_SOCKET_TIMEOUT
)
{
srs_error
(
"recv client message failed. ret=%d"
,
ret
);
return
ret
;
}
srs_assert
(
msg
);
if
(
msg
->
size
<=
0
||
msg
->
header
.
is_set_chunk_size
()
||
msg
->
header
.
is_window_ackledgement_size
()
||
msg
->
header
.
is_ackledgement
()
)
{
srs_freep
(
msg
);
SrsSharedPtrMessage
*
copy
=
new
SrsSharedPtrMessage
();
SrsAutoFree
(
SrsSharedPtrMessage
,
copy
,
false
);
if
((
ret
=
copy
->
initialize
(
msg
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"initialize the msg failed. ret=%d"
,
ret
);
return
ret
;
}
srs_verbose
(
"initialize shared ptr msg success."
);
msg
->
header
.
stream_id
=
context
->
origin_stream_id
;
if
((
ret
=
context
->
origin_rtmp
->
send_message
(
msg
))
!=
ERROR_SUCCESS
)
{
copy
->
header
.
stream_id
=
stream_id
;
if
((
ret
=
client
->
send_message
(
copy
->
copy
()))
!=
ERROR_SUCCESS
)
{
srs_error
(
"send client message to origin failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -730,13 +614,18 @@ int SrsPublishEdge::on_client_publish()
return
forwarder
->
start
();
}
int
SrsPublishEdge
::
on_proxy_publish
(
Srs
EdgeProxyContext
*
context
)
int
SrsPublishEdge
::
on_proxy_publish
(
Srs
CommonMessage
*
msg
)
{
int
ret
=
forwarder
->
proxy
(
context
);
return
forwarder
->
proxy
(
msg
);
}
void
SrsPublishEdge
::
on_proxy_unpublish
()
{
if
(
state
==
SrsEdgeStatePublish
)
{
forwarder
->
stop
();
}
SrsEdgeState
pstate
=
state
;
state
=
SrsEdgeStateInit
;
srs_trace
(
"edge change from %d to state %d (init)."
,
pstate
,
state
);
return
ret
;
}
...
...
trunk/src/app/srs_app_edge.hpp
查看文件 @
bc76487
...
...
@@ -101,23 +101,6 @@ private:
virtual
int
process_publish_message
(
SrsCommonMessage
*
msg
);
};
class
SrsEdgeProxyContext
{
public
:
int
edge_stream_id
;
st_netfd_t
edge_stfd
;
ISrsProtocolReaderWriter
*
edge_io
;
SrsRtmpServer
*
edge_rtmp
;
public
:
int
origin_stream_id
;
st_netfd_t
origin_stfd
;
ISrsProtocolReaderWriter
*
origin_io
;
SrsRtmpClient
*
origin_rtmp
;
public
:
SrsEdgeProxyContext
();
virtual
~
SrsEdgeProxyContext
();
};
/**
* edge used to forward stream to origin.
*/
...
...
@@ -141,10 +124,8 @@ public:
virtual
int
start
();
virtual
void
stop
();
public
:
virtual
int
proxy
(
Srs
EdgeProxyContext
*
context
);
virtual
int
proxy
(
Srs
CommonMessage
*
msg
);
private
:
virtual
int
proxy_origin_message
(
SrsEdgeProxyContext
*
context
);
virtual
int
proxy_edge_message
(
SrsEdgeProxyContext
*
context
);
virtual
void
close_underlayer_socket
();
virtual
int
connect_server
();
};
...
...
@@ -201,7 +182,11 @@ public:
/**
* proxy publish stream to edge
*/
virtual
int
on_proxy_publish
(
SrsEdgeProxyContext
*
context
);
virtual
int
on_proxy_publish
(
SrsCommonMessage
*
msg
);
/**
* proxy unpublish stream to edge.
*/
virtual
void
on_proxy_unpublish
();
};
#endif
...
...
trunk/src/app/srs_app_rtmp_conn.cpp
查看文件 @
bc76487
...
...
@@ -145,7 +145,7 @@ int SrsRtmpConn::do_cycle()
req
->
app
.
c_str
());
ret
=
service_cycle
();
on_close
();
http_hooks_
on_close
();
return
ret
;
}
...
...
@@ -312,14 +312,14 @@ int SrsRtmpConn::stream_service_cycle()
srs_error
(
"start to play stream failed. ret=%d"
,
ret
);
return
ret
;
}
if
((
ret
=
on_play
())
!=
ERROR_SUCCESS
)
{
if
((
ret
=
http_hooks_
on_play
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"http hook on_play failed. ret=%d"
,
ret
);
return
ret
;
}
srs_info
(
"start to play stream %s success"
,
req
->
stream
.
c_str
());
ret
=
playing
(
source
);
on_stop
();
http_hooks_
on_stop
();
return
ret
;
}
...
...
@@ -338,23 +338,23 @@ int SrsRtmpConn::stream_service_cycle()
return
ret
;
}
SrsEdgeProxyContext
context
;
context
.
edge_io
=
skt
;
context
.
edge_stream_id
=
res
->
stream_id
;
context
.
edge_rtmp
=
rtmp
;
context
.
edge_stfd
=
stfd
;
if
(
vhost_is_edge
)
{
return
source
->
on_edge_proxy_publish
(
&
context
);
}
if
((
ret
=
on_publish
())
!=
ERROR_SUCCESS
)
{
if
((
ret
=
http_hooks_on_publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"http hook on_publish failed. ret=%d"
,
ret
);
return
ret
;
}
srs_info
(
"start to publish stream %s success"
,
req
->
stream
.
c_str
());
ret
=
fmle_publish
(
source
);
source
->
on_unpublish
();
on_unpublish
();
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
if
(
vhost_is_edge
)
{
source
->
on_edge_proxy_unpublish
();
}
else
{
source
->
on_unpublish
();
}
http_hooks_on_unpublish
();
return
ret
;
}
case
SrsRtmpConnFlashPublish
:
{
...
...
@@ -372,23 +372,23 @@ int SrsRtmpConn::stream_service_cycle()
return
ret
;
}
SrsEdgeProxyContext
context
;
context
.
edge_io
=
skt
;
context
.
edge_stream_id
=
res
->
stream_id
;
context
.
edge_rtmp
=
rtmp
;
context
.
edge_stfd
=
stfd
;
if
(
vhost_is_edge
)
{
return
source
->
on_edge_proxy_publish
(
&
context
);
}
if
((
ret
=
on_publish
())
!=
ERROR_SUCCESS
)
{
if
((
ret
=
http_hooks_on_publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"http hook on_publish failed. ret=%d"
,
ret
);
return
ret
;
}
srs_info
(
"flash start to publish stream %s success"
,
req
->
stream
.
c_str
());
ret
=
flash_publish
(
source
);
source
->
on_unpublish
();
on_unpublish
();
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
if
(
vhost_is_edge
)
{
source
->
on_edge_proxy_unpublish
();
}
else
{
source
->
on_unpublish
();
}
http_hooks_on_unpublish
();
return
ret
;
}
default
:
{
...
...
@@ -431,7 +431,7 @@ int SrsRtmpConn::check_vhost()
}
srs_verbose
(
"check refer success."
);
if
((
ret
=
on_connect
())
!=
ERROR_SUCCESS
)
{
if
((
ret
=
http_hooks_
on_connect
())
!=
ERROR_SUCCESS
)
{
return
ret
;
}
...
...
@@ -562,6 +562,8 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
}
srs_verbose
(
"fmle hls on_publish success."
);
bool
vhost_is_edge
=
_srs_config
->
get_vhost_is_edge
(
req
->
vhost
);
while
(
true
)
{
// switch to other st-threads.
st_usleep
(
0
);
...
...
@@ -604,7 +606,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
}
// video, audio, data message
if
((
ret
=
process_publish_message
(
source
,
msg
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
process_publish_message
(
source
,
msg
,
vhost_is_edge
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"fmle process publish message failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -632,6 +634,8 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
}
srs_verbose
(
"flash hls on_publish success."
);
bool
vhost_is_edge
=
_srs_config
->
get_vhost_is_edge
(
req
->
vhost
);
while
(
true
)
{
// switch to other st-threads.
st_usleep
(
0
);
...
...
@@ -668,7 +672,7 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
}
// video, audio, data message
if
((
ret
=
process_publish_message
(
source
,
msg
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
process_publish_message
(
source
,
msg
,
vhost_is_edge
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"flash process publish message failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -677,10 +681,15 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
return
ret
;
}
int
SrsRtmpConn
::
process_publish_message
(
SrsSource
*
source
,
SrsCommonMessage
*
msg
)
int
SrsRtmpConn
::
process_publish_message
(
SrsSource
*
source
,
SrsCommonMessage
*
msg
,
bool
vhost_is_edge
)
{
int
ret
=
ERROR_SUCCESS
;
// for edge, directly proxy message to origin.
if
(
vhost_is_edge
)
{
return
source
->
on_edge_proxy_publish
(
msg
);
}
// process audio packet
if
(
msg
->
header
.
is_audio
())
{
if
((
ret
=
source
->
on_audio
(
msg
))
!=
ERROR_SUCCESS
)
{
...
...
@@ -771,7 +780,7 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag
return
ret
;
}
int
SrsRtmpConn
::
on_connect
()
int
SrsRtmpConn
::
http_hooks_
on_connect
()
{
int
ret
=
ERROR_SUCCESS
;
...
...
@@ -795,7 +804,7 @@ int SrsRtmpConn::on_connect()
return
ret
;
}
void
SrsRtmpConn
::
on_close
()
void
SrsRtmpConn
::
http_hooks_
on_close
()
{
#ifdef SRS_AUTO_HTTP_CALLBACK
// whatever the ret code, notify the api hooks.
...
...
@@ -813,7 +822,7 @@ void SrsRtmpConn::on_close()
#endif
}
int
SrsRtmpConn
::
on_publish
()
int
SrsRtmpConn
::
http_hooks_
on_publish
()
{
int
ret
=
ERROR_SUCCESS
;
...
...
@@ -837,7 +846,7 @@ int SrsRtmpConn::on_publish()
return
ret
;
}
void
SrsRtmpConn
::
on_unpublish
()
void
SrsRtmpConn
::
http_hooks_
on_unpublish
()
{
#ifdef SRS_AUTO_HTTP_CALLBACK
// whatever the ret code, notify the api hooks.
...
...
@@ -855,7 +864,7 @@ void SrsRtmpConn::on_unpublish()
#endif
}
int
SrsRtmpConn
::
on_play
()
int
SrsRtmpConn
::
http_hooks_
on_play
()
{
int
ret
=
ERROR_SUCCESS
;
...
...
@@ -879,7 +888,7 @@ int SrsRtmpConn::on_play()
return
ret
;
}
void
SrsRtmpConn
::
on_stop
()
void
SrsRtmpConn
::
http_hooks_
on_stop
()
{
#ifdef SRS_AUTO_HTTP_CALLBACK
// whatever the ret code, notify the api hooks.
...
...
trunk/src/app/srs_app_rtmp_conn.hpp
查看文件 @
bc76487
...
...
@@ -80,15 +80,15 @@ private:
virtual
int
playing
(
SrsSource
*
source
);
virtual
int
fmle_publish
(
SrsSource
*
source
);
virtual
int
flash_publish
(
SrsSource
*
source
);
virtual
int
process_publish_message
(
SrsSource
*
source
,
SrsCommonMessage
*
msg
);
virtual
int
process_publish_message
(
SrsSource
*
source
,
SrsCommonMessage
*
msg
,
bool
vhost_is_edge
);
virtual
int
process_play_control_msg
(
SrsConsumer
*
consumer
,
SrsCommonMessage
*
msg
);
private
:
virtual
int
on_connect
();
virtual
void
on_close
();
virtual
int
on_publish
();
virtual
void
on_unpublish
();
virtual
int
on_play
();
virtual
void
on_stop
();
virtual
int
http_hooks_on_connect
();
virtual
void
http_hooks_on_close
();
virtual
int
http_hooks_on_publish
();
virtual
void
http_hooks_on_unpublish
();
virtual
int
http_hooks_on_play
();
virtual
void
http_hooks_on_stop
();
};
#endif
...
...
trunk/src/app/srs_app_source.cpp
查看文件 @
bc76487
...
...
@@ -1199,9 +1199,14 @@ int SrsSource::on_edge_start_publish()
return
publish_edge
->
on_client_publish
();
}
int
SrsSource
::
on_edge_proxy_publish
(
Srs
EdgeProxyContext
*
context
)
int
SrsSource
::
on_edge_proxy_publish
(
Srs
CommonMessage
*
msg
)
{
return
publish_edge
->
on_proxy_publish
(
context
);
return
publish_edge
->
on_proxy_publish
(
msg
);
}
void
SrsSource
::
on_edge_proxy_unpublish
()
{
publish_edge
->
on_proxy_unpublish
();
}
int
SrsSource
::
create_forwarders
()
...
...
trunk/src/app/srs_app_source.hpp
查看文件 @
bc76487
...
...
@@ -322,7 +322,9 @@ public:
// for edge, when publish edge stream, check the state
virtual
int
on_edge_start_publish
();
// for edge, proxy the publish
virtual
int
on_edge_proxy_publish
(
SrsEdgeProxyContext
*
context
);
virtual
int
on_edge_proxy_publish
(
SrsCommonMessage
*
msg
);
// for edge, proxy stop publish
virtual
void
on_edge_proxy_unpublish
();
private
:
virtual
int
create_forwarders
();
virtual
void
destroy_forwarders
();
...
...
请
注册
或
登录
后发表评论