winlin

Merge branch '2.0release' into develop

@@ -84,9 +84,8 @@ void SrsHttpHeartbeat::heartbeat() @@ -84,9 +84,8 @@ void SrsHttpHeartbeat::heartbeat()
84 84
85 ISrsHttpMessage* msg = NULL; 85 ISrsHttpMessage* msg = NULL;
86 if ((ret = http.post(uri.get_path(), req, &msg)) != ERROR_SUCCESS) { 86 if ((ret = http.post(uri.get_path(), req, &msg)) != ERROR_SUCCESS) {
87 - srs_info("http post hartbeart uri failed. "  
88 - "url=%s, request=%s, response=%s, ret=%d",  
89 - url.c_str(), req.c_str(), res.c_str(), ret); 87 + srs_info("http post hartbeart uri failed. url=%s, request=%s, ret=%d",
  88 + url.c_str(), req.c_str(), ret);
90 return; 89 return;
91 } 90 }
92 SrsAutoFree(ISrsHttpMessage, msg); 91 SrsAutoFree(ISrsHttpMessage, msg);
@@ -96,9 +95,8 @@ void SrsHttpHeartbeat::heartbeat() @@ -96,9 +95,8 @@ void SrsHttpHeartbeat::heartbeat()
96 return; 95 return;
97 } 96 }
98 97
99 - srs_info("http hook hartbeart success. "  
100 - "url=%s, request=%s, status_code=%d, response=%s, ret=%d",  
101 - url.c_str(), req.c_str(), status_code, res.c_str(), ret); 98 + srs_info("http hook hartbeart success. url=%s, request=%s, response=%s, ret=%d",
  99 + url.c_str(), req.c_str(), res.c_str(), ret);
102 100
103 return; 101 return;
104 } 102 }
@@ -187,7 +187,7 @@ int SrsHttpClient::connect() @@ -187,7 +187,7 @@ int SrsHttpClient::connect()
187 host.c_str(), port, timeout_us, ret); 187 host.c_str(), port, timeout_us, ret);
188 return ret; 188 return ret;
189 } 189 }
190 - srs_info("connect to server success. server=%s, port=%d", host, port); 190 + srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);
191 191
192 srs_assert(!skt); 192 srs_assert(!skt);
193 skt = new SrsStSocket(stfd); 193 skt = new SrsStSocket(stfd);
@@ -134,7 +134,7 @@ int SrsStreamCache::cycle() @@ -134,7 +134,7 @@ int SrsStreamCache::cycle()
134 } 134 }
135 135
136 if (count <= 0) { 136 if (count <= 0) {
137 - srs_info("http: mw sleep %dms for no msg", mw_sleep); 137 + srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
138 // directly use sleep, donot use consumer wait. 138 // directly use sleep, donot use consumer wait.
139 st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 139 st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
140 140
@@ -522,7 +522,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -522,7 +522,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
522 } 522 }
523 523
524 if (count <= 0) { 524 if (count <= 0) {
525 - srs_info("http: mw sleep %dms for no msg", mw_sleep); 525 + srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
526 // directly use sleep, donot use consumer wait. 526 // directly use sleep, donot use consumer wait.
527 st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 527 st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
528 528
@@ -55,22 +55,25 @@ public: @@ -55,22 +55,25 @@ public:
55 }; 55 };
56 56
57 /** 57 /**
58 -* the stage is used for a collection of object to do print,  
59 -* the print time in a stage is constant and not changed.  
60 -* for example, stage #1 for all play clients, print time is 3s,  
61 -* if there is 10clients, then all clients should print in 10*3s.  
62 -* Usage:  
63 - SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();  
64 - SrsAutoFree(SrsPithyPrint, pprint);  
65 - while (true) {  
66 - pprint->elapse();  
67 - if (pprint->can_print()) {  
68 - // print pithy message.  
69 - // user can get the elapse time by: pprint->age()  
70 - }  
71 - // read and write RTMP messages.  
72 - }  
73 -*/ 58 + * the stage is used for a collection of object to do print,
  59 + * the print time in a stage is constant and not changed,
  60 + * that is, we always got one message to print every specified time.
  61 + *
  62 + * for example, stage #1 for all play clients, print time is 3s,
  63 + * if there is 1client, it will print every 3s.
  64 + * if there is 10clients, random select one to print every 3s.
  65 + * Usage:
  66 + SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
  67 + SrsAutoFree(SrsPithyPrint, pprint);
  68 + while (true) {
  69 + pprint->elapse();
  70 + if (pprint->can_print()) {
  71 + // print pithy message.
  72 + // user can get the elapse time by: pprint->age()
  73 + }
  74 + // read and write RTMP messages.
  75 + }
  76 + */
74 class SrsPithyPrint 77 class SrsPithyPrint
75 { 78 {
76 private: 79 private:
@@ -414,8 +414,6 @@ int SrsRtmpConn::stream_service_cycle() @@ -414,8 +414,6 @@ int SrsRtmpConn::stream_service_cycle()
414 } 414 }
415 srs_info("set chunk_size=%d success", chunk_size); 415 srs_info("set chunk_size=%d success", chunk_size);
416 416
417 - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);  
418 -  
419 // find a source to serve. 417 // find a source to serve.
420 SrsSource* source = SrsSource::fetch(req); 418 SrsSource* source = SrsSource::fetch(req);
421 if (!source) { 419 if (!source) {
@@ -432,19 +430,7 @@ int SrsRtmpConn::stream_service_cycle() @@ -432,19 +430,7 @@ int SrsRtmpConn::stream_service_cycle()
432 return ret; 430 return ret;
433 } 431 }
434 432
435 - // check ASAP, to fail it faster if invalid.  
436 - if (type != SrsRtmpConnPlay) {  
437 - // check publish available  
438 - if (!source->can_publish(vhost_is_edge)) {  
439 - ret = ERROR_SYSTEM_STREAM_BUSY;  
440 - srs_warn("stream %s is already publishing. ret=%d",  
441 - req->get_stream_url().c_str(), ret);  
442 - // to delay request  
443 - st_usleep(SRS_STREAM_BUSY_SLEEP_US);  
444 - return ret;  
445 - }  
446 - }  
447 - 433 + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
448 bool enabled_cache = _srs_config->get_gop_cache(req->vhost); 434 bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
449 srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", 435 srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
450 req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, 436 req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
@@ -479,19 +465,7 @@ int SrsRtmpConn::stream_service_cycle() @@ -479,19 +465,7 @@ int SrsRtmpConn::stream_service_cycle()
479 return ret; 465 return ret;
480 } 466 }
481 467
482 - if (!vhost_is_edge) {  
483 - if ((ret = source->acquire_publish()) != ERROR_SUCCESS) {  
484 - return ret;  
485 - }  
486 - }  
487 -  
488 - ret = fmle_publishing(source);  
489 -  
490 - if (!vhost_is_edge) {  
491 - source->release_publish();  
492 - }  
493 -  
494 - return ret; 468 + return publishing(source);
495 } 469 }
496 case SrsRtmpConnFlashPublish: { 470 case SrsRtmpConnFlashPublish: {
497 srs_verbose("flash start to publish stream %s.", req->stream.c_str()); 471 srs_verbose("flash start to publish stream %s.", req->stream.c_str());
@@ -501,19 +475,7 @@ int SrsRtmpConn::stream_service_cycle() @@ -501,19 +475,7 @@ int SrsRtmpConn::stream_service_cycle()
501 return ret; 475 return ret;
502 } 476 }
503 477
504 - if (!vhost_is_edge) {  
505 - if ((ret = source->acquire_publish()) != ERROR_SUCCESS) {  
506 - return ret;  
507 - }  
508 - }  
509 -  
510 - ret = flash_publishing(source);  
511 -  
512 - if (!vhost_is_edge) {  
513 - source->release_publish();  
514 - }  
515 -  
516 - return ret; 478 + return publishing(source);
517 } 479 }
518 default: { 480 default: {
519 ret = ERROR_SYSTEM_CLIENT_INVALID; 481 ret = ERROR_SYSTEM_CLIENT_INVALID;
@@ -767,69 +729,35 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe @@ -767,69 +729,35 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
767 return ret; 729 return ret;
768 } 730 }
769 731
770 -int SrsRtmpConn::fmle_publishing(SrsSource* source) 732 +int SrsRtmpConn::publishing(SrsSource* source)
771 { 733 {
772 int ret = ERROR_SUCCESS; 734 int ret = ERROR_SUCCESS;
773 -  
774 - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);  
775 -  
776 - if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {  
777 - srs_error("http hook on_publish failed. ret=%d", ret);  
778 - return ret;  
779 - }  
780 -  
781 - // use isolate thread to recv,  
782 - // @see: https://github.com/simple-rtmp-server/srs/issues/237  
783 - SrsPublishRecvThread trd(rtmp, req,  
784 - st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge);  
785 735
786 - srs_info("start to publish stream %s success", req->stream.c_str());  
787 - ret = do_publishing(source, &trd);  
788 -  
789 - // stop isolate recv thread  
790 - trd.stop();  
791 -  
792 - // when edge, notice edge to change state.  
793 - // when origin, notice all service to unpublish.  
794 - if (vhost_is_edge) {  
795 - source->on_edge_proxy_unpublish();  
796 - } else {  
797 - source->on_unpublish(); 736 + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
  737 + srs_error("check publish_refer failed. ret=%d", ret);
  738 + return ret;
798 } 739 }
799 -  
800 - http_hooks_on_unpublish();  
801 -  
802 - return ret;  
803 -}  
804 -  
805 -int SrsRtmpConn::flash_publishing(SrsSource* source)  
806 -{  
807 - int ret = ERROR_SUCCESS;  
808 -  
809 - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); 740 + srs_verbose("check publish_refer success.");
810 741
811 if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { 742 if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
812 srs_error("http hook on_publish failed. ret=%d", ret); 743 srs_error("http hook on_publish failed. ret=%d", ret);
813 return ret; 744 return ret;
814 } 745 }
815 746
816 - // use isolate thread to recv,  
817 - // @see: https://github.com/simple-rtmp-server/srs/issues/237  
818 - SrsPublishRecvThread trd(rtmp, req,  
819 - st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge); 747 + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
  748 + if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) {
  749 + // use isolate thread to recv,
  750 + // @see: https://github.com/simple-rtmp-server/srs/issues/237
  751 + SrsPublishRecvThread trd(rtmp, req,
  752 + st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge);
820 753
821 - srs_info("start to publish stream %s success", req->stream.c_str());  
822 - ret = do_publishing(source, &trd); 754 + srs_info("start to publish stream %s success", req->stream.c_str());
  755 + ret = do_publishing(source, &trd);
823 756
824 - // stop isolate recv thread  
825 - trd.stop(); 757 + // stop isolate recv thread
  758 + trd.stop();
826 759
827 - // when edge, notice edge to change state.  
828 - // when origin, notice all service to unpublish.  
829 - if (vhost_is_edge) {  
830 - source->on_edge_proxy_unpublish();  
831 - } else {  
832 - source->on_unpublish(); 760 + release_publish(source, vhost_is_edge);
833 } 761 }
834 762
835 http_hooks_on_unpublish(); 763 http_hooks_on_unpublish();
@@ -840,33 +768,10 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) @@ -840,33 +768,10 @@ int SrsRtmpConn::flash_publishing(SrsSource* source)
840 int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) 768 int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
841 { 769 {
842 int ret = ERROR_SUCCESS; 770 int ret = ERROR_SUCCESS;
843 -  
844 - if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {  
845 - srs_error("check publish_refer failed. ret=%d", ret);  
846 - return ret;  
847 - }  
848 - srs_verbose("check publish_refer success.");  
849 771
850 SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish(); 772 SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
851 SrsAutoFree(SrsPithyPrint, pprint); 773 SrsAutoFree(SrsPithyPrint, pprint);
852 774
853 - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);  
854 -  
855 - // when edge, ignore the publish event, directly proxy it.  
856 - if (!vhost_is_edge) {  
857 - // notify the hls to prepare when publish start.  
858 - if ((ret = source->on_publish()) != ERROR_SUCCESS) {  
859 - srs_error("hls on_publish failed. ret=%d", ret);  
860 - return ret;  
861 - }  
862 - srs_verbose("hls on_publish success.");  
863 - } else {  
864 - if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {  
865 - srs_error("notice edge start publish stream failed. ret=%d", ret);  
866 - return ret;  
867 - }  
868 - }  
869 -  
870 // start isolate recv thread. 775 // start isolate recv thread.
871 if ((ret = trd->start()) != ERROR_SUCCESS) { 776 if ((ret = trd->start()) != ERROR_SUCCESS) {
872 srs_error("start isolate recv thread failed. ret=%d", ret); 777 srs_error("start isolate recv thread failed. ret=%d", ret);
@@ -914,6 +819,43 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -914,6 +819,43 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
914 return ret; 819 return ret;
915 } 820 }
916 821
  822 +int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
  823 +{
  824 + int ret = ERROR_SUCCESS;
  825 +
  826 + if (!source->can_publish(is_edge)) {
  827 + ret = ERROR_SYSTEM_STREAM_BUSY;
  828 + srs_warn("stream %s is already publishing. ret=%d",
  829 + req->get_stream_url().c_str(), ret);
  830 + return ret;
  831 + }
  832 +
  833 + // when edge, ignore the publish event, directly proxy it.
  834 + if (is_edge) {
  835 + if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
  836 + srs_error("notice edge start publish stream failed. ret=%d", ret);
  837 + }
  838 + return ret;
  839 + } else {
  840 + if ((ret = source->on_publish()) != ERROR_SUCCESS) {
  841 + srs_error("notify publish failed. ret=%d", ret);
  842 + }
  843 + }
  844 +
  845 + return ret;
  846 +}
  847 +
  848 +void SrsRtmpConn::release_publish(SrsSource* source, bool is_edge)
  849 +{
  850 + // when edge, notice edge to change state.
  851 + // when origin, notice all service to unpublish.
  852 + if (is_edge) {
  853 + source->on_edge_proxy_unpublish();
  854 + } else {
  855 + source->on_unpublish();
  856 + }
  857 +}
  858 +
917 int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge) 859 int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge)
918 { 860 {
919 int ret = ERROR_SUCCESS; 861 int ret = ERROR_SUCCESS;
@@ -111,9 +111,10 @@ private: @@ -111,9 +111,10 @@ private:
111 virtual int check_vhost(); 111 virtual int check_vhost();
112 virtual int playing(SrsSource* source); 112 virtual int playing(SrsSource* source);
113 virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd); 113 virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd);
114 - virtual int fmle_publishing(SrsSource* source);  
115 - virtual int flash_publishing(SrsSource* source); 114 + virtual int publishing(SrsSource* source);
116 virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd); 115 virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd);
  116 + virtual int acquire_publish(SrsSource* source, bool is_edge);
  117 + virtual void release_publish(SrsSource* source, bool is_edge);
117 virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge); 118 virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge);
118 virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge); 119 virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge);
119 virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); 120 virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
@@ -1958,26 +1958,6 @@ int SrsSource::on_aggregate(SrsCommonMessage* msg) @@ -1958,26 +1958,6 @@ int SrsSource::on_aggregate(SrsCommonMessage* msg)
1958 return ret; 1958 return ret;
1959 } 1959 }
1960 1960
1961 -int SrsSource::acquire_publish()  
1962 -{  
1963 - int ret = ERROR_SUCCESS;  
1964 -  
1965 - if (!_can_publish) {  
1966 - ret = ERROR_SYSTEM_STREAM_BUSY;  
1967 - srs_warn("publish lock stream failed, ret=%d", ret);  
1968 - return ret;  
1969 - }  
1970 -  
1971 - _can_publish = false;  
1972 -  
1973 - return ret;  
1974 -}  
1975 -  
1976 -void SrsSource::release_publish()  
1977 -{  
1978 - _can_publish = true;  
1979 -}  
1980 -  
1981 int SrsSource::on_publish() 1961 int SrsSource::on_publish()
1982 { 1962 {
1983 int ret = ERROR_SUCCESS; 1963 int ret = ERROR_SUCCESS;
@@ -550,13 +550,6 @@ private: @@ -550,13 +550,6 @@ private:
550 public: 550 public:
551 virtual int on_aggregate(SrsCommonMessage* msg); 551 virtual int on_aggregate(SrsCommonMessage* msg);
552 /** 552 /**
553 - * the pre-publish is we are very sure we are  
554 - * trying to publish stream, please lock the resource,  
555 - * and we use release_publish() to release the resource.  
556 - */  
557 - virtual int acquire_publish();  
558 - virtual void release_publish();  
559 - /**  
560 * publish stream event notify. 553 * publish stream event notify.
561 * @param _req the request from client, the source will deep copy it, 554 * @param _req the request from client, the source will deep copy it,
562 * for when reload the request of client maybe invalid. 555 * for when reload the request of client maybe invalid.