zhengfl

fix #438

问题:edge模式,推流时异常断开。
    解决方法:增加edge模式推流检测。
@@ -806,6 +806,11 @@ int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req) @@ -806,6 +806,11 @@ int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
806 return ret; 806 return ret;
807 } 807 }
808 808
  809 +bool SrsPublishEdge::can_publish()
  810 +{
  811 + return state != SrsEdgeStatePublish;
  812 +}
  813 +
809 int SrsPublishEdge::on_client_publish() 814 int SrsPublishEdge::on_client_publish()
810 { 815 {
811 int ret = ERROR_SUCCESS; 816 int ret = ERROR_SUCCESS;
@@ -207,6 +207,7 @@ public: @@ -207,6 +207,7 @@ public:
207 virtual void set_queue_size(double queue_size); 207 virtual void set_queue_size(double queue_size);
208 public: 208 public:
209 virtual int initialize(SrsSource* source, SrsRequest* req); 209 virtual int initialize(SrsSource* source, SrsRequest* req);
  210 + virtual bool can_publish();
210 /** 211 /**
211 * when client publish stream on edge. 212 * when client publish stream on edge.
212 */ 213 */
@@ -424,6 +424,13 @@ int SrsRtmpConn::stream_service_cycle() @@ -424,6 +424,13 @@ int SrsRtmpConn::stream_service_cycle()
424 } 424 }
425 srs_assert(source != NULL); 425 srs_assert(source != NULL);
426 426
  427 + // check ASAP, to fail it faster if invalid.
  428 + if (type != SrsRtmpConnPlay) {
  429 + if ((ret = prepare_publish(source, vhost_is_edge)) != ERROR_SUCCESS) {
  430 + return ret;
  431 + }
  432 + }
  433 +
427 // update the statistic when source disconveried. 434 // update the statistic when source disconveried.
428 SrsStatistic* stat = SrsStatistic::instance(); 435 SrsStatistic* stat = SrsStatistic::instance();
429 if ((ret = stat->on_client(_srs_context->get_id(), req)) != ERROR_SUCCESS) { 436 if ((ret = stat->on_client(_srs_context->get_id(), req)) != ERROR_SUCCESS) {
@@ -431,20 +438,6 @@ int SrsRtmpConn::stream_service_cycle() @@ -431,20 +438,6 @@ int SrsRtmpConn::stream_service_cycle()
431 return ret; 438 return ret;
432 } 439 }
433 440
434 - // check ASAP, to fail it faster if invalid.  
435 - if (type != SrsRtmpConnPlay && !vhost_is_edge) {  
436 - // check publish available  
437 - // for edge, never check it, for edge use proxy mode.  
438 - if (!source->can_publish()) {  
439 - ret = ERROR_SYSTEM_STREAM_BUSY;  
440 - srs_warn("stream %s is already publishing. ret=%d",  
441 - req->get_stream_url().c_str(), ret);  
442 - // to delay request  
443 - st_usleep(SRS_STREAM_BUSY_SLEEP_US);  
444 - return ret;  
445 - }  
446 - }  
447 -  
448 bool enabled_cache = _srs_config->get_gop_cache(req->vhost); 441 bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
449 srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", 442 srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
450 req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, 443 req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
@@ -1260,6 +1253,31 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client) @@ -1260,6 +1253,31 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client)
1260 return ret; 1253 return ret;
1261 } 1254 }
1262 1255
  1256 +int SrsRtmpConn::prepare_publish(SrsSource* source, bool vhost_is_edge)
  1257 +{
  1258 + int ret = ERROR_SUCCESS;
  1259 + srs_assert(source);
  1260 +
  1261 + // check publish available
  1262 + bool can_publish = false;
  1263 + if (vhost_is_edge) {
  1264 + can_publish = source->proxy_can_publish();
  1265 + } else {
  1266 + can_publish = source->can_publish();
  1267 + }
  1268 +
  1269 + if (!can_publish) {
  1270 + ret = ERROR_SYSTEM_STREAM_BUSY;
  1271 + srs_warn("stream %s is already publishing. ret=%d",
  1272 + req->get_stream_url().c_str(), ret);
  1273 + // to delay request
  1274 + st_usleep(SRS_STREAM_BUSY_SLEEP_US);
  1275 + return ret;
  1276 + }
  1277 +
  1278 + return ret;
  1279 +}
  1280 +
1263 int SrsRtmpConn::http_hooks_on_connect() 1281 int SrsRtmpConn::http_hooks_on_connect()
1264 { 1282 {
1265 int ret = ERROR_SUCCESS; 1283 int ret = ERROR_SUCCESS;
@@ -123,6 +123,7 @@ private: @@ -123,6 +123,7 @@ private:
123 virtual int check_edge_token_traverse_auth(); 123 virtual int check_edge_token_traverse_auth();
124 virtual int connect_server(int origin_index, st_netfd_t* pstsock); 124 virtual int connect_server(int origin_index, st_netfd_t* pstsock);
125 virtual int do_token_traverse_auth(SrsRtmpClient* client); 125 virtual int do_token_traverse_auth(SrsRtmpClient* client);
  126 + virtual int prepare_publish(SrsSource* source, bool vhost_is_edge);
126 private: 127 private:
127 virtual int http_hooks_on_connect(); 128 virtual int http_hooks_on_connect();
128 virtual void http_hooks_on_close(); 129 virtual void http_hooks_on_close();
@@ -1355,6 +1355,11 @@ bool SrsSource::can_publish() @@ -1355,6 +1355,11 @@ bool SrsSource::can_publish()
1355 return _can_publish; 1355 return _can_publish;
1356 } 1356 }
1357 1357
  1358 +bool SrsSource::proxy_can_publish()
  1359 +{
  1360 + return publish_edge->can_publish();
  1361 +}
  1362 +
1358 int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata) 1363 int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
1359 { 1364 {
1360 int ret = ERROR_SUCCESS; 1365 int ret = ERROR_SUCCESS;
@@ -538,6 +538,7 @@ public: @@ -538,6 +538,7 @@ public:
538 // logic data methods 538 // logic data methods
539 public: 539 public:
540 virtual bool can_publish(); 540 virtual bool can_publish();
  541 + virtual bool proxy_can_publish();
541 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); 542 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
542 public: 543 public:
543 virtual int on_audio(SrsCommonMessage* audio); 544 virtual int on_audio(SrsCommonMessage* audio);