zhengfl

refine code:

优化 判断重复推流 及 推流流程
... ... @@ -414,8 +414,6 @@ int SrsRtmpConn::stream_service_cycle()
}
srs_info("set chunk_size=%d success", chunk_size);
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
// find a source to serve.
SrsSource* source = SrsSource::fetch(req);
if (!source) {
... ... @@ -432,19 +430,7 @@ int SrsRtmpConn::stream_service_cycle()
return ret;
}
// check ASAP, to fail it faster if invalid.
if (type != SrsRtmpConnPlay) {
// check publish available
if (!source->can_publish(vhost_is_edge)) {
ret = ERROR_SYSTEM_STREAM_BUSY;
srs_warn("stream %s is already publishing. ret=%d",
req->get_stream_url().c_str(), ret);
// to delay request
st_usleep(SRS_STREAM_BUSY_SLEEP_US);
return ret;
}
}
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
... ... @@ -479,19 +465,7 @@ int SrsRtmpConn::stream_service_cycle()
return ret;
}
if (!vhost_is_edge) {
if ((ret = source->acquire_publish()) != ERROR_SUCCESS) {
return ret;
}
}
ret = fmle_publishing(source);
if (!vhost_is_edge) {
source->release_publish();
}
return ret;
return publishing(source);
}
case SrsRtmpConnFlashPublish: {
srs_verbose("flash start to publish stream %s.", req->stream.c_str());
... ... @@ -501,19 +475,7 @@ int SrsRtmpConn::stream_service_cycle()
return ret;
}
if (!vhost_is_edge) {
if ((ret = source->acquire_publish()) != ERROR_SUCCESS) {
return ret;
}
}
ret = flash_publishing(source);
if (!vhost_is_edge) {
source->release_publish();
}
return ret;
return publishing(source);
}
default: {
ret = ERROR_SYSTEM_CLIENT_INVALID;
... ... @@ -767,52 +729,23 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
return ret;
}
int SrsRtmpConn::fmle_publishing(SrsSource* source)
int SrsRtmpConn::publishing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
srs_error("http hook on_publish failed. ret=%d", ret);
if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
srs_error("check publish_refer failed. ret=%d", ret);
return ret;
}
// use isolate thread to recv,
// @see: https://github.com/simple-rtmp-server/srs/issues/237
SrsPublishRecvThread trd(rtmp, req,
st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge);
srs_info("start to publish stream %s success", req->stream.c_str());
ret = do_publishing(source, &trd);
// stop isolate recv thread
trd.stop();
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
if (vhost_is_edge) {
source->on_edge_proxy_unpublish();
} else {
source->on_unpublish();
}
http_hooks_on_unpublish();
return ret;
}
int SrsRtmpConn::flash_publishing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
srs_verbose("check publish_refer success.");
if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
srs_error("http hook on_publish failed. ret=%d", ret);
return ret;
}
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) {
// use isolate thread to recv,
// @see: https://github.com/simple-rtmp-server/srs/issues/237
SrsPublishRecvThread trd(rtmp, req,
... ... @@ -824,12 +757,7 @@ int SrsRtmpConn::flash_publishing(SrsSource* source)
// stop isolate recv thread
trd.stop();
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
if (vhost_is_edge) {
source->on_edge_proxy_unpublish();
} else {
source->on_unpublish();
release_publish(source, vhost_is_edge);
}
http_hooks_on_unpublish();
... ... @@ -841,32 +769,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
{
int ret = ERROR_SUCCESS;
if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
srs_error("check publish_refer failed. ret=%d", ret);
return ret;
}
srs_verbose("check publish_refer success.");
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
SrsAutoFree(SrsPithyPrint, pprint);
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
// when edge, ignore the publish event, directly proxy it.
if (!vhost_is_edge) {
// notify the hls to prepare when publish start.
if ((ret = source->on_publish()) != ERROR_SUCCESS) {
srs_error("hls on_publish failed. ret=%d", ret);
return ret;
}
srs_verbose("hls on_publish success.");
} else {
if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
srs_error("notice edge start publish stream failed. ret=%d", ret);
return ret;
}
}
// start isolate recv thread.
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("start isolate recv thread failed. ret=%d", ret);
... ... @@ -914,6 +819,43 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
return ret;
}
int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
{
int ret = ERROR_SUCCESS;
if (!source->can_publish(is_edge)) {
ret = ERROR_SYSTEM_STREAM_BUSY;
srs_warn("stream %s is already publishing. ret=%d",
req->get_stream_url().c_str(), ret);
return ret;
}
// when edge, ignore the publish event, directly proxy it.
if (is_edge) {
if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
srs_error("notice edge start publish stream failed. ret=%d", ret);
}
return ret;
} else {
if ((ret = source->on_publish()) != ERROR_SUCCESS) {
srs_error("notify publish failed. ret=%d", ret);
}
}
return ret;
}
void SrsRtmpConn::release_publish(SrsSource* source, bool is_edge)
{
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
if (is_edge) {
source->on_edge_proxy_unpublish();
} else {
source->on_unpublish();
}
}
int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge)
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -111,9 +111,10 @@ private:
virtual int check_vhost();
virtual int playing(SrsSource* source);
virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd);
virtual int fmle_publishing(SrsSource* source);
virtual int flash_publishing(SrsSource* source);
virtual int publishing(SrsSource* source);
virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd);
virtual int acquire_publish(SrsSource* source, bool is_edge);
virtual void release_publish(SrsSource* source, bool is_edge);
virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge);
virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
... ...
... ... @@ -1958,26 +1958,6 @@ int SrsSource::on_aggregate(SrsCommonMessage* msg)
return ret;
}
int SrsSource::acquire_publish()
{
int ret = ERROR_SUCCESS;
if (!_can_publish) {
ret = ERROR_SYSTEM_STREAM_BUSY;
srs_warn("publish lock stream failed, ret=%d", ret);
return ret;
}
_can_publish = false;
return ret;
}
void SrsSource::release_publish()
{
_can_publish = true;
}
int SrsSource::on_publish()
{
int ret = ERROR_SUCCESS;
... ...
... ... @@ -550,13 +550,6 @@ private:
public:
virtual int on_aggregate(SrsCommonMessage* msg);
/**
* the pre-publish is we are very sure we are
* trying to publish stream, please lock the resource,
* and we use release_publish() to release the resource.
*/
virtual int acquire_publish();
virtual void release_publish();
/**
* publish stream event notify.
* @param _req the request from client, the source will deep copy it,
* for when reload the request of client maybe invalid.
... ...