winlin

update encoder framework

@@ -117,7 +117,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw @@ -117,7 +117,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw
117 * nginx v1.5.0: 139524 lines <br/> 117 * nginx v1.5.0: 139524 lines <br/>
118 118
119 ### History 119 ### History
120 -* v0.7, 2013-11-30, add transcoding params to config. 120 +* v0.7, 2013-11-30, support live stream transcoding by ffmpeg.
121 * v0.7, 2013-11-30, support --with/without -ffmpeg, build ffmpeg-2.1. 121 * v0.7, 2013-11-30, support --with/without -ffmpeg, build ffmpeg-2.1.
122 * v0.7, 2013-11-30, add ffmpeg-2.1, x264-core138, lame-3.99.5, libaacplus-2.0.2. 122 * v0.7, 2013-11-30, add ffmpeg-2.1, x264-core138, lame-3.99.5, libaacplus-2.0.2.
123 * v0.6, 2013-11-29, v0.6 released. 16094 lines. 123 * v0.6, 2013-11-29, v0.6 released. 16094 lines.
@@ -568,7 +568,7 @@ SrsConfDirective* SrsConfig::get_vhost_enabled(std::string vhost) @@ -568,7 +568,7 @@ SrsConfDirective* SrsConfig::get_vhost_enabled(std::string vhost)
568 return conf->get("enabled"); 568 return conf->get("enabled");
569 } 569 }
570 570
571 -SrsConfDirective* SrsConfig::get_transcode(std::string vhost) 571 +SrsConfDirective* SrsConfig::get_transcode(std::string vhost, std::string scope)
572 { 572 {
573 SrsConfDirective* conf = get_vhost(vhost); 573 SrsConfDirective* conf = get_vhost(vhost);
574 574
@@ -576,7 +576,16 @@ SrsConfDirective* SrsConfig::get_transcode(std::string vhost) @@ -576,7 +576,16 @@ SrsConfDirective* SrsConfig::get_transcode(std::string vhost)
576 return NULL; 576 return NULL;
577 } 577 }
578 578
579 - return conf->get("transcode"); 579 + SrsConfDirective* transcode = conf->get("transcode");
  580 + if (!transcode) {
  581 + return NULL;
  582 + }
  583 +
  584 + if (transcode->arg0() == scope) {
  585 + return transcode;
  586 + }
  587 +
  588 + return NULL;
580 } 589 }
581 590
582 SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost) 591 SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost)
@@ -114,7 +114,7 @@ public: @@ -114,7 +114,7 @@ public:
114 virtual int parse_options(int argc, char** argv); 114 virtual int parse_options(int argc, char** argv);
115 virtual SrsConfDirective* get_vhost(std::string vhost); 115 virtual SrsConfDirective* get_vhost(std::string vhost);
116 virtual SrsConfDirective* get_vhost_enabled(std::string vhost); 116 virtual SrsConfDirective* get_vhost_enabled(std::string vhost);
117 - virtual SrsConfDirective* get_transcode(std::string vhost); 117 + virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope);
118 virtual SrsConfDirective* get_gop_cache(std::string vhost); 118 virtual SrsConfDirective* get_gop_cache(std::string vhost);
119 virtual SrsConfDirective* get_forward(std::string vhost); 119 virtual SrsConfDirective* get_forward(std::string vhost);
120 virtual SrsConfDirective* get_hls(std::string vhost); 120 virtual SrsConfDirective* get_hls(std::string vhost);
@@ -25,22 +25,89 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -25,22 +25,89 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 25
26 #include <srs_core_error.hpp> 26 #include <srs_core_error.hpp>
27 #include <srs_core_log.hpp> 27 #include <srs_core_log.hpp>
  28 +#include <srs_core_config.hpp>
  29 +
  30 +#define SRS_ENCODER_SLEEP_MS 2000
28 31
29 SrsEncoder::SrsEncoder() 32 SrsEncoder::SrsEncoder()
30 { 33 {
  34 + tid = NULL;
  35 + loop = false;
31 } 36 }
32 37
33 SrsEncoder::~SrsEncoder() 38 SrsEncoder::~SrsEncoder()
34 { 39 {
  40 + on_unpublish();
35 } 41 }
36 42
37 -int SrsEncoder::on_publish(std::string vhost, std::string app, std::string stream) 43 +int SrsEncoder::on_publish(std::string _vhost, std::string _app, std::string _stream)
38 { 44 {
39 int ret = ERROR_SUCCESS; 45 int ret = ERROR_SUCCESS;
  46 +
  47 + vhost = _vhost;
  48 + app = _app;
  49 + stream = _stream;
  50 +
  51 + srs_assert(!tid);
  52 + if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL){
  53 + ret = ERROR_ST_CREATE_FORWARD_THREAD;
  54 + srs_error("st_thread_create failed. ret=%d", ret);
  55 + return ret;
  56 + }
  57 +
40 return ret; 58 return ret;
41 } 59 }
42 60
43 void SrsEncoder::on_unpublish() 61 void SrsEncoder::on_unpublish()
44 { 62 {
  63 + if (tid) {
  64 + loop = false;
  65 + st_thread_interrupt(tid);
  66 + st_thread_join(tid, NULL);
  67 + tid = NULL;
  68 + }
  69 +}
  70 +
  71 +int SrsEncoder::cycle()
  72 +{
  73 + int ret = ERROR_SUCCESS;
  74 + return ret;
  75 +}
  76 +
  77 +void SrsEncoder::encoder_cycle()
  78 +{
  79 + int ret = ERROR_SUCCESS;
  80 +
  81 + log_context->generate_id();
  82 + srs_trace("encoder cycle start");
  83 +
  84 + while (loop) {
  85 + if ((ret = cycle()) != ERROR_SUCCESS) {
  86 + srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);
  87 + } else {
  88 + srs_info("encoder cycle success, retry");
  89 + }
  90 +
  91 + if (!loop) {
  92 + break;
  93 + }
  94 +
  95 + st_usleep(SRS_ENCODER_SLEEP_MS * 1000);
  96 + }
  97 +
  98 + // TODO: kill ffmpeg when finished and it alive
  99 +
  100 + srs_trace("encoder cycle finished");
  101 +}
  102 +
  103 +void* SrsEncoder::encoder_thread(void* arg)
  104 +{
  105 + SrsEncoder* obj = (SrsEncoder*)arg;
  106 + srs_assert(obj != NULL);
  107 +
  108 + obj->loop = true;
  109 + obj->encoder_cycle();
  110 +
  111 + return NULL;
45 } 112 }
46 113
@@ -31,14 +31,27 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,14 +31,27 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 31
32 #include <string> 32 #include <string>
33 33
  34 +#include <st.h>
  35 +
34 class SrsEncoder 36 class SrsEncoder
35 { 37 {
  38 +private:
  39 + std::string vhost;
  40 + std::string app;
  41 + std::string stream;
  42 +private:
  43 + st_thread_t tid;
  44 + bool loop;
36 public: 45 public:
37 SrsEncoder(); 46 SrsEncoder();
38 virtual ~SrsEncoder(); 47 virtual ~SrsEncoder();
39 public: 48 public:
40 virtual int on_publish(std::string vhost, std::string app, std::string stream); 49 virtual int on_publish(std::string vhost, std::string app, std::string stream);
41 virtual void on_unpublish(); 50 virtual void on_unpublish();
  51 +private:
  52 + virtual int cycle();
  53 + virtual void encoder_cycle();
  54 + static void* encoder_thread(void* arg);
42 }; 55 };
43 56
44 #endif 57 #endif
@@ -43,10 +43,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -43,10 +43,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
43 SrsForwarder::SrsForwarder() 43 SrsForwarder::SrsForwarder()
44 { 44 {
45 client = NULL; 45 client = NULL;
46 - tid = NULL;  
47 stfd = NULL; 46 stfd = NULL;
48 - loop = false;  
49 stream_id = 0; 47 stream_id = 0;
  48 +
  49 + tid = NULL;
  50 + loop = false;
50 } 51 }
51 52
52 SrsForwarder::~SrsForwarder() 53 SrsForwarder::~SrsForwarder()
@@ -221,7 +222,7 @@ std::string SrsForwarder::parse_server(std::string host) @@ -221,7 +222,7 @@ std::string SrsForwarder::parse_server(std::string host)
221 return ipv4; 222 return ipv4;
222 } 223 }
223 224
224 -int SrsForwarder::forward_cycle_imp() 225 +int SrsForwarder::cycle()
225 { 226 {
226 int ret = ERROR_SUCCESS; 227 int ret = ERROR_SUCCESS;
227 228
@@ -316,7 +317,7 @@ void SrsForwarder::forward_cycle() @@ -316,7 +317,7 @@ void SrsForwarder::forward_cycle()
316 srs_trace("forward cycle start"); 317 srs_trace("forward cycle start");
317 318
318 while (loop) { 319 while (loop) {
319 - if ((ret = forward_cycle_imp()) != ERROR_SUCCESS) { 320 + if ((ret = cycle()) != ERROR_SUCCESS) {
320 srs_warn("forward cycle failed, ignored and retry, ret=%d", ret); 321 srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);
321 } else { 322 } else {
322 srs_info("forward cycle success, retry"); 323 srs_info("forward cycle success, retry");
@@ -71,7 +71,7 @@ private: @@ -71,7 +71,7 @@ private:
71 virtual int connect_server(); 71 virtual int connect_server();
72 std::string parse_server(std::string host); 72 std::string parse_server(std::string host);
73 private: 73 private:
74 - virtual int forward_cycle_imp(); 74 + virtual int cycle();
75 virtual int forward(); 75 virtual int forward();
76 virtual void forward_cycle(); 76 virtual void forward_cycle();
77 static void* forward_thread(void* arg); 77 static void* forward_thread(void* arg);