winlin

fix #57, use lock(acquire/release publish) to avoid duplicated publishing. 0.9.188.

@@ -207,6 +207,7 @@ Supported operating systems and hardware: @@ -207,6 +207,7 @@ Supported operating systems and hardware:
207 * 2013-10-17, Created.<br/> 207 * 2013-10-17, Created.<br/>
208 208
209 ## History 209 ## History
  210 +* v1.0, 2014-08-03, fix [#57](https://github.com/winlinvip/simple-rtmp-server/issues/57), use lock(acquire/release publish) to avoid duplicated publishing. 0.9.188.
210 * v1.0, 2014-08-03, fix [#85](https://github.com/winlinvip/simple-rtmp-server/issues/85), fix the segment-dvr sequence header missing. 0.9.187. 211 * v1.0, 2014-08-03, fix [#85](https://github.com/winlinvip/simple-rtmp-server/issues/85), fix the segment-dvr sequence header missing. 0.9.187.
211 * v1.0, 2014-08-03, fix [#145](https://github.com/winlinvip/simple-rtmp-server/issues/145), refine ffmpeg log, check abitrate for libaacplus. 0.9.186. 212 * v1.0, 2014-08-03, fix [#145](https://github.com/winlinvip/simple-rtmp-server/issues/145), refine ffmpeg log, check abitrate for libaacplus. 0.9.186.
212 * v1.0, 2014-08-03, fix [#143](https://github.com/winlinvip/simple-rtmp-server/issues/143), fix retrieve sys stat bug for all linux. 0.9.185. 213 * v1.0, 2014-08-03, fix [#143](https://github.com/winlinvip/simple-rtmp-server/issues/143), fix retrieve sys stat bug for all linux. 0.9.185.
@@ -74,6 +74,9 @@ using namespace std; @@ -74,6 +74,9 @@ using namespace std;
74 // when edge timeout, retry next. 74 // when edge timeout, retry next.
75 #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL) 75 #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL)
76 76
  77 +// to get msgs then totally send out.
  78 +#define SYS_MAX_PLAY_SEND_MSGS 128
  79 +
77 SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd) 80 SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd)
78 : SrsConnection(srs_server, client_stfd) 81 : SrsConnection(srs_server, client_stfd)
79 { 82 {
@@ -318,10 +321,11 @@ int SrsRtmpConn::stream_service_cycle() @@ -318,10 +321,11 @@ int SrsRtmpConn::stream_service_cycle()
318 } 321 }
319 srs_assert(source != NULL); 322 srs_assert(source != NULL);
320 323
321 - // check publish available  
322 - // for edge, never check it, for edge use proxy mode.  
323 - if (!vhost_is_edge) {  
324 - if (type != SrsRtmpConnPlay && !source->can_publish()) { 324 + // check ASAP, to fail it faster if invalid.
  325 + if (type != SrsRtmpConnPlay && !vhost_is_edge) {
  326 + // check publish available
  327 + // for edge, never check it, for edge use proxy mode.
  328 + if (!source->can_publish()) {
325 ret = ERROR_SYSTEM_STREAM_BUSY; 329 ret = ERROR_SYSTEM_STREAM_BUSY;
326 srs_warn("stream %s is already publishing. ret=%d", 330 srs_warn("stream %s is already publishing. ret=%d",
327 req->get_stream_url().c_str(), ret); 331 req->get_stream_url().c_str(), ret);
@@ -379,23 +383,18 @@ int SrsRtmpConn::stream_service_cycle() @@ -379,23 +383,18 @@ int SrsRtmpConn::stream_service_cycle()
379 return ret; 383 return ret;
380 } 384 }
381 385
382 - if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {  
383 - srs_error("http hook on_publish failed. ret=%d", ret);  
384 - return ret; 386 + if (!vhost_is_edge) {
  387 + if ((ret = source->acquire_publish()) != ERROR_SUCCESS) {
  388 + return ret;
  389 + }
385 } 390 }
386 -  
387 - srs_info("start to publish stream %s success", req->stream.c_str()); 391 +
388 ret = fmle_publishing(source); 392 ret = fmle_publishing(source);
389 -  
390 - // when edge, notice edge to change state.  
391 - // when origin, notice all service to unpublish.  
392 - if (vhost_is_edge) {  
393 - source->on_edge_proxy_unpublish();  
394 - } else {  
395 - source->on_unpublish(); 393 +
  394 + if (!vhost_is_edge) {
  395 + source->release_publish();
396 } 396 }
397 -  
398 - http_hooks_on_unpublish(); 397 +
399 return ret; 398 return ret;
400 } 399 }
401 case SrsRtmpConnFlashPublish: { 400 case SrsRtmpConnFlashPublish: {
@@ -413,23 +412,18 @@ int SrsRtmpConn::stream_service_cycle() @@ -413,23 +412,18 @@ int SrsRtmpConn::stream_service_cycle()
413 return ret; 412 return ret;
414 } 413 }
415 414
416 - if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {  
417 - srs_error("http hook on_publish failed. ret=%d", ret);  
418 - return ret; 415 + if (!vhost_is_edge) {
  416 + if ((ret = source->acquire_publish()) != ERROR_SUCCESS) {
  417 + return ret;
  418 + }
419 } 419 }
420 420
421 - srs_info("flash start to publish stream %s success", req->stream.c_str());  
422 ret = flash_publishing(source); 421 ret = flash_publishing(source);
423 -  
424 - // when edge, notice edge to change state.  
425 - // when origin, notice all service to unpublish.  
426 - if (vhost_is_edge) {  
427 - source->on_edge_proxy_unpublish();  
428 - } else {  
429 - source->on_unpublish(); 422 +
  423 + if (!vhost_is_edge) {
  424 + source->release_publish();
430 } 425 }
431 426
432 - http_hooks_on_unpublish();  
433 return ret; 427 return ret;
434 } 428 }
435 default: { 429 default: {
@@ -479,8 +473,6 @@ int SrsRtmpConn::check_vhost() @@ -479,8 +473,6 @@ int SrsRtmpConn::check_vhost()
479 return ret; 473 return ret;
480 } 474 }
481 475
482 -#define SYS_MAX_PLAY_SEND_MSGS 128  
483 -  
484 int SrsRtmpConn::playing(SrsSource* source) 476 int SrsRtmpConn::playing(SrsSource* source)
485 { 477 {
486 int ret = ERROR_SUCCESS; 478 int ret = ERROR_SUCCESS;
@@ -605,6 +597,33 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) @@ -605,6 +597,33 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source)
605 { 597 {
606 int ret = ERROR_SUCCESS; 598 int ret = ERROR_SUCCESS;
607 599
  600 + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
  601 +
  602 + if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
  603 + srs_error("http hook on_publish failed. ret=%d", ret);
  604 + return ret;
  605 + }
  606 +
  607 + srs_info("start to publish stream %s success", req->stream.c_str());
  608 + ret = do_fmle_publishing(source);
  609 +
  610 + // when edge, notice edge to change state.
  611 + // when origin, notice all service to unpublish.
  612 + if (vhost_is_edge) {
  613 + source->on_edge_proxy_unpublish();
  614 + } else {
  615 + source->on_unpublish();
  616 + }
  617 +
  618 + http_hooks_on_unpublish();
  619 +
  620 + return ret;
  621 +}
  622 +
  623 +int SrsRtmpConn::do_fmle_publishing(SrsSource* source)
  624 +{
  625 + int ret = ERROR_SUCCESS;
  626 +
608 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { 627 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
609 srs_error("fmle check publish_refer failed. ret=%d", ret); 628 srs_error("fmle check publish_refer failed. ret=%d", ret);
610 return ret; 629 return ret;
@@ -684,6 +703,33 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) @@ -684,6 +703,33 @@ int SrsRtmpConn::flash_publishing(SrsSource* source)
684 { 703 {
685 int ret = ERROR_SUCCESS; 704 int ret = ERROR_SUCCESS;
686 705
  706 + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
  707 +
  708 + if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
  709 + srs_error("http hook on_publish failed. ret=%d", ret);
  710 + return ret;
  711 + }
  712 +
  713 + srs_info("flash start to publish stream %s success", req->stream.c_str());
  714 + ret = do_flash_publishing(source);
  715 +
  716 + // when edge, notice edge to change state.
  717 + // when origin, notice all service to unpublish.
  718 + if (vhost_is_edge) {
  719 + source->on_edge_proxy_unpublish();
  720 + } else {
  721 + source->on_unpublish();
  722 + }
  723 +
  724 + http_hooks_on_unpublish();
  725 +
  726 + return ret;
  727 +}
  728 +
  729 +int SrsRtmpConn::do_flash_publishing(SrsSource* source)
  730 +{
  731 + int ret = ERROR_SUCCESS;
  732 +
687 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { 733 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
688 srs_error("flash check publish_refer failed. ret=%d", ret); 734 srs_error("flash check publish_refer failed. ret=%d", ret);
689 return ret; 735 return ret;
@@ -89,7 +89,9 @@ private: @@ -89,7 +89,9 @@ private:
89 virtual int check_vhost(); 89 virtual int check_vhost();
90 virtual int playing(SrsSource* source); 90 virtual int playing(SrsSource* source);
91 virtual int fmle_publishing(SrsSource* source); 91 virtual int fmle_publishing(SrsSource* source);
  92 + virtual int do_fmle_publishing(SrsSource* source);
92 virtual int flash_publishing(SrsSource* source); 93 virtual int flash_publishing(SrsSource* source);
  94 + virtual int do_flash_publishing(SrsSource* source);
93 virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge); 95 virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
94 virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg); 96 virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
95 private: 97 private:
@@ -1347,6 +1347,26 @@ int SrsSource::on_aggregate(SrsMessage* msg) @@ -1347,6 +1347,26 @@ int SrsSource::on_aggregate(SrsMessage* msg)
1347 return ret; 1347 return ret;
1348 } 1348 }
1349 1349
  1350 +int SrsSource::acquire_publish()
  1351 +{
  1352 + int ret = ERROR_SUCCESS;
  1353 +
  1354 + if (!_can_publish) {
  1355 + ret = ERROR_SYSTEM_STREAM_BUSY;
  1356 + srs_warn("publish lock stream failed, ret=%d", ret);
  1357 + return ret;
  1358 + }
  1359 +
  1360 + _can_publish = false;
  1361 +
  1362 + return ret;
  1363 +}
  1364 +
  1365 +void SrsSource::release_publish()
  1366 +{
  1367 + _can_publish = true;
  1368 +}
  1369 +
1350 int SrsSource::on_publish() 1370 int SrsSource::on_publish()
1351 { 1371 {
1352 int ret = ERROR_SUCCESS; 1372 int ret = ERROR_SUCCESS;
@@ -369,6 +369,13 @@ public: @@ -369,6 +369,13 @@ public:
369 virtual int on_video(SrsMessage* video); 369 virtual int on_video(SrsMessage* video);
370 virtual int on_aggregate(SrsMessage* msg); 370 virtual int on_aggregate(SrsMessage* msg);
371 /** 371 /**
  372 + * the pre-publish is we are very sure we are
  373 + * trying to publish stream, please lock the resource,
  374 + * and we use release_publish() to release the resource.
  375 + */
  376 + virtual int acquire_publish();
  377 + virtual void release_publish();
  378 + /**
372 * publish stream event notify. 379 * publish stream event notify.
373 * @param _req the request from client, the source will deep copy it, 380 * @param _req the request from client, the source will deep copy it,
374 * for when reload the request of client maybe invalid. 381 * for when reload the request of client maybe invalid.
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR "0" 32 #define VERSION_MAJOR "0"
33 #define VERSION_MINOR "9" 33 #define VERSION_MINOR "9"
34 -#define VERSION_REVISION "187" 34 +#define VERSION_REVISION "188"
35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION 35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"