zhengfl

rewrite #438

@@ -121,6 +121,7 @@ void SrsEdgeIngester::stop() @@ -121,6 +121,7 @@ void SrsEdgeIngester::stop()
121 int SrsEdgeIngester::cycle() 121 int SrsEdgeIngester::cycle()
122 { 122 {
123 int ret = ERROR_SUCCESS; 123 int ret = ERROR_SUCCESS;
  124 +
124 _source->on_source_id_changed(_srs_context->get_id()); 125 _source->on_source_id_changed(_srs_context->get_id());
125 126
126 std::string ep_server, ep_port; 127 std::string ep_server, ep_port;
@@ -384,8 +385,8 @@ int SrsEdgeIngester::connect_server(string& ep_server, string& ep_port) @@ -384,8 +385,8 @@ int SrsEdgeIngester::connect_server(string& ep_server, string& ep_port)
384 385
385 kbps->set_io(io, io); 386 kbps->set_io(io, io);
386 387
387 - srs_trace("edge pull connected, can_publish=%d, url=%s/%s, server=%s:%d",  
388 - _source->can_publish(), _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port); 388 + srs_trace("edge pull connected, url=%s/%s, server=%s:%d",
  389 + _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port);
389 390
390 return ret; 391 return ret;
391 } 392 }
@@ -424,13 +424,6 @@ int SrsRtmpConn::stream_service_cycle() @@ -424,13 +424,6 @@ int SrsRtmpConn::stream_service_cycle()
424 } 424 }
425 srs_assert(source != NULL); 425 srs_assert(source != NULL);
426 426
427 - // check ASAP, to fail it faster if invalid.  
428 - if (type != SrsRtmpConnPlay) {  
429 - if ((ret = prepare_publish(source, vhost_is_edge)) != ERROR_SUCCESS) {  
430 - return ret;  
431 - }  
432 - }  
433 -  
434 // update the statistic when source disconveried. 427 // update the statistic when source disconveried.
435 SrsStatistic* stat = SrsStatistic::instance(); 428 SrsStatistic* stat = SrsStatistic::instance();
436 if ((ret = stat->on_client(_srs_context->get_id(), req)) != ERROR_SUCCESS) { 429 if ((ret = stat->on_client(_srs_context->get_id(), req)) != ERROR_SUCCESS) {
@@ -438,6 +431,19 @@ int SrsRtmpConn::stream_service_cycle() @@ -438,6 +431,19 @@ int SrsRtmpConn::stream_service_cycle()
438 return ret; 431 return ret;
439 } 432 }
440 433
  434 + // check ASAP, to fail it faster if invalid.
  435 + if (type != SrsRtmpConnPlay) {
  436 + // check publish available
  437 + if (!source->can_publish(vhost_is_edge)) {
  438 + ret = ERROR_SYSTEM_STREAM_BUSY;
  439 + srs_warn("stream %s is already publishing. ret=%d",
  440 + req->get_stream_url().c_str(), ret);
  441 + // to delay request
  442 + st_usleep(SRS_STREAM_BUSY_SLEEP_US);
  443 + return ret;
  444 + }
  445 + }
  446 +
441 bool enabled_cache = _srs_config->get_gop_cache(req->vhost); 447 bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
442 srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", 448 srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
443 req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, 449 req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
@@ -1253,31 +1259,6 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client) @@ -1253,31 +1259,6 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client)
1253 return ret; 1259 return ret;
1254 } 1260 }
1255 1261
1256 -int SrsRtmpConn::prepare_publish(SrsSource* source, bool vhost_is_edge)  
1257 -{  
1258 - int ret = ERROR_SUCCESS;  
1259 - srs_assert(source);  
1260 -  
1261 - // check publish available  
1262 - bool can_publish = false;  
1263 - if (vhost_is_edge) {  
1264 - can_publish = source->proxy_can_publish();  
1265 - } else {  
1266 - can_publish = source->can_publish();  
1267 - }  
1268 -  
1269 - if (!can_publish) {  
1270 - ret = ERROR_SYSTEM_STREAM_BUSY;  
1271 - srs_warn("stream %s is already publishing. ret=%d",  
1272 - req->get_stream_url().c_str(), ret);  
1273 - // to delay request  
1274 - st_usleep(SRS_STREAM_BUSY_SLEEP_US);  
1275 - return ret;  
1276 - }  
1277 -  
1278 - return ret;  
1279 -}  
1280 -  
1281 int SrsRtmpConn::http_hooks_on_connect() 1262 int SrsRtmpConn::http_hooks_on_connect()
1282 { 1263 {
1283 int ret = ERROR_SUCCESS; 1264 int ret = ERROR_SUCCESS;
@@ -123,7 +123,6 @@ private: @@ -123,7 +123,6 @@ private:
123 virtual int check_edge_token_traverse_auth(); 123 virtual int check_edge_token_traverse_auth();
124 virtual int connect_server(int origin_index, st_netfd_t* pstsock); 124 virtual int connect_server(int origin_index, st_netfd_t* pstsock);
125 virtual int do_token_traverse_auth(SrsRtmpClient* client); 125 virtual int do_token_traverse_auth(SrsRtmpClient* client);
126 - virtual int prepare_publish(SrsSource* source, bool vhost_is_edge);  
127 private: 126 private:
128 virtual int http_hooks_on_connect(); 127 virtual int http_hooks_on_connect();
129 virtual void http_hooks_on_close(); 128 virtual void http_hooks_on_close();
@@ -1350,14 +1350,13 @@ int SrsSource::source_id() @@ -1350,14 +1350,13 @@ int SrsSource::source_id()
1350 return _source_id; 1350 return _source_id;
1351 } 1351 }
1352 1352
1353 -bool SrsSource::can_publish()  
1354 -{  
1355 - return _can_publish;  
1356 -}  
1357 -  
1358 -bool SrsSource::proxy_can_publish() 1353 +bool SrsSource::can_publish(bool is_edge)
1359 { 1354 {
  1355 + if (is_edge) {
1360 return publish_edge->can_publish(); 1356 return publish_edge->can_publish();
  1357 + }
  1358 +
  1359 + return _can_publish;
1361 } 1360 }
1362 1361
1363 int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata) 1362 int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
@@ -537,8 +537,7 @@ public: @@ -537,8 +537,7 @@ public:
537 virtual int source_id(); 537 virtual int source_id();
538 // logic data methods 538 // logic data methods
539 public: 539 public:
540 - virtual bool can_publish();  
541 - virtual bool proxy_can_publish(); 540 + virtual bool can_publish(bool is_edge);
542 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); 541 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
543 public: 542 public:
544 virtual int on_audio(SrsCommonMessage* audio); 543 virtual int on_audio(SrsCommonMessage* audio);