winlin

refine the thread model for the retry threads

@@ -81,11 +81,11 @@ vhost __defaultVhost__ { @@ -81,11 +81,11 @@ vhost __defaultVhost__ {
81 vhost dev { 81 vhost dev {
82 enabled on; 82 enabled on;
83 gop_cache on; 83 gop_cache on;
84 - hls on; 84 + hls off;
85 hls_path ./objs/nginx/html; 85 hls_path ./objs/nginx/html;
86 hls_fragment 5; 86 hls_fragment 5;
87 hls_window 30; 87 hls_window 30;
88 - #forward 127.0.0.1:19350; 88 + forward 127.0.0.1:19350;
89 http_hooks { 89 http_hooks {
90 enabled off; 90 enabled off;
91 on_connect http://127.0.0.1:8085/api/v1/clients; 91 on_connect http://127.0.0.1:8085/api/v1/clients;
@@ -96,7 +96,7 @@ vhost dev { @@ -96,7 +96,7 @@ vhost dev {
96 on_stop http://127.0.0.1:8085/api/v1/sessions; 96 on_stop http://127.0.0.1:8085/api/v1/sessions;
97 } 97 }
98 transcode { 98 transcode {
99 - enabled off; 99 + enabled on;
100 ffmpeg ./objs/ffmpeg/bin/ffmpeg; 100 ffmpeg ./objs/ffmpeg/bin/ffmpeg;
101 engine dev { 101 engine dev {
102 enabled on; 102 enabled on;
@@ -116,7 +116,7 @@ MODULE_FILES=("srs_core" "srs_core_log" "srs_core_server" @@ -116,7 +116,7 @@ MODULE_FILES=("srs_core" "srs_core_log" "srs_core_server"
116 "srs_core_handshake" "srs_core_pithy_print" 116 "srs_core_handshake" "srs_core_pithy_print"
117 "srs_core_config" "srs_core_refer" "srs_core_reload" 117 "srs_core_config" "srs_core_refer" "srs_core_reload"
118 "srs_core_hls" "srs_core_forward" "srs_core_encoder" 118 "srs_core_hls" "srs_core_forward" "srs_core_encoder"
119 - "srs_core_http") 119 + "srs_core_http" "srs_core_thread")
120 MODULE_DIR="src/core" . auto/modules.sh 120 MODULE_DIR="src/core" . auto/modules.sh
121 CORE_OBJS="${MODULE_OBJS[@]}" 121 CORE_OBJS="${MODULE_OBJS[@]}"
122 122
不能预览此文件类型
@@ -111,3 +111,17 @@ void srs_vhost_resolve(std::string& vhost, std::string& app) @@ -111,3 +111,17 @@ void srs_vhost_resolve(std::string& vhost, std::string& app)
111 } 111 }
112 } 112 }
113 } 113 }
  114 +
  115 +void srs_close_stfd(st_netfd_t& stfd)
  116 +{
  117 + if (stfd) {
  118 + int fd = st_netfd_fileno(stfd);
  119 + st_netfd_close(stfd);
  120 + stfd = NULL;
  121 +
  122 + // st does not close it sometimes,
  123 + // close it manually.
  124 + close(fd);
  125 + }
  126 +}
  127 +
@@ -46,6 +46,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -46,6 +46,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
46 #include <stddef.h> 46 #include <stddef.h>
47 #include <sys/types.h> 47 #include <sys/types.h>
48 48
  49 +#include <st.h>
  50 +
49 // generated by configure. 51 // generated by configure.
50 #include <srs_auto_headers.hpp> 52 #include <srs_auto_headers.hpp>
51 53
@@ -102,4 +104,7 @@ extern std::string srs_dns_resolve(std::string host); @@ -102,4 +104,7 @@ extern std::string srs_dns_resolve(std::string host);
102 // app...vhost...request_vhost 104 // app...vhost...request_vhost
103 extern void srs_vhost_resolve(std::string& vhost, std::string& app); 105 extern void srs_vhost_resolve(std::string& vhost, std::string& app);
104 106
  107 +// close the netfd, and close the underlayer fd.
  108 +extern void srs_close_stfd(st_netfd_t& stfd);
  109 +
105 #endif 110 #endif
@@ -36,15 +36,7 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd) @@ -36,15 +36,7 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd)
36 36
37 SrsConnection::~SrsConnection() 37 SrsConnection::~SrsConnection()
38 { 38 {
39 - if (stfd) {  
40 - int fd = st_netfd_fileno(stfd);  
41 - st_netfd_close(stfd);  
42 - stfd = NULL;  
43 -  
44 - // st does not close it sometimes,  
45 - // close it manually.  
46 - close(fd);  
47 - } 39 + srs_close_stfd(stfd);
48 } 40 }
49 41
50 int SrsConnection::start() 42 int SrsConnection::start()
@@ -30,8 +30,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,8 +30,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 30
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
33 -#include <st.h>  
34 -  
35 class SrsServer; 33 class SrsServer;
36 class SrsConnection 34 class SrsConnection
37 { 35 {
@@ -483,52 +483,15 @@ void SrsFFMPEG::stop() @@ -483,52 +483,15 @@ void SrsFFMPEG::stop()
483 483
484 SrsEncoder::SrsEncoder() 484 SrsEncoder::SrsEncoder()
485 { 485 {
486 - tid = NULL;  
487 - loop = false; 486 + pthread = new SrsThread(this, SRS_ENCODER_SLEEP_MS);
  487 + pithy_print = new SrsPithyPrint(SRS_STAGE_ENCODER);
488 } 488 }
489 489
490 SrsEncoder::~SrsEncoder() 490 SrsEncoder::~SrsEncoder()
491 { 491 {
492 on_unpublish(); 492 on_unpublish();
493 -}  
494 -  
495 -int SrsEncoder::parse_scope_engines(SrsRequest* req)  
496 -{  
497 - int ret = ERROR_SUCCESS;  
498 -  
499 - // parse all transcode engines.  
500 - SrsConfDirective* conf = NULL;  
501 -  
502 - // parse vhost scope engines  
503 - std::string scope = "";  
504 - if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {  
505 - if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {  
506 - srs_error("parse vhost scope=%s transcode engines failed. "  
507 - "ret=%d", scope.c_str(), ret);  
508 - return ret;  
509 - }  
510 - }  
511 - // parse app scope engines  
512 - scope = req->app;  
513 - if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {  
514 - if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {  
515 - srs_error("parse app scope=%s transcode engines failed. "  
516 - "ret=%d", scope.c_str(), ret);  
517 - return ret;  
518 - }  
519 - }  
520 - // parse stream scope engines  
521 - scope += "/";  
522 - scope += req->stream;  
523 - if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {  
524 - if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {  
525 - srs_error("parse stream scope=%s transcode engines failed. "  
526 - "ret=%d", scope.c_str(), ret);  
527 - return ret;  
528 - }  
529 - }  
530 493
531 - return ret; 494 + srs_freep(pthread);
532 } 495 }
533 496
534 int SrsEncoder::on_publish(SrsRequest* req) 497 int SrsEncoder::on_publish(SrsRequest* req)
@@ -539,6 +502,7 @@ int SrsEncoder::on_publish(SrsRequest* req) @@ -539,6 +502,7 @@ int SrsEncoder::on_publish(SrsRequest* req)
539 502
540 // ignore the loop encoder 503 // ignore the loop encoder
541 if (ret == ERROR_ENCODER_LOOP) { 504 if (ret == ERROR_ENCODER_LOOP) {
  505 + clear_engines();
542 ret = ERROR_SUCCESS; 506 ret = ERROR_SUCCESS;
543 } 507 }
544 508
@@ -548,9 +512,7 @@ int SrsEncoder::on_publish(SrsRequest* req) @@ -548,9 +512,7 @@ int SrsEncoder::on_publish(SrsRequest* req)
548 } 512 }
549 513
550 // start thread to run all encoding engines. 514 // start thread to run all encoding engines.
551 - srs_assert(!tid);  
552 - if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {  
553 - ret = ERROR_ST_CREATE_FORWARD_THREAD; 515 + if ((ret = pthread->start()) != ERROR_SUCCESS) {
554 srs_error("st_thread_create failed. ret=%d", ret); 516 srs_error("st_thread_create failed. ret=%d", ret);
555 return ret; 517 return ret;
556 } 518 }
@@ -560,23 +522,58 @@ int SrsEncoder::on_publish(SrsRequest* req) @@ -560,23 +522,58 @@ int SrsEncoder::on_publish(SrsRequest* req)
560 522
561 void SrsEncoder::on_unpublish() 523 void SrsEncoder::on_unpublish()
562 { 524 {
563 - if (tid) {  
564 - loop = false;  
565 - st_thread_interrupt(tid);  
566 - st_thread_join(tid, NULL);  
567 - tid = NULL; 525 + pthread->stop();
  526 + clear_engines();
  527 +}
  528 +
  529 +int SrsEncoder::cycle()
  530 +{
  531 + int ret = ERROR_SUCCESS;
  532 +
  533 + std::vector<SrsFFMPEG*>::iterator it;
  534 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  535 + SrsFFMPEG* ffmpeg = *it;
  536 +
  537 + // start all ffmpegs.
  538 + if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
  539 + srs_error("ffmpeg start failed. ret=%d", ret);
  540 + return ret;
568 } 541 }
569 542
570 - clear_engines(); 543 + // check ffmpeg status.
  544 + if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) {
  545 + srs_error("ffmpeg cycle failed. ret=%d", ret);
  546 + return ret;
  547 + }
  548 + }
  549 +
  550 + // pithy print
  551 + encoder();
  552 + pithy_print->elapse(SRS_ENCODER_SLEEP_MS);
  553 +
  554 + return ret;
  555 +}
  556 +
  557 +void SrsEncoder::on_leave_loop()
  558 +{
  559 + // kill ffmpeg when finished and it alive
  560 + std::vector<SrsFFMPEG*>::iterator it;
  561 +
  562 + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
  563 + SrsFFMPEG* ffmpeg = *it;
  564 + ffmpeg->stop();
  565 + }
571 } 566 }
572 567
573 void SrsEncoder::clear_engines() 568 void SrsEncoder::clear_engines()
574 { 569 {
575 std::vector<SrsFFMPEG*>::iterator it; 570 std::vector<SrsFFMPEG*>::iterator it;
  571 +
576 for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { 572 for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
577 SrsFFMPEG* ffmpeg = *it; 573 SrsFFMPEG* ffmpeg = *it;
578 srs_freep(ffmpeg); 574 srs_freep(ffmpeg);
579 } 575 }
  576 +
580 ffmpegs.clear(); 577 ffmpegs.clear();
581 } 578 }
582 579
@@ -585,6 +582,45 @@ SrsFFMPEG* SrsEncoder::at(int index) @@ -585,6 +582,45 @@ SrsFFMPEG* SrsEncoder::at(int index)
585 return ffmpegs[index]; 582 return ffmpegs[index];
586 } 583 }
587 584
  585 +int SrsEncoder::parse_scope_engines(SrsRequest* req)
  586 +{
  587 + int ret = ERROR_SUCCESS;
  588 +
  589 + // parse all transcode engines.
  590 + SrsConfDirective* conf = NULL;
  591 +
  592 + // parse vhost scope engines
  593 + std::string scope = "";
  594 + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
  595 + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
  596 + srs_error("parse vhost scope=%s transcode engines failed. "
  597 + "ret=%d", scope.c_str(), ret);
  598 + return ret;
  599 + }
  600 + }
  601 + // parse app scope engines
  602 + scope = req->app;
  603 + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
  604 + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
  605 + srs_error("parse app scope=%s transcode engines failed. "
  606 + "ret=%d", scope.c_str(), ret);
  607 + return ret;
  608 + }
  609 + }
  610 + // parse stream scope engines
  611 + scope += "/";
  612 + scope += req->stream;
  613 + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
  614 + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
  615 + srs_error("parse stream scope=%s transcode engines failed. "
  616 + "ret=%d", scope.c_str(), ret);
  617 + return ret;
  618 + }
  619 + }
  620 +
  621 + return ret;
  622 +}
  623 +
588 int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf) 624 int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf)
589 { 625 {
590 int ret = ERROR_SUCCESS; 626 int ret = ERROR_SUCCESS;
@@ -631,7 +667,6 @@ int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf) @@ -631,7 +667,6 @@ int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf)
631 667
632 // if got a loop, donot transcode the whole stream. 668 // if got a loop, donot transcode the whole stream.
633 if (ret == ERROR_ENCODER_LOOP) { 669 if (ret == ERROR_ENCODER_LOOP) {
634 - clear_engines();  
635 break; 670 break;
636 } 671 }
637 672
@@ -646,85 +681,14 @@ int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf) @@ -646,85 +681,14 @@ int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf)
646 return ret; 681 return ret;
647 } 682 }
648 683
649 -int SrsEncoder::cycle()  
650 -{  
651 - int ret = ERROR_SUCCESS;  
652 -  
653 - std::vector<SrsFFMPEG*>::iterator it;  
654 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
655 - SrsFFMPEG* ffmpeg = *it;  
656 -  
657 - // start all ffmpegs.  
658 - if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {  
659 - srs_error("ffmpeg start failed. ret=%d", ret);  
660 - return ret;  
661 - }  
662 -  
663 - // check ffmpeg status.  
664 - if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) {  
665 - srs_error("ffmpeg cycle failed. ret=%d", ret);  
666 - return ret;  
667 - }  
668 - }  
669 -  
670 - return ret;  
671 -}  
672 -  
673 -void SrsEncoder::encoder_cycle()  
674 -{  
675 - int ret = ERROR_SUCCESS;  
676 -  
677 - log_context->generate_id();  
678 - srs_trace("encoder cycle start");  
679 -  
680 - SrsPithyPrint pithy_print(SRS_STAGE_ENCODER);  
681 -  
682 - while (loop) {  
683 - if ((ret = cycle()) != ERROR_SUCCESS) {  
684 - srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);  
685 - } else {  
686 - srs_info("encoder cycle success, retry");  
687 - }  
688 -  
689 - if (!loop) {  
690 - break;  
691 - }  
692 -  
693 - encoder(&pithy_print);  
694 - pithy_print.elapse(SRS_ENCODER_SLEEP_MS);  
695 -  
696 - st_usleep(SRS_ENCODER_SLEEP_MS * 1000);  
697 - }  
698 -  
699 - // kill ffmpeg when finished and it alive  
700 - std::vector<SrsFFMPEG*>::iterator it;  
701 - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {  
702 - SrsFFMPEG* ffmpeg = *it;  
703 - ffmpeg->stop();  
704 - }  
705 -  
706 - srs_trace("encoder cycle finished");  
707 -}  
708 -  
709 -void SrsEncoder::encoder(SrsPithyPrint* pithy_print) 684 +void SrsEncoder::encoder()
710 { 685 {
711 // reportable 686 // reportable
712 if (pithy_print->can_print()) { 687 if (pithy_print->can_print()) {
713 - srs_trace("-> time=%"PRId64", encoders=%d",  
714 - pithy_print->get_age(), (int)ffmpegs.size()); 688 + // TODO: FIXME: show more info.
  689 + srs_trace("-> time=%"PRId64", encoders=%d", pithy_print->get_age(), (int)ffmpegs.size());
715 } 690 }
716 } 691 }
717 692
718 -void* SrsEncoder::encoder_thread(void* arg)  
719 -{  
720 - SrsEncoder* obj = (SrsEncoder*)arg;  
721 - srs_assert(obj != NULL);  
722 -  
723 - obj->loop = true;  
724 - obj->encoder_cycle();  
725 -  
726 - return NULL;  
727 -}  
728 -  
729 #endif 693 #endif
730 694
@@ -32,7 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,7 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 #include <string> 32 #include <string>
33 #include <vector> 33 #include <vector>
34 34
35 -#include <st.h> 35 +#include <srs_core_thread.hpp>
36 36
37 class SrsConfDirective; 37 class SrsConfDirective;
38 class SrsRequest; 38 class SrsRequest;
@@ -85,28 +85,29 @@ public: @@ -85,28 +85,29 @@ public:
85 * the encoder for a stream, 85 * the encoder for a stream,
86 * may use multiple ffmpegs to transcode the specified stream. 86 * may use multiple ffmpegs to transcode the specified stream.
87 */ 87 */
88 -class SrsEncoder 88 +class SrsEncoder : public ISrsThreadHandler
89 { 89 {
90 private: 90 private:
91 std::vector<SrsFFMPEG*> ffmpegs; 91 std::vector<SrsFFMPEG*> ffmpegs;
92 private: 92 private:
93 - st_thread_t tid;  
94 - bool loop; 93 + SrsThread* pthread;
  94 + SrsPithyPrint* pithy_print;
95 public: 95 public:
96 SrsEncoder(); 96 SrsEncoder();
97 virtual ~SrsEncoder(); 97 virtual ~SrsEncoder();
98 public: 98 public:
99 virtual int on_publish(SrsRequest* req); 99 virtual int on_publish(SrsRequest* req);
100 virtual void on_unpublish(); 100 virtual void on_unpublish();
  101 +// interface ISrsThreadHandler.
  102 +public:
  103 + virtual int cycle();
  104 + virtual void on_leave_loop();
101 private: 105 private:
102 - virtual int parse_scope_engines(SrsRequest* req);  
103 virtual void clear_engines(); 106 virtual void clear_engines();
104 virtual SrsFFMPEG* at(int index); 107 virtual SrsFFMPEG* at(int index);
  108 + virtual int parse_scope_engines(SrsRequest* req);
105 virtual int parse_transcode(SrsRequest* req, SrsConfDirective* conf); 109 virtual int parse_transcode(SrsRequest* req, SrsConfDirective* conf);
106 - virtual int cycle();  
107 - virtual void encoder_cycle();  
108 - virtual void encoder(SrsPithyPrint* pithy_print);  
109 - static void* encoder_thread(void* arg); 110 + virtual void encoder();
110 }; 111 };
111 112
112 #endif 113 #endif
@@ -37,8 +37,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -37,8 +37,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
37 #define ERROR_ST_OPEN_SOCKET 102 37 #define ERROR_ST_OPEN_SOCKET 102
38 #define ERROR_ST_CREATE_LISTEN_THREAD 103 38 #define ERROR_ST_CREATE_LISTEN_THREAD 103
39 #define ERROR_ST_CREATE_CYCLE_THREAD 104 39 #define ERROR_ST_CREATE_CYCLE_THREAD 104
40 -#define ERROR_ST_CREATE_FORWARD_THREAD 105  
41 -#define ERROR_ST_CONNECT 106 40 +#define ERROR_ST_CONNECT 105
42 41
43 #define ERROR_SOCKET_CREATE 200 42 #define ERROR_SOCKET_CREATE 200
44 #define ERROR_SOCKET_SETREUSE 201 43 #define ERROR_SOCKET_SETREUSE 201
@@ -47,8 +47,7 @@ SrsForwarder::SrsForwarder() @@ -47,8 +47,7 @@ SrsForwarder::SrsForwarder()
47 stfd = NULL; 47 stfd = NULL;
48 stream_id = 0; 48 stream_id = 0;
49 49
50 - tid = NULL;  
51 - loop = false; 50 + pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_MS);
52 } 51 }
53 52
54 SrsForwarder::~SrsForwarder() 53 SrsForwarder::~SrsForwarder()
@@ -61,6 +60,8 @@ SrsForwarder::~SrsForwarder() @@ -61,6 +60,8 @@ SrsForwarder::~SrsForwarder()
61 srs_freep(msg); 60 srs_freep(msg);
62 } 61 }
63 msgs.clear(); 62 msgs.clear();
  63 +
  64 + srs_freep(pthread);
64 } 65 }
65 66
66 int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) 67 int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
@@ -110,17 +111,8 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) @@ -110,17 +111,8 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
110 source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), 111 source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(),
111 stream_name.c_str()); 112 stream_name.c_str());
112 113
113 - // TODO: seems bug when republish and reforward.  
114 -  
115 - // start forward  
116 - if ((ret = open_socket()) != ERROR_SUCCESS) {  
117 - return ret;  
118 - }  
119 -  
120 - srs_assert(!tid);  
121 - if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){  
122 - ret = ERROR_ST_CREATE_FORWARD_THREAD;  
123 - srs_error("st_thread_create failed. ret=%d", ret); 114 + if ((ret = pthread->start()) != ERROR_SUCCESS) {
  115 + srs_error("start srs thread failed. ret=%d", ret);
124 return ret; 116 return ret;
125 } 117 }
126 118
@@ -129,22 +121,9 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) @@ -129,22 +121,9 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
129 121
130 void SrsForwarder::on_unpublish() 122 void SrsForwarder::on_unpublish()
131 { 123 {
132 - if (tid) {  
133 - loop = false;  
134 - st_thread_interrupt(tid);  
135 - st_thread_join(tid, NULL);  
136 - tid = NULL;  
137 - }  
138 -  
139 - if (stfd) {  
140 - int fd = st_netfd_fileno(stfd);  
141 - st_netfd_close(stfd);  
142 - stfd = NULL; 124 + pthread->stop();
143 125
144 - // st does not close it sometimes,  
145 - // close it manually.  
146 - close(fd);  
147 - } 126 + close_underlayer_socket();
148 127
149 srs_freep(client); 128 srs_freep(client);
150 } 129 }
@@ -178,10 +157,59 @@ int SrsForwarder::on_video(SrsSharedPtrMessage* msg) @@ -178,10 +157,59 @@ int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
178 return ret; 157 return ret;
179 } 158 }
180 159
181 -int SrsForwarder::open_socket() 160 +int SrsForwarder::cycle()
182 { 161 {
183 int ret = ERROR_SUCCESS; 162 int ret = ERROR_SUCCESS;
184 163
  164 + if ((ret = connect_server()) != ERROR_SUCCESS) {
  165 + return ret;
  166 + }
  167 + srs_assert(client);
  168 +
  169 + client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
  170 + client->set_send_timeout(SRS_SEND_TIMEOUT_US);
  171 +
  172 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  173 + srs_error("handshake with server failed. ret=%d", ret);
  174 + return ret;
  175 + }
  176 + if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {
  177 + srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
  178 + return ret;
  179 + }
  180 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  181 + srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  182 + return ret;
  183 + }
  184 +
  185 + // TODO: FIXME: need to cache the metadata and sequence header when reconnect.
  186 +
  187 + if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
  188 + srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
  189 + stream_name.c_str(), stream_id, ret);
  190 + return ret;
  191 + }
  192 +
  193 + if ((ret = forward()) != ERROR_SUCCESS) {
  194 + return ret;
  195 + }
  196 +
  197 + return ret;
  198 +}
  199 +
  200 +void SrsForwarder::close_underlayer_socket()
  201 +{
  202 + srs_close_stfd(stfd);
  203 +}
  204 +
  205 +int SrsForwarder::connect_server()
  206 +{
  207 + int ret = ERROR_SUCCESS;
  208 +
  209 + // reopen
  210 + close_underlayer_socket();
  211 +
  212 + // open socket.
185 srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d", 213 srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
186 stream_name.c_str(), tc_url.c_str(), server.c_str(), port); 214 stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
187 215
@@ -192,6 +220,7 @@ int SrsForwarder::open_socket() @@ -192,6 +220,7 @@ int SrsForwarder::open_socket()
192 return ret; 220 return ret;
193 } 221 }
194 222
  223 + srs_assert(!stfd);
195 stfd = st_netfd_open_socket(sock); 224 stfd = st_netfd_open_socket(sock);
196 if(stfd == NULL){ 225 if(stfd == NULL){
197 ret = ERROR_ST_OPEN_SOCKET; 226 ret = ERROR_ST_OPEN_SOCKET;
@@ -202,13 +231,7 @@ int SrsForwarder::open_socket() @@ -202,13 +231,7 @@ int SrsForwarder::open_socket()
202 srs_freep(client); 231 srs_freep(client);
203 client = new SrsRtmpClient(stfd); 232 client = new SrsRtmpClient(stfd);
204 233
205 - return ret;  
206 -}  
207 -  
208 -int SrsForwarder::connect_server()  
209 -{  
210 - int ret = ERROR_SUCCESS;  
211 - 234 + // connect to server.
212 std::string ip = srs_dns_resolve(server); 235 std::string ip = srs_dns_resolve(server);
213 if (ip.empty()) { 236 if (ip.empty()) {
214 ret = ERROR_SYSTEM_IP_INVALID; 237 ret = ERROR_SYSTEM_IP_INVALID;
@@ -231,46 +254,6 @@ int SrsForwarder::connect_server() @@ -231,46 +254,6 @@ int SrsForwarder::connect_server()
231 return ret; 254 return ret;
232 } 255 }
233 256
234 -int SrsForwarder::cycle()  
235 -{  
236 - int ret = ERROR_SUCCESS;  
237 -  
238 - client->set_recv_timeout(SRS_RECV_TIMEOUT_US);  
239 - client->set_send_timeout(SRS_SEND_TIMEOUT_US);  
240 -  
241 - if ((ret = connect_server()) != ERROR_SUCCESS) {  
242 - return ret;  
243 - }  
244 - srs_assert(client);  
245 -  
246 - if ((ret = client->handshake()) != ERROR_SUCCESS) {  
247 - srs_error("handshake with server failed. ret=%d", ret);  
248 - return ret;  
249 - }  
250 - if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {  
251 - srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);  
252 - return ret;  
253 - }  
254 - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {  
255 - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);  
256 - return ret;  
257 - }  
258 -  
259 - // TODO: FIXME: need to cache the metadata and sequence header when reconnect.  
260 -  
261 - if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {  
262 - srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",  
263 - stream_name.c_str(), stream_id, ret);  
264 - return ret;  
265 - }  
266 -  
267 - if ((ret = forward()) != ERROR_SUCCESS) {  
268 - return ret;  
269 - }  
270 -  
271 - return ret;  
272 -}  
273 -  
274 int SrsForwarder::forward() 257 int SrsForwarder::forward()
275 { 258 {
276 int ret = ERROR_SUCCESS; 259 int ret = ERROR_SUCCESS;
@@ -279,9 +262,7 @@ int SrsForwarder::forward() @@ -279,9 +262,7 @@ int SrsForwarder::forward()
279 262
280 SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER); 263 SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
281 264
282 - while (loop) {  
283 - pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);  
284 - 265 + while (true) {
285 // switch to other st-threads. 266 // switch to other st-threads.
286 st_usleep(0); 267 st_usleep(0);
287 268
@@ -303,7 +284,8 @@ int SrsForwarder::forward() @@ -303,7 +284,8 @@ int SrsForwarder::forward()
303 continue; 284 continue;
304 } 285 }
305 286
306 - // reportable 287 + // pithy print
  288 + pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
307 if (pithy_print.can_print()) { 289 if (pithy_print.can_print()) {
308 srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", 290 srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
309 pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); 291 pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
@@ -345,43 +327,3 @@ int SrsForwarder::forward() @@ -345,43 +327,3 @@ int SrsForwarder::forward()
345 return ret; 327 return ret;
346 } 328 }
347 329
348 -void SrsForwarder::forward_cycle()  
349 -{  
350 - int ret = ERROR_SUCCESS;  
351 -  
352 - log_context->generate_id();  
353 - srs_trace("forward cycle start");  
354 -  
355 - while (loop) {  
356 - if ((ret = cycle()) != ERROR_SUCCESS) {  
357 - srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);  
358 - } else {  
359 - srs_info("forward cycle success, retry");  
360 - }  
361 -  
362 - if (!loop) {  
363 - break;  
364 - }  
365 -  
366 - st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);  
367 -  
368 - if ((ret = open_socket()) != ERROR_SUCCESS) {  
369 - srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);  
370 - } else {  
371 - srs_info("forward cycle reopen success");  
372 - }  
373 - }  
374 - srs_trace("forward cycle finished");  
375 -}  
376 -  
377 -void* SrsForwarder::forward_thread(void* arg)  
378 -{  
379 - SrsForwarder* obj = (SrsForwarder*)arg;  
380 - srs_assert(obj != NULL);  
381 -  
382 - obj->loop = true;  
383 - obj->forward_cycle();  
384 -  
385 - return NULL;  
386 -}  
387 -  
@@ -32,7 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,7 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 #include <string> 32 #include <string>
33 #include <vector> 33 #include <vector>
34 34
35 -#include <st.h> 35 +#include <srs_core_thread.hpp>
36 36
37 class SrsSharedPtrMessage; 37 class SrsSharedPtrMessage;
38 class SrsOnMetaDataPacket; 38 class SrsOnMetaDataPacket;
@@ -42,7 +42,7 @@ class SrsRequest; @@ -42,7 +42,7 @@ class SrsRequest;
42 /** 42 /**
43 * forward the stream to other servers. 43 * forward the stream to other servers.
44 */ 44 */
45 -class SrsForwarder 45 +class SrsForwarder : public ISrsThreadHandler
46 { 46 {
47 private: 47 private:
48 std::string app; 48 std::string app;
@@ -53,8 +53,7 @@ private: @@ -53,8 +53,7 @@ private:
53 int port; 53 int port;
54 private: 54 private:
55 st_netfd_t stfd; 55 st_netfd_t stfd;
56 - st_thread_t tid;  
57 - bool loop; 56 + SrsThread* pthread;
58 private: 57 private:
59 SrsRtmpClient* client; 58 SrsRtmpClient* client;
60 std::vector<SrsSharedPtrMessage*> msgs; 59 std::vector<SrsSharedPtrMessage*> msgs;
@@ -67,14 +66,13 @@ public: @@ -67,14 +66,13 @@ public:
67 virtual int on_meta_data(SrsSharedPtrMessage* metadata); 66 virtual int on_meta_data(SrsSharedPtrMessage* metadata);
68 virtual int on_audio(SrsSharedPtrMessage* msg); 67 virtual int on_audio(SrsSharedPtrMessage* msg);
69 virtual int on_video(SrsSharedPtrMessage* msg); 68 virtual int on_video(SrsSharedPtrMessage* msg);
  69 +// interface ISrsThreadHandler.
  70 +public:
  71 + virtual int cycle();
70 private: 72 private:
71 - virtual int open_socket(); 73 + virtual void close_underlayer_socket();
72 virtual int connect_server(); 74 virtual int connect_server();
73 -private:  
74 - virtual int cycle();  
75 virtual int forward(); 75 virtual int forward();
76 - virtual void forward_cycle();  
77 - static void* forward_thread(void* arg);  
78 }; 76 };
79 77
80 #endif 78 #endif
@@ -184,15 +184,7 @@ void SrsHttpClient::disconnect() @@ -184,15 +184,7 @@ void SrsHttpClient::disconnect()
184 { 184 {
185 connected = false; 185 connected = false;
186 186
187 - if (stfd) {  
188 - int fd = st_netfd_fileno(stfd);  
189 - st_netfd_close(stfd);  
190 - stfd = NULL;  
191 -  
192 - // st does not close it sometimes,  
193 - // close it manually.  
194 - ::close(fd);  
195 - } 187 + srs_close_stfd(stfd);
196 } 188 }
197 189
198 int SrsHttpClient::connect(SrsHttpUri* uri) 190 int SrsHttpClient::connect(SrsHttpUri* uri)
@@ -36,7 +36,6 @@ class SrsSocket; @@ -36,7 +36,6 @@ class SrsSocket;
36 36
37 #include <string> 37 #include <string>
38 38
39 -#include <st.h>  
40 #include <http_parser.h> 39 #include <http_parser.h>
41 40
42 #define SRS_HTTP_HEADER_BUFFER 1024 41 #define SRS_HTTP_HEADER_BUFFER 1024
@@ -29,8 +29,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -29,8 +29,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 #include <string> 29 #include <string>
30 #include <map> 30 #include <map>
31 31
32 -#include <st.h>  
33 -  
34 ILogContext::ILogContext() 32 ILogContext::ILogContext()
35 { 33 {
36 } 34 }
@@ -33,8 +33,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -33,8 +33,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 #include <map> 33 #include <map>
34 #include <string> 34 #include <string>
35 35
36 -#include <st.h>  
37 -  
38 #include <srs_core_log.hpp> 36 #include <srs_core_log.hpp>
39 #include <srs_core_error.hpp> 37 #include <srs_core_error.hpp>
40 38
@@ -32,8 +32,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,8 +32,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 32
33 #include <string> 33 #include <string>
34 34
35 -#include <st.h>  
36 -  
37 class SrsProtocol; 35 class SrsProtocol;
38 class ISrsMessage; 36 class ISrsMessage;
39 class SrsCommonMessage; 37 class SrsCommonMessage;
@@ -30,8 +30,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,8 +30,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 30
31 #include <algorithm> 31 #include <algorithm>
32 32
33 -#include <st.h>  
34 -  
35 #include <srs_core_log.hpp> 33 #include <srs_core_log.hpp>
36 #include <srs_core_error.hpp> 34 #include <srs_core_error.hpp>
37 #include <srs_core_client.hpp> 35 #include <srs_core_client.hpp>
@@ -49,23 +47,15 @@ SrsListener::SrsListener(SrsServer* _server, SrsListenerType _type) @@ -49,23 +47,15 @@ SrsListener::SrsListener(SrsServer* _server, SrsListenerType _type)
49 server = _server; 47 server = _server;
50 type = _type; 48 type = _type;
51 49
52 - tid = NULL;  
53 - loop = false; 50 + pthread = new SrsThread(this, 0);
54 } 51 }
55 52
56 SrsListener::~SrsListener() 53 SrsListener::~SrsListener()
57 { 54 {
58 - if (stfd) {  
59 - st_netfd_close(stfd);  
60 - stfd = NULL;  
61 - } 55 + srs_close_stfd(stfd);
62 56
63 - if (tid) {  
64 - loop = false;  
65 - st_thread_interrupt(tid);  
66 - st_thread_join(tid, NULL);  
67 - tid = NULL;  
68 - } 57 + pthread->stop();
  58 + srs_freep(pthread);
69 59
70 // st does not close it sometimes, 60 // st does not close it sometimes,
71 // close it manually. 61 // close it manually.
@@ -118,8 +108,7 @@ int SrsListener::listen(int _port) @@ -118,8 +108,7 @@ int SrsListener::listen(int _port)
118 } 108 }
119 srs_verbose("st open socket success. fd=%d", fd); 109 srs_verbose("st open socket success. fd=%d", fd);
120 110
121 - if ((tid = st_thread_create(listen_thread, this, 1, 0)) == NULL) {  
122 - ret = ERROR_ST_CREATE_LISTEN_THREAD; 111 + if ((ret = pthread->start()) != ERROR_SUCCESS) {
123 srs_error("st_thread_create listen thread error. ret=%d", ret); 112 srs_error("st_thread_create listen thread error. ret=%d", ret);
124 return ret; 113 return ret;
125 } 114 }
@@ -130,41 +119,32 @@ int SrsListener::listen(int _port) @@ -130,41 +119,32 @@ int SrsListener::listen(int _port)
130 return ret; 119 return ret;
131 } 120 }
132 121
133 -void SrsListener::listen_cycle() 122 +void SrsListener::on_enter_loop()
134 { 123 {
135 - int ret = ERROR_SUCCESS;  
136 -  
137 - log_context->generate_id();  
138 srs_trace("listen cycle start, port=%d, type=%d, fd=%d", port, type, fd); 124 srs_trace("listen cycle start, port=%d, type=%d, fd=%d", port, type, fd);
  125 +}
  126 +
  127 +int SrsListener::cycle()
  128 +{
  129 + int ret = ERROR_SUCCESS;
139 130
140 - while (loop) {  
141 st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); 131 st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
142 132
143 if(client_stfd == NULL){ 133 if(client_stfd == NULL){
144 // ignore error. 134 // ignore error.
145 srs_warn("ignore accept thread stoppped for accept client error"); 135 srs_warn("ignore accept thread stoppped for accept client error");
146 - continue; 136 + return ret;
147 } 137 }
148 srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); 138 srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
149 139
150 if ((ret = server->accept_client(type, client_stfd)) != ERROR_SUCCESS) { 140 if ((ret = server->accept_client(type, client_stfd)) != ERROR_SUCCESS) {
151 srs_warn("accept client error. ret=%d", ret); 141 srs_warn("accept client error. ret=%d", ret);
152 - continue; 142 + return ret;
153 } 143 }
154 144
155 srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); 145 srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
156 - }  
157 -}  
158 146
159 -void* SrsListener::listen_thread(void* arg)  
160 -{  
161 - SrsListener* obj = (SrsListener*)arg;  
162 - srs_assert(obj != NULL);  
163 -  
164 - obj->loop = true;  
165 - obj->listen_cycle();  
166 -  
167 - return NULL; 147 + return ret;
168 } 148 }
169 149
170 SrsServer::SrsServer() 150 SrsServer::SrsServer()
@@ -312,8 +292,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) @@ -312,8 +292,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
312 srs_error("exceed the max connections, drop client: " 292 srs_error("exceed the max connections, drop client: "
313 "clients=%d, max=%d, fd=%d", (int)conns.size(), max_connections, fd); 293 "clients=%d, max=%d, fd=%d", (int)conns.size(), max_connections, fd);
314 294
315 - st_netfd_close(client_stfd);  
316 - ::close(fd); 295 + srs_close_stfd(client_stfd);
317 296
318 return ret; 297 return ret;
319 } 298 }
@@ -32,9 +32,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,9 +32,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 32
33 #include <vector> 33 #include <vector>
34 34
35 -#include <st.h>  
36 -  
37 #include <srs_core_reload.hpp> 35 #include <srs_core_reload.hpp>
  36 +#include <srs_core_thread.hpp>
38 37
39 class SrsServer; 38 class SrsServer;
40 class SrsConnection; 39 class SrsConnection;
@@ -45,7 +44,7 @@ enum SrsListenerType @@ -45,7 +44,7 @@ enum SrsListenerType
45 SrsListenerApi 44 SrsListenerApi
46 }; 45 };
47 46
48 -class SrsListener 47 +class SrsListener : public ISrsThreadHandler
49 { 48 {
50 public: 49 public:
51 SrsListenerType type; 50 SrsListenerType type;
@@ -54,16 +53,16 @@ private: @@ -54,16 +53,16 @@ private:
54 st_netfd_t stfd; 53 st_netfd_t stfd;
55 int port; 54 int port;
56 SrsServer* server; 55 SrsServer* server;
57 - st_thread_t tid;  
58 - bool loop; 56 + SrsThread* pthread;
59 public: 57 public:
60 SrsListener(SrsServer* _server, SrsListenerType _type); 58 SrsListener(SrsServer* _server, SrsListenerType _type);
61 virtual ~SrsListener(); 59 virtual ~SrsListener();
62 public: 60 public:
63 virtual int listen(int port); 61 virtual int listen(int port);
64 -private:  
65 - virtual void listen_cycle();  
66 - static void* listen_thread(void* arg); 62 +// interface ISrsThreadHandler.
  63 +public:
  64 + virtual void on_enter_loop();
  65 + virtual int cycle();
67 }; 66 };
68 67
69 class SrsServer : public SrsReloadHandler 68 class SrsServer : public SrsReloadHandler
@@ -30,8 +30,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,8 +30,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 30
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
33 -#include <st.h>  
34 -  
35 /** 33 /**
36 * the socket provides TCP socket over st, 34 * the socket provides TCP socket over st,
37 * that is, the sync socket mechanism. 35 * that is, the sync socket mechanism.
@@ -425,6 +425,8 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -425,6 +425,8 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
425 425
426 metadata->metadata->set("server", new SrsAmf0String( 426 metadata->metadata->set("server", new SrsAmf0String(
427 RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); 427 RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
  428 + metadata->metadata->set("contributor",
  429 + new SrsAmf0String(RTMP_SIG_SRS_CONTRIBUTOR));
428 430
429 SrsAmf0Any* prop = NULL; 431 SrsAmf0Any* prop = NULL;
430 if ((prop = metadata->metadata->get_property("audiosamplerate")) != NULL) { 432 if ((prop = metadata->metadata->get_property("audiosamplerate")) != NULL) {
  1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_thread.hpp>
  25 +
  26 +#include <srs_core_error.hpp>
  27 +#include <srs_core_log.hpp>
  28 +
  29 +ISrsThreadHandler::ISrsThreadHandler()
  30 +{
  31 +}
  32 +
  33 +ISrsThreadHandler::~ISrsThreadHandler()
  34 +{
  35 +}
  36 +
  37 +void ISrsThreadHandler::on_enter_loop()
  38 +{
  39 +}
  40 +
  41 +int ISrsThreadHandler::on_before_cycle()
  42 +{
  43 + int ret = ERROR_SUCCESS;
  44 + return ret;
  45 +}
  46 +
  47 +int ISrsThreadHandler::on_end_cycle()
  48 +{
  49 + int ret = ERROR_SUCCESS;
  50 + return ret;
  51 +}
  52 +
  53 +void ISrsThreadHandler::on_leave_loop()
  54 +{
  55 +}
  56 +
  57 +SrsThread::SrsThread(ISrsThreadHandler* thread_handler, int64_t interval_ms)
  58 +{
  59 + handler = thread_handler;
  60 + cycle_interval_milliseconds = interval_ms;
  61 +
  62 + tid = NULL;
  63 + loop = false;
  64 +}
  65 +
  66 +SrsThread::~SrsThread()
  67 +{
  68 + stop();
  69 +}
  70 +
  71 +int SrsThread::start()
  72 +{
  73 + int ret = ERROR_SUCCESS;
  74 +
  75 + if(tid) {
  76 + srs_info("thread already running.");
  77 + return ret;
  78 + }
  79 +
  80 + if((tid = st_thread_create(thread_fun, this, 1, 0)) == NULL){
  81 + ret = ERROR_ST_CREATE_CYCLE_THREAD;
  82 + srs_error("st_thread_create failed. ret=%d", ret);
  83 + return ret;
  84 + }
  85 +
  86 + return ret;
  87 +}
  88 +
  89 +void SrsThread::stop()
  90 +{
  91 + if (tid) {
  92 + loop = false;
  93 +
  94 + // the interrupt will cause the socket to read/write error,
  95 + // which will terminate the cycle thread.
  96 + st_thread_interrupt(tid);
  97 +
  98 + // wait the thread to exit.
  99 + st_thread_join(tid, NULL);
  100 +
  101 + tid = NULL;
  102 + }
  103 +}
  104 +
  105 +void SrsThread::thread_cycle()
  106 +{
  107 + int ret = ERROR_SUCCESS;
  108 +
  109 + srs_assert(handler);
  110 +
  111 + log_context->generate_id();
  112 + srs_trace("thread cycle start");
  113 +
  114 + handler->on_end_cycle();
  115 +
  116 + loop = true;
  117 + while (loop) {
  118 + if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
  119 + srs_warn("thread on before cycle failed, ignored and retry, ret=%d", ret);
  120 + goto failed;
  121 + }
  122 + srs_info("thread on before cycle success");
  123 +
  124 + if ((ret = handler->cycle()) != ERROR_SUCCESS) {
  125 + srs_warn("thread cycle failed, ignored and retry, ret=%d", ret);
  126 + goto failed;
  127 + }
  128 + srs_info("thread cycle success");
  129 +
  130 + if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
  131 + srs_warn("thread on end cycle failed, ignored and retry, ret=%d", ret);
  132 + goto failed;
  133 + }
  134 + srs_info("thread on end cycle success");
  135 +
  136 +failed:
  137 + if (!loop) {
  138 + break;
  139 + }
  140 +
  141 + st_usleep(cycle_interval_milliseconds * 1000);
  142 + }
  143 +
  144 + handler->on_leave_loop();
  145 + srs_trace("thread cycle finished");
  146 +}
  147 +
  148 +void* SrsThread::thread_fun(void* arg)
  149 +{
  150 + SrsThread* obj = (SrsThread*)arg;
  151 + srs_assert(obj);
  152 +
  153 + obj->thread_cycle();
  154 +
  155 + return NULL;
  156 +}
  1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#ifndef SRS_CORE_THREAD_HPP
  25 +#define SRS_CORE_THREAD_HPP
  26 +
  27 +/*
  28 +#include <srs_core_thread.hpp>
  29 +*/
  30 +#include <srs_core.hpp>
  31 +
  32 +/**
  33 +* the handler for the thread, callback interface.
  34 +* the thread model defines as:
  35 +* handler->on_enter_loop()
  36 +* while loop:
  37 +* handler->on_before_cycle()
  38 +* handler->cycle()
  39 +* handler->on_end_cycle()
  40 +* if !loop then break for user stop thread.
  41 +* sleep(CycleIntervalMilliseconds)
  42 +* handler->on_leave_loop()
  43 +* when stop, the thread will interrupt the st_thread,
  44 +* which will cause the socket to return error and
  45 +* terminate the cycle thread.
  46 +*/
  47 +class ISrsThreadHandler
  48 +{
  49 +public:
  50 + ISrsThreadHandler();
  51 + virtual ~ISrsThreadHandler();
  52 +public:
  53 + virtual void on_enter_loop();
  54 + virtual int on_before_cycle();
  55 + virtual int cycle() = 0;
  56 + virtual int on_end_cycle();
  57 + virtual void on_leave_loop();
  58 +};
  59 +
  60 +/**
  61 +* provides servies from st_thread_t,
  62 +* for common thread usage.
  63 +*/
  64 +class SrsThread
  65 +{
  66 +private:
  67 + st_thread_t tid;
  68 + bool loop;
  69 +private:
  70 + ISrsThreadHandler* handler;
  71 + int64_t cycle_interval_milliseconds;
  72 +public:
  73 + /**
  74 + * initialize the thread.
  75 + * @param thread_handler, the cycle handler for the thread.
  76 + * @param interval_ms, the sleep interval when cycle finished.
  77 + */
  78 + SrsThread(ISrsThreadHandler* thread_handler, int64_t interval_ms);
  79 + virtual ~SrsThread();
  80 +public:
  81 + /**
  82 + * start the thread, invoke the cycle of handler util
  83 + * user stop the thread.
  84 + * @remark ignore any error of cycle of handler.
  85 + * @remark user can start multiple times, ignore if already started.
  86 + */
  87 + virtual int start();
  88 + /**
  89 + * stop the thread, wait for the thread to terminate.
  90 + * @remark user can stop multiple times, ignore if already stopped.
  91 + */
  92 + virtual void stop();
  93 +private:
  94 + virtual void thread_cycle();
  95 + static void* thread_fun(void* arg);
  96 +};
  97 +
  98 +#endif
@@ -6,52 +6,54 @@ file @@ -6,52 +6,54 @@ file
6 core readonly separator, 6 core readonly separator,
7 ..\core\srs_core.hpp, 7 ..\core\srs_core.hpp,
8 ..\core\srs_core.cpp, 8 ..\core\srs_core.cpp,
9 - ..\core\srs_core_error.hpp,  
10 - ..\core\srs_core_error.cpp, 9 + ..\core\srs_core_amf0.hpp,
  10 + ..\core\srs_core_amf0.cpp,
11 ..\core\srs_core_autofree.hpp, 11 ..\core\srs_core_autofree.hpp,
12 ..\core\srs_core_autofree.cpp, 12 ..\core\srs_core_autofree.cpp,
13 - ..\core\srs_core_server.hpp,  
14 - ..\core\srs_core_server.cpp,  
15 - ..\core\srs_core_reload.hpp,  
16 - ..\core\srs_core_reload.cpp, 13 + ..\core\srs_core_buffer.hpp,
  14 + ..\core\srs_core_buffer.cpp,
  15 + ..\core\srs_core_client.hpp,
  16 + ..\core\srs_core_client.cpp,
  17 + ..\core\srs_core_codec.hpp,
  18 + ..\core\srs_core_codec.cpp,
17 ..\core\srs_core_config.hpp, 19 ..\core\srs_core_config.hpp,
18 ..\core\srs_core_config.cpp, 20 ..\core\srs_core_config.cpp,
19 - ..\core\srs_core_refer.hpp,  
20 - ..\core\srs_core_refer.cpp,  
21 ..\core\srs_core_conn.hpp, 21 ..\core\srs_core_conn.hpp,
22 ..\core\srs_core_conn.cpp, 22 ..\core\srs_core_conn.cpp,
23 - ..\core\srs_core_client.hpp,  
24 - ..\core\srs_core_client.cpp,  
25 - ..\core\srs_core_http.hpp,  
26 - ..\core\srs_core_http.cpp,  
27 - ..\core\srs_core_source.hpp,  
28 - ..\core\srs_core_source.cpp,  
29 - ..\core\srs_core_forward.hpp,  
30 - ..\core\srs_core_forward.cpp,  
31 ..\core\srs_core_encoder.hpp, 23 ..\core\srs_core_encoder.hpp,
32 ..\core\srs_core_encoder.cpp, 24 ..\core\srs_core_encoder.cpp,
33 - ..\core\srs_core_hls.hpp,  
34 - ..\core\srs_core_hls.cpp,  
35 - ..\core\srs_core_codec.hpp,  
36 - ..\core\srs_core_codec.cpp,  
37 - ..\core\srs_core_rtmp.hpp,  
38 - ..\core\srs_core_rtmp.cpp, 25 + ..\core\srs_core_error.hpp,
  26 + ..\core\srs_core_error.cpp,
  27 + ..\core\srs_core_forward.hpp,
  28 + ..\core\srs_core_forward.cpp,
39 ..\core\srs_core_handshake.hpp, 29 ..\core\srs_core_handshake.hpp,
40 ..\core\srs_core_handshake.cpp, 30 ..\core\srs_core_handshake.cpp,
  31 + ..\core\srs_core_hls.hpp,
  32 + ..\core\srs_core_hls.cpp,
  33 + ..\core\srs_core_http.hpp,
  34 + ..\core\srs_core_http.cpp,
  35 + ..\core\srs_core_log.hpp,
  36 + ..\core\srs_core_log.cpp,
  37 + ..\core\srs_core_pithy_print.hpp,
  38 + ..\core\srs_core_pithy_print.cpp,
41 ..\core\srs_core_protocol.hpp, 39 ..\core\srs_core_protocol.hpp,
42 ..\core\srs_core_protocol.cpp, 40 ..\core\srs_core_protocol.cpp,
43 - ..\core\srs_core_amf0.hpp,  
44 - ..\core\srs_core_amf0.cpp, 41 + ..\core\srs_core_refer.hpp,
  42 + ..\core\srs_core_refer.cpp,
  43 + ..\core\srs_core_reload.hpp,
  44 + ..\core\srs_core_reload.cpp,
  45 + ..\core\srs_core_rtmp.hpp,
  46 + ..\core\srs_core_rtmp.cpp,
  47 + ..\core\srs_core_thread.hpp,
  48 + ..\core\srs_core_thread.cpp,
  49 + ..\core\srs_core_server.hpp,
  50 + ..\core\srs_core_server.cpp,
45 ..\core\srs_core_stream.hpp, 51 ..\core\srs_core_stream.hpp,
46 ..\core\srs_core_stream.cpp, 52 ..\core\srs_core_stream.cpp,
47 ..\core\srs_core_socket.hpp, 53 ..\core\srs_core_socket.hpp,
48 ..\core\srs_core_socket.cpp, 54 ..\core\srs_core_socket.cpp,
49 - ..\core\srs_core_buffer.hpp,  
50 - ..\core\srs_core_buffer.cpp,  
51 - ..\core\srs_core_pithy_print.hpp,  
52 - ..\core\srs_core_pithy_print.cpp,  
53 - ..\core\srs_core_log.hpp,  
54 - ..\core\srs_core_log.cpp, 55 + ..\core\srs_core_source.hpp,
  56 + ..\core\srs_core_source.cpp,
55 research readonly separator, 57 research readonly separator,
56 ..\..\research\ts_info.cc; 58 ..\..\research\ts_info.cc;
57 59