winlin

for bug #237, use isolate thread to recv message. 2.0.41

@@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 25
26 #include <srs_protocol_rtmp.hpp> 26 #include <srs_protocol_rtmp.hpp>
27 #include <srs_protocol_stack.hpp> 27 #include <srs_protocol_stack.hpp>
  28 +#include <srs_app_rtmp_conn.hpp>
28 29
29 ISrsMessageHandler::ISrsMessageHandler() 30 ISrsMessageHandler::ISrsMessageHandler()
30 { 31 {
@@ -89,6 +90,11 @@ int SrsRecvThread::cycle() @@ -89,6 +90,11 @@ int SrsRecvThread::cycle()
89 return ret; 90 return ret;
90 } 91 }
91 92
  93 +void SrsRecvThread::stop_loop()
  94 +{
  95 + trd->stop_loop();
  96 +}
  97 +
92 void SrsRecvThread::on_thread_start() 98 void SrsRecvThread::on_thread_start()
93 { 99 {
94 // the multiple messages writev improve performance large, 100 // the multiple messages writev improve performance large,
@@ -179,3 +185,72 @@ int SrsQueueRecvThread::handle(SrsMessage* msg) @@ -179,3 +185,72 @@ int SrsQueueRecvThread::handle(SrsMessage* msg)
179 185
180 return ERROR_SUCCESS; 186 return ERROR_SUCCESS;
181 } 187 }
  188 +
  189 +SrsPublishRecvThread::SrsPublishRecvThread(
  190 + SrsRtmpServer* rtmp_sdk, int timeout_ms,
  191 + SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
  192 +): trd(this, rtmp_sdk, timeout_ms)
  193 +{
  194 + _conn = conn;
  195 + _source = source;
  196 + _is_fmle = is_fmle;
  197 + _is_edge = is_edge;
  198 +
  199 + recv_error_code = ERROR_SUCCESS;
  200 + _nb_msgs = 0;
  201 +}
  202 +
  203 +SrsPublishRecvThread::~SrsPublishRecvThread()
  204 +{
  205 + trd.stop();
  206 +}
  207 +
  208 +int64_t SrsPublishRecvThread::nb_msgs()
  209 +{
  210 + return _nb_msgs;
  211 +}
  212 +
  213 +int SrsPublishRecvThread::error_code()
  214 +{
  215 + return recv_error_code;
  216 +}
  217 +
  218 +int SrsPublishRecvThread::start()
  219 +{
  220 + return trd.start();
  221 +}
  222 +
  223 +void SrsPublishRecvThread::stop()
  224 +{
  225 + trd.stop();
  226 +}
  227 +
  228 +bool SrsPublishRecvThread::can_handle()
  229 +{
  230 + // publish thread always can handle message.
  231 + return true;
  232 +}
  233 +
  234 +int SrsPublishRecvThread::handle(SrsMessage* msg)
  235 +{
  236 + int ret = ERROR_SUCCESS;
  237 +
  238 + _nb_msgs++;
  239 +
  240 + // the rtmp connection will handle this message,
  241 + // quit the thread loop when error.
  242 + recv_error_code = ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);
  243 +
  244 + // when error, use stop loop to terminate the thread normally,
  245 + // for we are in the thread loop now, and should never use stop() to terminate it.
  246 + if (ret != ERROR_SUCCESS) {
  247 + trd.stop_loop();
  248 + }
  249 +
  250 + // must always free it,
  251 + // the source will copy it if need to use.
  252 + srs_freep(msg);
  253 +
  254 + // TODO: FIXME: implements it.
  255 + return ret;
  256 +}
@@ -36,6 +36,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -36,6 +36,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 36
37 class SrsRtmpServer; 37 class SrsRtmpServer;
38 class SrsMessage; 38 class SrsMessage;
  39 +class SrsRtmpConn;
  40 +class SrsSource;
39 41
40 /** 42 /**
41 * for the recv thread to handle the message. 43 * for the recv thread to handle the message.
@@ -76,6 +78,7 @@ public: @@ -76,6 +78,7 @@ public:
76 virtual int start(); 78 virtual int start();
77 virtual void stop(); 79 virtual void stop();
78 virtual int cycle(); 80 virtual int cycle();
  81 + virtual void stop_loop();
79 public: 82 public:
80 virtual void on_thread_start(); 83 virtual void on_thread_start();
81 virtual void on_thread_stop(); 84 virtual void on_thread_stop();
@@ -87,7 +90,7 @@ public: @@ -87,7 +90,7 @@ public:
87 * @see: SrsRtmpConn::playing 90 * @see: SrsRtmpConn::playing
88 * @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 91 * @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
89 */ 92 */
90 -class SrsQueueRecvThread : virtual public ISrsMessageHandler 93 +class SrsQueueRecvThread : public ISrsMessageHandler
91 { 94 {
92 private: 95 private:
93 std::vector<SrsMessage*> queue; 96 std::vector<SrsMessage*> queue;
@@ -107,5 +110,36 @@ public: @@ -107,5 +110,36 @@ public:
107 virtual int handle(SrsMessage* msg); 110 virtual int handle(SrsMessage* msg);
108 }; 111 };
109 112
  113 +/**
  114 + * the publish recv thread got message and callback the source method to process message.
  115 +* @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
  116 + */
  117 +class SrsPublishRecvThread : public ISrsMessageHandler
  118 +{
  119 +private:
  120 + SrsRecvThread trd;
  121 + // the msgs already got.
  122 + int64_t _nb_msgs;
  123 + // the recv thread error code.
  124 + int recv_error_code;
  125 + SrsRtmpConn* _conn;
  126 + SrsSource* _source;
  127 + bool _is_fmle;
  128 + bool _is_edge;
  129 +public:
  130 + SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms,
  131 + SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge);
  132 + virtual ~SrsPublishRecvThread();
  133 +public:
  134 + virtual int64_t nb_msgs();
  135 + virtual int error_code();
  136 +public:
  137 + virtual int start();
  138 + virtual void stop();
  139 +public:
  140 + virtual bool can_handle();
  141 + virtual int handle(SrsMessage* msg);
  142 +};
  143 +
110 #endif 144 #endif
111 145
@@ -644,8 +644,15 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) @@ -644,8 +644,15 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source)
644 return ret; 644 return ret;
645 } 645 }
646 646
  647 + // use isolate thread to recv,
  648 + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
  649 + SrsPublishRecvThread trd(rtmp, SRS_CONSTS_RTMP_RECV_TIMEOUT_US, this, source, true, vhost_is_edge);
  650 +
647 srs_info("start to publish stream %s success", req->stream.c_str()); 651 srs_info("start to publish stream %s success", req->stream.c_str());
648 - ret = do_fmle_publishing(source); 652 + ret = do_publishing(source, &trd);
  653 +
  654 + // stop isolate recv thread
  655 + trd.stop();
649 656
650 // when edge, notice edge to change state. 657 // when edge, notice edge to change state.
651 // when origin, notice all service to unpublish. 658 // when origin, notice all service to unpublish.
@@ -660,82 +667,6 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) @@ -660,82 +667,6 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source)
660 return ret; 667 return ret;
661 } 668 }
662 669
663 -int SrsRtmpConn::do_fmle_publishing(SrsSource* source)  
664 -{  
665 - int ret = ERROR_SUCCESS;  
666 -  
667 - if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {  
668 - srs_error("fmle check publish_refer failed. ret=%d", ret);  
669 - return ret;  
670 - }  
671 - srs_verbose("fmle check publish_refer success.");  
672 -  
673 - SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PUBLISH_USER);  
674 -  
675 - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);  
676 -  
677 - // when edge, ignore the publish event, directly proxy it.  
678 - if (!vhost_is_edge) {  
679 - // notify the hls to prepare when publish start.  
680 - if ((ret = source->on_publish()) != ERROR_SUCCESS) {  
681 - srs_error("fmle hls on_publish failed. ret=%d", ret);  
682 - return ret;  
683 - }  
684 - srs_verbose("fmle hls on_publish success.");  
685 - }  
686 -  
687 - while (true) {  
688 - SrsMessage* msg = NULL;  
689 - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {  
690 - srs_error("fmle recv identify client message failed. ret=%d", ret);  
691 - return ret;  
692 - }  
693 -  
694 - SrsAutoFree(SrsMessage, msg);  
695 -  
696 - pithy_print.elapse();  
697 -  
698 - // reportable  
699 - if (pithy_print.can_print()) {  
700 - kbps->sample();  
701 - srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH  
702 - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(),  
703 - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),  
704 - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());  
705 - }  
706 -  
707 - // process UnPublish event.  
708 - if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {  
709 - SrsPacket* pkt = NULL;  
710 - if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {  
711 - srs_error("fmle decode unpublish message failed. ret=%d", ret);  
712 - return ret;  
713 - }  
714 -  
715 - SrsAutoFree(SrsPacket, pkt);  
716 -  
717 - if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {  
718 - SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);  
719 - if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) {  
720 - return ret;  
721 - }  
722 - return ERROR_CONTROL_REPUBLISH;  
723 - }  
724 -  
725 - srs_trace("fmle ignore AMF0/AMF3 command message.");  
726 - continue;  
727 - }  
728 -  
729 - // video, audio, data message  
730 - if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) {  
731 - srs_error("fmle process publish message failed. ret=%d", ret);  
732 - return ret;  
733 - }  
734 - }  
735 -  
736 - return ret;  
737 -}  
738 -  
739 int SrsRtmpConn::flash_publishing(SrsSource* source) 670 int SrsRtmpConn::flash_publishing(SrsSource* source)
740 { 671 {
741 int ret = ERROR_SUCCESS; 672 int ret = ERROR_SUCCESS;
@@ -747,8 +678,15 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) @@ -747,8 +678,15 @@ int SrsRtmpConn::flash_publishing(SrsSource* source)
747 return ret; 678 return ret;
748 } 679 }
749 680
750 - srs_info("flash start to publish stream %s success", req->stream.c_str());  
751 - ret = do_flash_publishing(source); 681 + // use isolate thread to recv,
  682 + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237
  683 + SrsPublishRecvThread trd(rtmp, SRS_CONSTS_RTMP_RECV_TIMEOUT_US, this, source, false, vhost_is_edge);
  684 +
  685 + srs_info("start to publish stream %s success", req->stream.c_str());
  686 + ret = do_publishing(source, &trd);
  687 +
  688 + // stop isolate recv thread
  689 + trd.stop();
752 690
753 // when edge, notice edge to change state. 691 // when edge, notice edge to change state.
754 // when origin, notice all service to unpublish. 692 // when origin, notice all service to unpublish.
@@ -763,15 +701,15 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) @@ -763,15 +701,15 @@ int SrsRtmpConn::flash_publishing(SrsSource* source)
763 return ret; 701 return ret;
764 } 702 }
765 703
766 -int SrsRtmpConn::do_flash_publishing(SrsSource* source) 704 +int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
767 { 705 {
768 int ret = ERROR_SUCCESS; 706 int ret = ERROR_SUCCESS;
769 707
770 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { 708 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
771 - srs_error("flash check publish_refer failed. ret=%d", ret); 709 + srs_error("check publish_refer failed. ret=%d", ret);
772 return ret; 710 return ret;
773 } 711 }
774 - srs_verbose("flash check publish_refer success."); 712 + srs_verbose("check publish_refer success.");
775 713
776 SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PUBLISH_USER); 714 SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PUBLISH_USER);
777 715
@@ -781,57 +719,94 @@ int SrsRtmpConn::do_flash_publishing(SrsSource* source) @@ -781,57 +719,94 @@ int SrsRtmpConn::do_flash_publishing(SrsSource* source)
781 if (!vhost_is_edge) { 719 if (!vhost_is_edge) {
782 // notify the hls to prepare when publish start. 720 // notify the hls to prepare when publish start.
783 if ((ret = source->on_publish()) != ERROR_SUCCESS) { 721 if ((ret = source->on_publish()) != ERROR_SUCCESS) {
784 - srs_error("flash hls on_publish failed. ret=%d", ret); 722 + srs_error("hls on_publish failed. ret=%d", ret);
785 return ret; 723 return ret;
786 } 724 }
787 - srs_verbose("flash hls on_publish success."); 725 + srs_verbose("hls on_publish success.");
788 } 726 }
789 727
790 - while (true) {  
791 - SrsMessage* msg = NULL;  
792 - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {  
793 - if (!srs_is_client_gracefully_close(ret)) {  
794 - srs_error("flash recv identify client message failed. ret=%d", ret); 728 + // start isolate recv thread.
  729 + if ((ret = trd->start()) != ERROR_SUCCESS) {
  730 + srs_error("start isolate recv thread failed. ret=%d", ret);
  731 + return ret;
795 } 732 }
  733 +
  734 + int64_t nb_msgs = 0;
  735 + while (true) {
  736 + // use small loop to check the error code, interval = 30s/100 = 300ms.
  737 + for (int i = 0; i < 100; i++) {
  738 + st_usleep(SRS_CONSTS_RTMP_RECV_TIMEOUT_US * 1000 / 100);
  739 +
  740 + // check the thread error code.
  741 + if ((ret = trd->error_code()) != ERROR_SUCCESS) {
796 return ret; 742 return ret;
797 } 743 }
  744 + }
798 745
799 - SrsAutoFree(SrsMessage, msg); 746 + // when not got any messages, timeout.
  747 + if (trd->nb_msgs() <= nb_msgs) {
  748 + ret = ERROR_SOCKET_TIMEOUT;
  749 + srs_warn("publish timeout %"PRId64"us, nb_msgs=%"PRId64", ret=%d",
  750 + SRS_CONSTS_RTMP_RECV_TIMEOUT_US, nb_msgs, ret);
  751 + break;
  752 + }
  753 + nb_msgs = trd->nb_msgs();
800 754
801 pithy_print.elapse(); 755 pithy_print.elapse();
802 756
803 // reportable 757 // reportable
804 if (pithy_print.can_print()) { 758 if (pithy_print.can_print()) {
805 kbps->sample(); 759 kbps->sample();
806 - srs_trace("<- "SRS_CONSTS_LOG_WEB_PUBLISH  
807 - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d",  
808 - pithy_print.age(), 760 + srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH
  761 + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(),
809 kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), 762 kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
810 kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); 763 kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
811 } 764 }
  765 + }
812 766
813 - // process UnPublish event. 767 + return ret;
  768 +}
  769 +
  770 +int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsMessage* msg, bool is_fmle, bool vhost_is_edge)
  771 +{
  772 + int ret = ERROR_SUCCESS;
  773 +
  774 + // process publish event.
814 if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { 775 if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
815 SrsPacket* pkt = NULL; 776 SrsPacket* pkt = NULL;
816 if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) { 777 if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
817 - srs_error("flash decode unpublish message failed. ret=%d", ret); 778 + srs_error("fmle decode unpublish message failed. ret=%d", ret);
818 return ret; 779 return ret;
819 } 780 }
820 781
821 SrsAutoFree(SrsPacket, pkt); 782 SrsAutoFree(SrsPacket, pkt);
822 783
  784 + // for flash, any packet is republish.
  785 + if (!is_fmle) {
823 // flash unpublish. 786 // flash unpublish.
824 // TODO: maybe need to support republish. 787 // TODO: maybe need to support republish.
825 srs_trace("flash flash publish finished."); 788 srs_trace("flash flash publish finished.");
826 return ERROR_CONTROL_REPUBLISH; 789 return ERROR_CONTROL_REPUBLISH;
827 } 790 }
828 791
  792 + // for fmle, drop others except the fmle start packet.
  793 + if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
  794 + SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
  795 + if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) {
  796 + return ret;
  797 + }
  798 + return ERROR_CONTROL_REPUBLISH;
  799 + }
  800 +
  801 + srs_trace("fmle ignore AMF0/AMF3 command message.");
  802 + return ret;
  803 + }
  804 +
829 // video, audio, data message 805 // video, audio, data message
830 if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) { 806 if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) {
831 - srs_error("flash process publish message failed. ret=%d", ret); 807 + srs_error("fmle process publish message failed. ret=%d", ret);
832 return ret; 808 return ret;
833 } 809 }
834 - }  
835 810
836 return ret; 811 return ret;
837 } 812 }
@@ -50,12 +50,15 @@ class SrsKbps; @@ -50,12 +50,15 @@ class SrsKbps;
50 class SrsRtmpClient; 50 class SrsRtmpClient;
51 class SrsSharedPtrMessage; 51 class SrsSharedPtrMessage;
52 class SrsQueueRecvThread; 52 class SrsQueueRecvThread;
  53 +class SrsPublishRecvThread;
53 54
54 /** 55 /**
55 * the client provides the main logic control for RTMP clients. 56 * the client provides the main logic control for RTMP clients.
56 */ 57 */
57 class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandler 58 class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandler
58 { 59 {
  60 + // for the thread to directly access any field of connection.
  61 + friend class SrsPublishRecvThread;
59 private: 62 private:
60 SrsRequest* req; 63 SrsRequest* req;
61 SrsResponse* res; 64 SrsResponse* res;
@@ -91,9 +94,9 @@ private: @@ -91,9 +94,9 @@ private:
91 virtual int playing(SrsSource* source); 94 virtual int playing(SrsSource* source);
92 virtual int do_playing(SrsSource* source, SrsQueueRecvThread* trd); 95 virtual int do_playing(SrsSource* source, SrsQueueRecvThread* trd);
93 virtual int fmle_publishing(SrsSource* source); 96 virtual int fmle_publishing(SrsSource* source);
94 - virtual int do_fmle_publishing(SrsSource* source);  
95 virtual int flash_publishing(SrsSource* source); 97 virtual int flash_publishing(SrsSource* source);
96 - virtual int do_flash_publishing(SrsSource* source); 98 + virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd);
  99 + virtual int handle_publish_message(SrsSource* source, SrsMessage* msg, bool is_fmle, bool vhost_is_edge);
97 virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge); 100 virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
98 virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg); 101 virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
99 private: 102 private:
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 40 34 +#define VERSION_REVISION 41
35 // server info. 35 // server info.
36 #define RTMP_SIG_SRS_KEY "SRS" 36 #define RTMP_SIG_SRS_KEY "SRS"
37 #define RTMP_SIG_SRS_ROLE "origin/edge server" 37 #define RTMP_SIG_SRS_ROLE "origin/edge server"