Toggle navigation
Toggle navigation
此项目
正在载入...
Sign in
胡斌
/
srs
转到一个项目
Toggle navigation
项目
群组
代码片段
帮助
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
winlin
2015-09-15 15:58:57 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
2a1b2b69a08d4f1121bfb3477b85931f01cdd66b
2a1b2b69
1 parent
8f9cfcdc
for #459, dvr support apply filter for ng-control dvr module.
隐藏空白字符变更
内嵌
并排对比
正在显示
13 个修改的文件
包含
273 行增加
和
98 行删除
README.md
trunk/conf/full.conf
trunk/src/app/srs_app_config.cpp
trunk/src/app/srs_app_config.hpp
trunk/src/app/srs_app_dvr.cpp
trunk/src/app/srs_app_dvr.hpp
trunk/src/app/srs_app_hls.cpp
trunk/src/app/srs_app_hls.hpp
trunk/src/app/srs_app_reload.cpp
trunk/src/app/srs_app_reload.hpp
trunk/src/app/srs_app_source.cpp
trunk/src/app/srs_app_source.hpp
trunk/src/core/srs_core.hpp
README.md
查看文件 @
2a1b2b6
...
...
@@ -344,8 +344,9 @@ Remark:
## History
*
v3.0, 2015-09-14, fix
[
#319
][
bug #319
]
, http raw api support update global and vhost. 3.0.4
*
v3.0, 2015-08-31, fix
[
#319
][
bug #319
]
, http raw api support query global and vhost. 3.0.3
*
v3.0, 2015-09-14, fix
[
#459
][
bug #459
]
, dvr support apply filter for ng-control dvr module.
*
v3.0, 2015-09-14, fix
[
#319
][
bug #319
]
, http raw api support update global and vhost. 3.0.3
*
v3.0, 2015-08-31, fix
[
#319
][
bug #319
]
, http raw api support query global and vhost.
*
v3.0, 2015-08-28, fix
[
#471
][
bug #471
]
, api response the width and height. 3.0.2
*
v3.0, 2015-08-25, fix
[
#367
][
bug #367
]
, support nginx-rtmp exec. 3.0.1
*
<strong>
v2.0, 2015-08-23,
[
2.0 alpha0(2.0.185)
][
r2.0a0
]
released. 89022 lines.
</strong>
...
...
trunk/conf/full.conf
查看文件 @
2a1b2b6
...
...
@@ -1063,6 +1063,13 @@ vhost dvr.srs.com {
# whether enabled dvr features
# default: off
enabled
on
;
# the filter for dvr to aplly to.
# all, dvr all streams of all apps.
# <app>/<stream>, apply to specified stream of app.
# for example, to dvr the following two streams:
# live/stream1 live/stream2
# default: all
dvr_apply
all
;
# the dvr plan. canbe:
# session reap flv when session end(unpublish).
# segment reap flv when flv duration exceed the specified dvr_duration.
...
...
trunk/src/app/srs_app_config.cpp
查看文件 @
2a1b2b6
...
...
@@ -832,8 +832,8 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
srs_trace
(
"vhost %s reload hds success."
,
vhost
.
c_str
());
}
// dvr, only one per vhost
if
(
!
srs_directive_equals
(
new_vhost
->
get
(
"dvr"
),
old_vhost
->
get
(
"dvr"
)))
{
// dvr, only one per vhost, except the dvr_apply
if
(
!
srs_directive_equals
(
new_vhost
->
get
(
"dvr"
),
old_vhost
->
get
(
"dvr"
),
"dvr_apply"
))
{
for
(
it
=
subscribes
.
begin
();
it
!=
subscribes
.
end
();
++
it
)
{
ISrsReloadHandler
*
subscribe
=
*
it
;
if
((
ret
=
subscribe
->
on_reload_vhost_dvr
(
vhost
))
!=
ERROR_SUCCESS
)
{
...
...
@@ -843,6 +843,24 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
}
srs_trace
(
"vhost %s reload dvr success."
,
vhost
.
c_str
());
}
// dvr_apply, the dynamic dvr filter.
if
(
true
)
{
// we must reload the dvr_apply, for it's apply to specified stream,
// and we donot want one stream reload take effect on another one.
// @see https://github.com/simple-rtmp-server/srs/issues/459#issuecomment-140296597
SrsConfDirective
*
nda
=
new_vhost
->
get
(
"dvr"
)
?
new_vhost
->
get
(
"dvr"
)
->
get
(
"dvr_apply"
)
:
NULL
;
SrsConfDirective
*
oda
=
old_vhost
->
get
(
"dvr"
)
?
old_vhost
->
get
(
"dvr"
)
->
get
(
"dvr_apply"
)
:
NULL
;
if
(
!
srs_directive_equals
(
nda
,
oda
))
{
for
(
it
=
subscribes
.
begin
();
it
!=
subscribes
.
end
();
++
it
)
{
ISrsReloadHandler
*
subscribe
=
*
it
;
if
((
ret
=
subscribe
->
on_reload_vhost_dvr_apply
(
vhost
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"vhost %s notify subscribes dvr_apply failed. ret=%d"
,
vhost
.
c_str
(),
ret
);
return
ret
;
}
}
srs_trace
(
"vhost %s reload dvr_apply success."
,
vhost
.
c_str
());
}
}
// exec, only one per vhost
if
(
!
srs_directive_equals
(
new_vhost
->
get
(
"exec"
),
old_vhost
->
get
(
"exec"
)))
{
...
...
@@ -2070,6 +2088,8 @@ int SrsConfig::vhost_to_json(SrsConfDirective* vhost, SrsAmf0Object* obj)
if
(
sdir
->
name
==
"dvr_plan"
)
{
dvr
->
set
(
"dvr_plan"
,
sdir
->
dumps_arg0_to_str
());
}
else
if
(
sdir
->
name
==
"dvr_apply"
)
{
dvr
->
set
(
"dvr_apply"
,
sdir
->
dumps_args
());
}
else
if
(
sdir
->
name
==
"dvr_path"
)
{
dvr
->
set
(
"dvr_path"
,
sdir
->
dumps_arg0_to_str
());
}
else
if
(
sdir
->
name
==
"dvr_duration"
)
{
...
...
@@ -3141,7 +3161,7 @@ int SrsConfig::check_config()
if
(
n
==
"dvr"
)
{
for
(
int
j
=
0
;
j
<
(
int
)
conf
->
directives
.
size
();
j
++
)
{
string
m
=
conf
->
at
(
j
)
->
name
.
c_str
();
if
(
m
!=
"enabled"
&&
m
!=
"dvr_path"
&&
m
!=
"dvr_plan"
if
(
m
!=
"enabled"
&&
m
!=
"dvr_apply"
&&
m
!=
"dvr_path"
&&
m
!=
"dvr_plan"
&&
m
!=
"dvr_duration"
&&
m
!=
"dvr_wait_keyframe"
&&
m
!=
"time_jitter"
)
{
ret
=
ERROR_SYSTEM_CONFIG_INVALID
;
...
...
@@ -5558,6 +5578,22 @@ bool SrsConfig::get_dvr_enabled(string vhost)
return
SRS_CONF_PERFER_FALSE
(
conf
->
arg0
());
}
SrsConfDirective
*
SrsConfig
::
get_dvr_apply
(
string
vhost
)
{
SrsConfDirective
*
conf
=
get_dvr
(
vhost
);
if
(
!
conf
)
{
return
NULL
;
}
conf
=
conf
->
get
(
"dvr_apply"
);
if
(
!
conf
||
conf
->
arg0
().
empty
())
{
return
NULL
;
}
return
conf
;
}
string
SrsConfig
::
get_dvr_path
(
string
vhost
)
{
static
string
DEFAULT
=
"./objs/nginx/html/[app]/[stream].[timestamp].flv"
;
...
...
@@ -6179,7 +6215,7 @@ namespace _srs_internal
}
};
bool
srs_directive_equals
(
SrsConfDirective
*
a
,
SrsConfDirective
*
b
)
bool
srs_directive_equals
_self
(
SrsConfDirective
*
a
,
SrsConfDirective
*
b
)
{
// both NULL, equal.
if
(
!
a
&&
!
b
)
{
...
...
@@ -6208,6 +6244,20 @@ bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b)
return
false
;
}
return
true
;
}
bool
srs_directive_equals
(
SrsConfDirective
*
a
,
SrsConfDirective
*
b
)
{
// both NULL, equal.
if
(
!
a
&&
!
b
)
{
return
true
;
}
if
(
!
srs_directive_equals_self
(
a
,
b
))
{
return
false
;
}
for
(
int
i
=
0
;
i
<
(
int
)
a
->
directives
.
size
();
i
++
)
{
SrsConfDirective
*
a0
=
a
->
at
(
i
);
SrsConfDirective
*
b0
=
b
->
at
(
i
);
...
...
@@ -6220,6 +6270,34 @@ bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b)
return
true
;
}
bool
srs_directive_equals
(
SrsConfDirective
*
a
,
SrsConfDirective
*
b
,
string
except
)
{
// both NULL, equal.
if
(
!
a
&&
!
b
)
{
return
true
;
}
if
(
!
srs_directive_equals_self
(
a
,
b
))
{
return
false
;
}
for
(
int
i
=
0
;
i
<
(
int
)
a
->
directives
.
size
();
i
++
)
{
SrsConfDirective
*
a0
=
a
->
at
(
i
);
SrsConfDirective
*
b0
=
b
->
at
(
i
);
// donot compare the except child directive.
if
(
a0
->
name
==
except
)
{
continue
;
}
if
(
!
srs_directive_equals
(
a0
,
b0
,
except
))
{
return
false
;
}
}
return
true
;
}
bool
srs_config_hls_is_on_error_ignore
(
string
strategy
)
{
return
strategy
==
"ignore"
;
...
...
@@ -6270,6 +6348,27 @@ bool srs_stream_caster_is_flv(string caster)
return
caster
==
"flv"
;
}
bool
srs_config_apply_filter
(
SrsConfDirective
*
dvr_apply
,
SrsRequest
*
req
)
{
static
bool
DEFAULT
=
true
;
if
(
!
dvr_apply
||
dvr_apply
->
args
.
empty
())
{
return
DEFAULT
;
}
vector
<
string
>&
args
=
dvr_apply
->
args
;
if
(
args
.
size
()
==
1
&&
dvr_apply
->
arg0
()
==
"all"
)
{
return
true
;
}
string
id
=
req
->
app
+
"/"
+
req
->
stream
;
if
(
::
find
(
args
.
begin
(),
args
.
end
(),
id
)
!=
args
.
end
())
{
return
true
;
}
return
false
;
}
string
srs_config_bool2switch
(
const
string
&
sbool
)
{
return
sbool
==
"true"
?
"on"
:
"off"
;
...
...
trunk/src/app/srs_app_config.hpp
查看文件 @
2a1b2b6
...
...
@@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_reload.hpp>
class
SrsRequest
;
class
SrsFileWriter
;
class
SrsAmf0Object
;
class
SrsAmf0StrictArray
;
...
...
@@ -1116,6 +1117,11 @@ public:
*/
virtual
bool
get_dvr_enabled
(
std
::
string
vhost
);
/**
* get the filter of dvr to apply to.
* @remark user can use srs_config_apply_filter(conf, req):bool to check it.
*/
virtual
SrsConfDirective
*
get_dvr_apply
(
std
::
string
vhost
);
/**
* get the dvr path, the flv file to save in.
*/
virtual
std
::
string
get_dvr_path
(
std
::
string
vhost
);
...
...
@@ -1308,8 +1314,9 @@ namespace _srs_internal
/**
* deep compare directive.
*/
*/
extern
bool
srs_directive_equals
(
SrsConfDirective
*
a
,
SrsConfDirective
*
b
);
extern
bool
srs_directive_equals
(
SrsConfDirective
*
a
,
SrsConfDirective
*
b
,
std
::
string
except
);
/**
* helper utilities, used for compare the consts values.
...
...
@@ -1324,6 +1331,8 @@ extern bool srs_config_dvr_is_plan_append(std::string plan);
extern
bool
srs_stream_caster_is_udp
(
std
::
string
caster
);
extern
bool
srs_stream_caster_is_rtsp
(
std
::
string
caster
);
extern
bool
srs_stream_caster_is_flv
(
std
::
string
caster
);
// whether the dvr_apply active the stream specified by req.
extern
bool
srs_config_apply_filter
(
SrsConfDirective
*
dvr_apply
,
SrsRequest
*
req
);
/**
* convert bool in str to on/off
...
...
trunk/src/app/srs_app_dvr.cpp
查看文件 @
2a1b2b6
...
...
@@ -972,10 +972,16 @@ SrsDvr::SrsDvr()
{
source
=
NULL
;
plan
=
NULL
;
req
=
NULL
;
actived
=
false
;
_srs_config
->
subscribe
(
this
);
}
SrsDvr
::~
SrsDvr
()
{
_srs_config
->
unsubscribe
(
this
);
srs_freep
(
plan
);
}
...
...
@@ -983,30 +989,39 @@ int SrsDvr::initialize(SrsSource* s, SrsRequest* r)
{
int
ret
=
ERROR_SUCCESS
;
req
=
r
;
source
=
s
;
SrsConfDirective
*
conf
=
_srs_config
->
get_dvr_apply
(
r
->
vhost
);
actived
=
srs_config_apply_filter
(
conf
,
r
);
srs_freep
(
plan
);
plan
=
SrsDvrPlan
::
create_plan
(
r
->
vhost
);
if
((
ret
=
plan
->
initialize
(
r
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
if
((
ret
=
source
->
on_dvr_request_sh
())
!=
ERROR_SUCCESS
)
{
return
ret
;
}
return
ret
;
}
int
SrsDvr
::
on_publish
(
SrsRequest
*
/*r*/
)
int
SrsDvr
::
on_publish
(
bool
fetch_sequence_header
)
{
int
ret
=
ERROR_SUCCESS
;
// the dvr for this stream is not actived.
if
(
!
actived
)
{
return
ret
;
}
if
((
ret
=
plan
->
on_publish
())
!=
ERROR_SUCCESS
)
{
return
ret
;
}
if
(
fetch_sequence_header
&&
(
ret
=
source
->
on_dvr_request_sh
())
!=
ERROR_SUCCESS
)
{
return
ret
;
}
return
ret
;
}
...
...
@@ -1019,6 +1034,11 @@ void SrsDvr::on_unpublish()
int
SrsDvr
::
on_meta_data
(
SrsOnMetaDataPacket
*
m
)
{
int
ret
=
ERROR_SUCCESS
;
// the dvr for this stream is not actived.
if
(
!
actived
)
{
return
ret
;
}
int
size
=
0
;
char
*
payload
=
NULL
;
...
...
@@ -1040,14 +1060,42 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m)
int
SrsDvr
::
on_audio
(
SrsSharedPtrMessage
*
shared_audio
)
{
// the dvr for this stream is not actived.
if
(
!
actived
)
{
return
ERROR_SUCCESS
;
}
return
plan
->
on_audio
(
shared_audio
);
}
int
SrsDvr
::
on_video
(
SrsSharedPtrMessage
*
shared_video
)
{
// the dvr for this stream is not actived.
if
(
!
actived
)
{
return
ERROR_SUCCESS
;
}
return
plan
->
on_video
(
shared_video
);
}
int
SrsDvr
::
on_reload_vhost_dvr_apply
(
string
vhost
)
{
int
ret
=
ERROR_SUCCESS
;
SrsConfDirective
*
conf
=
_srs_config
->
get_dvr_apply
(
req
->
vhost
);
bool
v
=
srs_config_apply_filter
(
conf
,
req
);
// the apply changed, republish the dvr.
if
(
v
!=
actived
)
{
actived
=
v
;
on_unpublish
();
return
on_publish
(
true
);
}
return
ret
;
}
#endif
...
...
trunk/src/app/srs_app_dvr.hpp
查看文件 @
2a1b2b6
...
...
@@ -297,27 +297,33 @@ private:
* dvr(digital video recorder) to record RTMP stream to flv file.
* TODO: FIXME: add utest for it.
*/
class
SrsDvr
class
SrsDvr
:
public
ISrsReloadHandler
{
private
:
SrsSource
*
source
;
private
:
SrsDvrPlan
*
plan
;
SrsRequest
*
req
;
private
:
// whether the dvr is actived by filter, which is specified by dvr_apply.
// we always initialize the dvr, which crote plan and segment object,
// but they never create actual piece of file util the apply active it.
bool
actived
;
public
:
SrsDvr
();
virtual
~
SrsDvr
();
public
:
/**
* initialize dvr, create dvr plan.
* when system initialize(encoder publish at first time, or reload),
* initialize the dvr will reinitialize the plan, the whole dvr framework.
*/
* initialize dvr, create dvr plan.
* when system initialize(encoder publish at first time, or reload),
* initialize the dvr will reinitialize the plan, the whole dvr framework.
*/
virtual
int
initialize
(
SrsSource
*
s
,
SrsRequest
*
r
);
/**
* publish stream event,
* when encoder start to publish RTMP stream.
*/
virtual
int
on_publish
(
SrsRequest
*
r
);
* publish stream event,
* when encoder start to publish RTMP stream.
* @param fetch_sequence_header whether fetch sequence from source.
*/
virtual
int
on_publish
(
bool
fetch_sequence_header
);
/**
* the unpublish event.,
* when encoder stop(unpublish) to publish RTMP stream.
...
...
@@ -337,6 +343,9 @@ public:
* @param shared_video, directly ptr, copy it if need to save it.
*/
virtual
int
on_video
(
SrsSharedPtrMessage
*
shared_video
);
// interface ISrsReloadHandler
public:
virtual
int
on_reload_vhost_dvr_apply
(
std
::
string
vhost
);
};
#endif
...
...
trunk/src/app/srs_app_hls.cpp
查看文件 @
2a1b2b6
...
...
@@ -1163,7 +1163,7 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme
SrsHls
::
SrsHls
()
{
_
req
=
NULL
;
req
=
NULL
;
source
=
NULL
;
handler
=
NULL
;
...
...
@@ -1184,7 +1184,6 @@ SrsHls::SrsHls()
SrsHls
::~
SrsHls
()
{
srs_freep
(
_req
);
srs_freep
(
codec
);
srs_freep
(
sample
);
srs_freep
(
jitter
);
...
...
@@ -1214,11 +1213,11 @@ int SrsHls::cycle()
last_update_time
=
srs_get_system_time_ms
();
}
if
(
!
_
req
)
{
if
(
!
req
)
{
return
ret
;
}
int
hls_dispose
=
_srs_config
->
get_hls_dispose
(
_
req
->
vhost
)
*
1000
;
int
hls_dispose
=
_srs_config
->
get_hls_dispose
(
req
->
vhost
)
*
1000
;
if
(
hls_dispose
<=
0
)
{
return
ret
;
}
...
...
@@ -1232,18 +1231,19 @@ int SrsHls::cycle()
}
hls_can_dispose
=
false
;
srs_trace
(
"hls cycle to dispose hls %s, timeout=%dms"
,
_
req
->
get_stream_url
().
c_str
(),
hls_dispose
);
srs_trace
(
"hls cycle to dispose hls %s, timeout=%dms"
,
req
->
get_stream_url
().
c_str
(),
hls_dispose
);
dispose
();
return
ret
;
}
int
SrsHls
::
initialize
(
SrsSource
*
s
,
ISrsHlsHandler
*
h
)
int
SrsHls
::
initialize
(
SrsSource
*
s
,
ISrsHlsHandler
*
h
,
SrsRequest
*
r
)
{
int
ret
=
ERROR_SUCCESS
;
source
=
s
;
handler
=
h
;
req
=
r
;
if
((
ret
=
muxer
->
initialize
(
h
))
!=
ERROR_SUCCESS
)
{
return
ret
;
...
...
@@ -1252,13 +1252,10 @@ int SrsHls::initialize(SrsSource* s, ISrsHlsHandler* h)
return
ret
;
}
int
SrsHls
::
on_publish
(
SrsRequest
*
req
,
bool
fetch_sequence_header
)
int
SrsHls
::
on_publish
(
bool
fetch_sequence_header
)
{
int
ret
=
ERROR_SUCCESS
;
srs_freep
(
_req
);
_req
=
req
->
copy
();
// update the hls time, for hls_dispose.
last_update_time
=
srs_get_system_time_ms
();
...
...
@@ -1412,7 +1409,7 @@ int SrsHls::on_video(SrsSharedPtrMessage* shared_video, bool is_sps_pps)
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/simple-rtmp-server/srs/issues/474
if
(
is_sps_pps
)
{
codec
->
avc_parse_sps
=
_srs_config
->
get_parse_sps
(
_
req
->
vhost
);
codec
->
avc_parse_sps
=
_srs_config
->
get_parse_sps
(
req
->
vhost
);
}
sample
->
clear
();
...
...
trunk/src/app/srs_app_hls.hpp
查看文件 @
2a1b2b6
...
...
@@ -388,7 +388,7 @@ private:
SrsHlsCache
*
hls_cache
;
ISrsHlsHandler
*
handler
;
private
:
SrsRequest
*
_
req
;
SrsRequest
*
req
;
bool
hls_enabled
;
bool
hls_can_dispose
;
int64_t
last_update_time
;
...
...
@@ -422,13 +422,13 @@ public:
/**
* initialize the hls by handler and source.
*/
virtual
int
initialize
(
SrsSource
*
s
,
ISrsHlsHandler
*
h
);
virtual
int
initialize
(
SrsSource
*
s
,
ISrsHlsHandler
*
h
,
SrsRequest
*
r
);
/**
* publish stream event, continue to write the m3u8,
* for the muxer object not destroyed.
* @param fetch_sequence_header whether fetch sequence from source.
*/
virtual
int
on_publish
(
SrsRequest
*
req
,
bool
fetch_sequence_header
);
virtual
int
on_publish
(
bool
fetch_sequence_header
);
/**
* the unpublish event, only close the muxer, donot destroy the
* muxer, for when we continue to publish, the m3u8 will continue.
...
...
trunk/src/app/srs_app_reload.cpp
查看文件 @
2a1b2b6
...
...
@@ -155,6 +155,11 @@ int ISrsReloadHandler::on_reload_vhost_dvr(string /*vhost*/)
return
ERROR_SUCCESS
;
}
int
ISrsReloadHandler
::
on_reload_vhost_dvr_apply
(
string
/*vhost*/
)
{
return
ERROR_SUCCESS
;
}
int
ISrsReloadHandler
::
on_reload_vhost_publish
(
string
/*vhost*/
)
{
return
ERROR_SUCCESS
;
...
...
trunk/src/app/srs_app_reload.hpp
查看文件 @
2a1b2b6
...
...
@@ -70,6 +70,7 @@ public:
virtual
int
on_reload_vhost_hls
(
std
::
string
vhost
);
virtual
int
on_reload_vhost_hds
(
std
::
string
vhost
);
virtual
int
on_reload_vhost_dvr
(
std
::
string
vhost
);
virtual
int
on_reload_vhost_dvr_apply
(
std
::
string
vhost
);
virtual
int
on_reload_vhost_publish
(
std
::
string
vhost
);
virtual
int
on_reload_vhost_tcp_nodelay
(
std
::
string
vhost
);
virtual
int
on_reload_vhost_realtime
(
std
::
string
vhost
);
...
...
trunk/src/app/srs_app_source.cpp
查看文件 @
2a1b2b6
...
...
@@ -771,7 +771,7 @@ SrsSource* SrsSource::fetch(SrsRequest* r)
// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source
->
_
req
->
update_auth
(
r
);
source
->
req
->
update_auth
(
r
);
return
source
;
}
...
...
@@ -900,7 +900,7 @@ SrsSharedPtrMessage* SrsMixQueue::pop()
SrsSource
::
SrsSource
()
{
_
req
=
NULL
;
req
=
NULL
;
jitter_algorithm
=
SrsRtmpJitterAlgorithmOFF
;
mix_correct
=
false
;
mix_queue
=
new
SrsMixQueue
();
...
...
@@ -977,7 +977,7 @@ SrsSource::~SrsSource()
srs_freep
(
hds
);
#endif
srs_freep
(
_
req
);
srs_freep
(
req
);
}
void
SrsSource
::
dispose
()
...
...
@@ -1014,36 +1014,36 @@ int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* h
srs_assert
(
h
);
srs_assert
(
hh
);
srs_assert
(
!
_
req
);
srs_assert
(
!
req
);
handler
=
h
;
_req
=
r
->
copy
();
atc
=
_srs_config
->
get_atc
(
_req
->
vhost
);
req
=
r
->
copy
();
atc
=
_srs_config
->
get_atc
(
req
->
vhost
);
#ifdef SRS_AUTO_HLS
if
((
ret
=
hls
->
initialize
(
this
,
hh
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
hls
->
initialize
(
this
,
hh
,
req
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
#endif
#ifdef SRS_AUTO_DVR
if
((
ret
=
dvr
->
initialize
(
this
,
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
dvr
->
initialize
(
this
,
req
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
#endif
if
((
ret
=
play_edge
->
initialize
(
this
,
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
play_edge
->
initialize
(
this
,
req
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
if
((
ret
=
publish_edge
->
initialize
(
this
,
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
publish_edge
->
initialize
(
this
,
req
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
double
queue_size
=
_srs_config
->
get_queue_length
(
_
req
->
vhost
);
double
queue_size
=
_srs_config
->
get_queue_length
(
req
->
vhost
);
publish_edge
->
set_queue_size
(
queue_size
);
jitter_algorithm
=
(
SrsRtmpJitterAlgorithm
)
_srs_config
->
get_time_jitter
(
_req
->
vhost
);
mix_correct
=
_srs_config
->
get_mix_correct
(
_req
->
vhost
);
jitter_algorithm
=
(
SrsRtmpJitterAlgorithm
)
_srs_config
->
get_time_jitter
(
req
->
vhost
);
mix_correct
=
_srs_config
->
get_mix_correct
(
req
->
vhost
);
return
ret
;
}
...
...
@@ -1052,16 +1052,16 @@ int SrsSource::on_reload_vhost_play(string vhost)
{
int
ret
=
ERROR_SUCCESS
;
if
(
_
req
->
vhost
!=
vhost
)
{
if
(
req
->
vhost
!=
vhost
)
{
return
ret
;
}
// time_jitter
jitter_algorithm
=
(
SrsRtmpJitterAlgorithm
)
_srs_config
->
get_time_jitter
(
_
req
->
vhost
);
jitter_algorithm
=
(
SrsRtmpJitterAlgorithm
)
_srs_config
->
get_time_jitter
(
req
->
vhost
);
// mix_correct
if
(
true
)
{
bool
v
=
_srs_config
->
get_mix_correct
(
_
req
->
vhost
);
bool
v
=
_srs_config
->
get_mix_correct
(
req
->
vhost
);
// when changed, clear the mix queue.
if
(
v
!=
mix_correct
)
{
...
...
@@ -1086,7 +1086,7 @@ int SrsSource::on_reload_vhost_play(string vhost)
bool
v
=
_srs_config
->
get_gop_cache
(
vhost
);
if
(
v
!=
gop_cache
->
enabled
())
{
string
url
=
_
req
->
get_stream_url
();
string
url
=
req
->
get_stream_url
();
srs_trace
(
"vhost %s gop_cache changed to %d, source url=%s"
,
vhost
.
c_str
(),
v
,
url
.
c_str
());
gop_cache
->
set
(
v
);
}
...
...
@@ -1094,7 +1094,7 @@ int SrsSource::on_reload_vhost_play(string vhost)
// queue length
if
(
true
)
{
double
v
=
_srs_config
->
get_queue_length
(
_
req
->
vhost
);
double
v
=
_srs_config
->
get_queue_length
(
req
->
vhost
);
if
(
true
)
{
std
::
vector
<
SrsConsumer
*>::
iterator
it
;
...
...
@@ -1131,7 +1131,7 @@ int SrsSource::on_reload_vhost_forward(string vhost)
{
int
ret
=
ERROR_SUCCESS
;
if
(
_
req
->
vhost
!=
vhost
)
{
if
(
req
->
vhost
!=
vhost
)
{
return
ret
;
}
...
...
@@ -1153,7 +1153,7 @@ int SrsSource::on_reload_vhost_hls(string vhost)
{
int
ret
=
ERROR_SUCCESS
;
if
(
_
req
->
vhost
!=
vhost
)
{
if
(
req
->
vhost
!=
vhost
)
{
return
ret
;
}
...
...
@@ -1161,7 +1161,7 @@ int SrsSource::on_reload_vhost_hls(string vhost)
#ifdef SRS_AUTO_HLS
hls
->
on_unpublish
();
if
((
ret
=
hls
->
on_publish
(
_req
,
true
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
hls
->
on_publish
(
true
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"hls publish failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -1175,7 +1175,7 @@ int SrsSource::on_reload_vhost_hds(string vhost)
{
int
ret
=
ERROR_SUCCESS
;
if
(
_
req
->
vhost
!=
vhost
)
{
if
(
req
->
vhost
!=
vhost
)
{
return
ret
;
}
...
...
@@ -1183,7 +1183,7 @@ int SrsSource::on_reload_vhost_hds(string vhost)
#ifdef SRS_AUTO_HDS
hds
->
on_unpublish
();
if
((
ret
=
hds
->
on_publish
(
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
hds
->
on_publish
(
req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"hds publish failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -1197,7 +1197,7 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
{
int
ret
=
ERROR_SUCCESS
;
if
(
_
req
->
vhost
!=
vhost
)
{
if
(
req
->
vhost
!=
vhost
)
{
return
ret
;
}
...
...
@@ -1208,12 +1208,12 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
dvr
->
on_unpublish
();
// reinitialize the dvr, update plan.
if
((
ret
=
dvr
->
initialize
(
this
,
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
dvr
->
initialize
(
this
,
req
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
// start to publish by new plan.
if
((
ret
=
dvr
->
on_publish
(
_req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
dvr
->
on_publish
(
true
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"dvr publish failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -1228,7 +1228,7 @@ int SrsSource::on_reload_vhost_transcode(string vhost)
{
int
ret
=
ERROR_SUCCESS
;
if
(
_
req
->
vhost
!=
vhost
)
{
if
(
req
->
vhost
!=
vhost
)
{
return
ret
;
}
...
...
@@ -1236,7 +1236,7 @@ int SrsSource::on_reload_vhost_transcode(string vhost)
#ifdef SRS_AUTO_TRANSCODE
encoder
->
on_unpublish
();
if
((
ret
=
encoder
->
on_publish
(
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
encoder
->
on_publish
(
req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start encoder failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -1250,14 +1250,14 @@ int SrsSource::on_reload_vhost_exec(string vhost)
{
int
ret
=
ERROR_SUCCESS
;
if
(
_
req
->
vhost
!=
vhost
)
{
if
(
req
->
vhost
!=
vhost
)
{
return
ret
;
}
// TODO: FIXME: maybe should ignore when publish already stopped?
ng_exec
->
on_unpublish
();
if
((
ret
=
ng_exec
->
on_publish
(
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
ng_exec
->
on_publish
(
req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start exec failed. ret=%d"
,
ret
);
return
ret
;
}
...
...
@@ -1433,8 +1433,8 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
metadata
->
metadata
->
set
(
"server_version"
,
SrsAmf0Any
::
str
(
RTMP_SIG_SRS_VERSION
));
// if allow atc_auto and bravo-atc detected, open atc for vhost.
atc
=
_srs_config
->
get_atc
(
_req
->
vhost
);
if
(
_srs_config
->
get_atc_auto
(
_req
->
vhost
))
{
atc
=
_srs_config
->
get_atc
(
req
->
vhost
);
if
(
_srs_config
->
get_atc_auto
(
req
->
vhost
))
{
if
((
prop
=
metadata
->
metadata
->
get_property
(
"bravo_atc"
))
!=
NULL
)
{
if
(
prop
->
is_string
()
&&
prop
->
to_str
()
==
"true"
)
{
atc
=
true
;
...
...
@@ -1459,7 +1459,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
// when already got metadata, drop when reduce sequence header.
bool
drop_for_reduce
=
false
;
if
(
cache_metadata
&&
_srs_config
->
get_reduce_sequence_header
(
_
req
->
vhost
))
{
if
(
cache_metadata
&&
_srs_config
->
get_reduce_sequence_header
(
req
->
vhost
))
{
drop_for_reduce
=
true
;
srs_warn
(
"drop for reduce sh metadata, size=%d"
,
msg
->
size
);
}
...
...
@@ -1560,7 +1560,7 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
// whether consumer should drop for the duplicated sequence header.
bool
drop_for_reduce
=
false
;
if
(
is_sequence_header
&&
cache_sh_audio
&&
_srs_config
->
get_reduce_sequence_header
(
_
req
->
vhost
))
{
if
(
is_sequence_header
&&
cache_sh_audio
&&
_srs_config
->
get_reduce_sequence_header
(
req
->
vhost
))
{
if
(
cache_sh_audio
->
size
==
msg
->
size
)
{
drop_for_reduce
=
srs_bytes_equals
(
cache_sh_audio
->
payload
,
msg
->
payload
,
msg
->
size
);
srs_warn
(
"drop for reduce sh audio, size=%d"
,
msg
->
size
);
...
...
@@ -1583,7 +1583,7 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
// when got audio stream info.
SrsStatistic
*
stat
=
SrsStatistic
::
instance
();
if
((
ret
=
stat
->
on_audio_info
(
_
req
,
SrsCodecAudioAAC
,
sample
.
sound_rate
,
sample
.
sound_type
,
codec
.
aac_object
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
stat
->
on_audio_info
(
req
,
SrsCodecAudioAAC
,
sample
.
sound_rate
,
sample
.
sound_type
,
codec
.
aac_object
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
...
...
@@ -1600,7 +1600,7 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
if
((
ret
=
hls
->
on_audio
(
msg
))
!=
ERROR_SUCCESS
)
{
// apply the error strategy for hls.
// @see https://github.com/simple-rtmp-server/srs/issues/264
std
::
string
hls_error_strategy
=
_srs_config
->
get_hls_on_error
(
_
req
->
vhost
);
std
::
string
hls_error_strategy
=
_srs_config
->
get_hls_on_error
(
req
->
vhost
);
if
(
srs_config_hls_is_on_error_ignore
(
hls_error_strategy
))
{
srs_warn
(
"hls process audio message failed, ignore and disable hls. ret=%d"
,
ret
);
...
...
@@ -1775,7 +1775,7 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
// whether consumer should drop for the duplicated sequence header.
bool
drop_for_reduce
=
false
;
if
(
is_sequence_header
&&
cache_sh_video
&&
_srs_config
->
get_reduce_sequence_header
(
_
req
->
vhost
))
{
if
(
is_sequence_header
&&
cache_sh_video
&&
_srs_config
->
get_reduce_sequence_header
(
req
->
vhost
))
{
if
(
cache_sh_video
->
size
==
msg
->
size
)
{
drop_for_reduce
=
srs_bytes_equals
(
cache_sh_video
->
payload
,
msg
->
payload
,
msg
->
size
);
srs_warn
(
"drop for reduce sh video, size=%d"
,
msg
->
size
);
...
...
@@ -1793,7 +1793,7 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/simple-rtmp-server/srs/issues/474
codec
.
avc_parse_sps
=
_srs_config
->
get_parse_sps
(
_
req
->
vhost
);
codec
.
avc_parse_sps
=
_srs_config
->
get_parse_sps
(
req
->
vhost
);
SrsCodecSample
sample
;
if
((
ret
=
codec
.
video_avc_demux
(
msg
->
payload
,
msg
->
size
,
&
sample
))
!=
ERROR_SUCCESS
)
{
...
...
@@ -1803,7 +1803,7 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
// when got video stream info.
SrsStatistic
*
stat
=
SrsStatistic
::
instance
();
if
((
ret
=
stat
->
on_video_info
(
_
req
,
SrsCodecVideoAVC
,
codec
.
avc_profile
,
codec
.
avc_level
,
codec
.
width
,
codec
.
height
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
stat
->
on_video_info
(
req
,
SrsCodecVideoAVC
,
codec
.
avc_profile
,
codec
.
avc_level
,
codec
.
width
,
codec
.
height
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
...
...
@@ -1818,7 +1818,7 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
if
((
ret
=
hls
->
on_video
(
msg
,
is_sequence_header
))
!=
ERROR_SUCCESS
)
{
// apply the error strategy for hls.
// @see https://github.com/simple-rtmp-server/srs/issues/264
std
::
string
hls_error_strategy
=
_srs_config
->
get_hls_on_error
(
_
req
->
vhost
);
std
::
string
hls_error_strategy
=
_srs_config
->
get_hls_on_error
(
req
->
vhost
);
if
(
srs_config_hls_is_on_error_ignore
(
hls_error_strategy
))
{
srs_warn
(
"hls process video message failed, ignore and disable hls. ret=%d"
,
ret
);
...
...
@@ -2028,7 +2028,7 @@ int SrsSource::on_publish()
int
ret
=
ERROR_SUCCESS
;
// update the request object.
srs_assert
(
_
req
);
srs_assert
(
req
);
_can_publish
=
false
;
...
...
@@ -2051,49 +2051,48 @@ int SrsSource::on_publish()
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_TRANSCODE
if
((
ret
=
encoder
->
on_publish
(
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
encoder
->
on_publish
(
req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start encoder failed. ret=%d"
,
ret
);
return
ret
;
}
#endif
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_HLS
if
((
ret
=
hls
->
on_publish
(
_req
,
false
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
hls
->
on_publish
(
false
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start hls failed. ret=%d"
,
ret
);
return
ret
;
}
#endif
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_DVR
if
((
ret
=
dvr
->
on_publish
(
_req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
dvr
->
on_publish
(
false
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start dvr failed. ret=%d"
,
ret
);
return
ret
;
}
#endif
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_HDS
if
((
ret
=
hds
->
on_publish
(
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
hds
->
on_publish
(
req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start hds failed. ret=%d"
,
ret
);
return
ret
;
}
#endif
// TODO: FIXME: use initialize to set req.
if
((
ret
=
ng_exec
->
on_publish
(
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
ng_exec
->
on_publish
(
req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"start exec failed. ret=%d"
,
ret
);
return
ret
;
}
// notify the handler.
srs_assert
(
handler
);
if
((
ret
=
handler
->
on_publish
(
this
,
_
req
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
handler
->
on_publish
(
this
,
req
))
!=
ERROR_SUCCESS
)
{
srs_error
(
"handle on publish failed. ret=%d"
,
ret
);
return
ret
;
}
SrsStatistic
*
stat
=
SrsStatistic
::
instance
();
stat
->
on_stream_publish
(
_
req
,
_source_id
);
stat
->
on_stream_publish
(
req
,
_source_id
);
return
ret
;
}
...
...
@@ -2135,8 +2134,8 @@ void SrsSource::on_unpublish()
// notify the handler.
srs_assert
(
handler
);
SrsStatistic
*
stat
=
SrsStatistic
::
instance
();
stat
->
on_stream_close
(
_req
);
handler
->
on_unpublish
(
this
,
_req
);
stat
->
on_stream_close
(
req
);
handler
->
on_unpublish
(
this
,
req
);
}
int
SrsSource
::
create_consumer
(
SrsConsumer
*&
consumer
,
bool
ds
,
bool
dm
,
bool
dg
)
...
...
@@ -2146,7 +2145,7 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg
consumer
=
new
SrsConsumer
(
this
);
consumers
.
push_back
(
consumer
);
double
queue_size
=
_srs_config
->
get_queue_length
(
_
req
->
vhost
);
double
queue_size
=
_srs_config
->
get_queue_length
(
req
->
vhost
);
consumer
->
set_queue_size
(
queue_size
);
// if atc, update the sequence header to gop cache time.
...
...
@@ -2197,7 +2196,7 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg
}
// for edge, when play edge stream, check the state
if
(
_srs_config
->
get_vhost_is_edge
(
_
req
->
vhost
))
{
if
(
_srs_config
->
get_vhost_is_edge
(
req
->
vhost
))
{
// notice edge to start for the first client.
if
((
ret
=
play_edge
->
on_client_play
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"notice edge start play stream failed. ret=%d"
,
ret
);
...
...
@@ -2251,11 +2250,11 @@ int SrsSource::create_forwarders()
{
int
ret
=
ERROR_SUCCESS
;
if
(
_srs_config
->
get_forward_enabled
(
_
req
->
vhost
))
{
if
(
_srs_config
->
get_forward_enabled
(
req
->
vhost
))
{
return
ret
;
}
SrsConfDirective
*
conf
=
_srs_config
->
get_forwards
(
_
req
->
vhost
);
SrsConfDirective
*
conf
=
_srs_config
->
get_forwards
(
req
->
vhost
);
for
(
int
i
=
0
;
conf
&&
i
<
(
int
)
conf
->
args
.
size
();
i
++
)
{
std
::
string
forward_server
=
conf
->
args
.
at
(
i
);
...
...
@@ -2263,17 +2262,17 @@ int SrsSource::create_forwarders()
forwarders
.
push_back
(
forwarder
);
// initialize the forwarder with request.
if
((
ret
=
forwarder
->
initialize
(
_
req
,
forward_server
))
!=
ERROR_SUCCESS
)
{
if
((
ret
=
forwarder
->
initialize
(
req
,
forward_server
))
!=
ERROR_SUCCESS
)
{
return
ret
;
}
double
queue_size
=
_srs_config
->
get_queue_length
(
_
req
->
vhost
);
double
queue_size
=
_srs_config
->
get_queue_length
(
req
->
vhost
);
forwarder
->
set_queue_size
(
queue_size
);
if
((
ret
=
forwarder
->
on_publish
())
!=
ERROR_SUCCESS
)
{
srs_error
(
"start forwarder failed. "
"vhost=%s, app=%s, stream=%s, forward-to=%s"
,
_req
->
vhost
.
c_str
(),
_req
->
app
.
c_str
(),
_
req
->
stream
.
c_str
(),
req
->
vhost
.
c_str
(),
req
->
app
.
c_str
(),
req
->
stream
.
c_str
(),
forward_server
.
c_str
());
return
ret
;
}
...
...
trunk/src/app/srs_app_source.hpp
查看文件 @
2a1b2b6
...
...
@@ -451,7 +451,7 @@ private:
// invoke the on_source_id_changed() to let all clients know.
int
_source_id
;
// deep copy of client request.
SrsRequest
*
_
req
;
SrsRequest
*
req
;
// to delivery stream to clients.
std
::
vector
<
SrsConsumer
*>
consumers
;
// the time jitter algorithm for vhost.
...
...
trunk/src/core/srs_core.hpp
查看文件 @
2a1b2b6
...
...
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 3
#define VERSION_MINOR 0
#define VERSION_REVISION
4
#define VERSION_REVISION
3
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
...
...
请
注册
或
登录
后发表评论