继续操作前请注册或者登录。
winlin

support forward stream to origin/edge

@@ -47,14 +47,16 @@ m3u8 url: http://127.0.0.1:80/live/livestream.m3u8 @@ -47,14 +47,16 @@ m3u8 url: http://127.0.0.1:80/live/livestream.m3u8
47 8. support cache last gop for flash player to fast startup.<br/> 47 8. support cache last gop for flash player to fast startup.<br/>
48 9. support listen at multiple ports.<br/> 48 9. support listen at multiple ports.<br/>
49 10. support long time(>4.6hours) publish/play.<br/> 49 10. support long time(>4.6hours) publish/play.<br/>
50 -11. [dev] support forward publish stream to build active-standby cluster.<br/>  
51 -12. [plan] support live stream transcoding by ffmpeg.<br/>  
52 -13. [plan] support full http callback api.<br/>  
53 -14. [plan] support network based cli and json result.<br/>  
54 -15. [plan] support bandwidth test api and flash client.<br/>  
55 -16. no edge server, origin server only.<br/>  
56 -17. no vod streaming, live streaming only.<br/>  
57 -18. no multiple processes, single process only.<br/> 50 +11. high performace, 1800 connections(500kbps), 900Mbps, CPU 90.2%, 41MB<br/>
  51 +12. support forward publish stream to build active-standby cluster.<br/>
  52 +13. support broadcast by forward the stream to other servers(origin/edge).<br/>
  53 +14. [plan] support live stream transcoding by ffmpeg.<br/>
  54 +15. [plan] support full http callback api.<br/>
  55 +16. [plan] support network based cli and json result.<br/>
  56 +17. [plan] support bandwidth test api and flash client.<br/>
  57 +18. no edge server, origin server only.<br/>
  58 +19. no vod streaming, live streaming only.<br/>
  59 +20. no multiple processes, single process only.<br/>
58 60
59 ### Performance 61 ### Performance
60 1. 300 connections, 150Mbps, 500kbps, CPU 18.8%, 5956KB. 62 1. 300 connections, 150Mbps, 500kbps, CPU 18.8%, 5956KB.
@@ -15,7 +15,7 @@ vhost __defaultVhost__ { @@ -15,7 +15,7 @@ vhost __defaultVhost__ {
15 hls_path ./objs/nginx/html; 15 hls_path ./objs/nginx/html;
16 hls_fragment 5; 16 hls_fragment 5;
17 hls_window 30; 17 hls_window 30;
18 - forward 192.168.1.50; 18 + forward 127.0.0.1:1936;
19 } 19 }
20 # the vhost which forward publish streams. 20 # the vhost which forward publish streams.
21 vhost forward.vhost.com { 21 vhost forward.vhost.com {
@@ -104,5 +104,8 @@ pithy_print { @@ -104,5 +104,8 @@ pithy_print {
104 # shared print interval for all play clients, in milliseconds. 104 # shared print interval for all play clients, in milliseconds.
105 # if not specified, set to 1300. 105 # if not specified, set to 1300.
106 play 3000; 106 play 3000;
  107 + # shared print interval for all forwarders, in milliseconds.
  108 + # if not specified, set to 2000.
  109 + forwarder 3000;
107 } 110 }
108 111
@@ -681,6 +681,16 @@ SrsConfDirective* SrsConfig::get_pithy_print_publish() @@ -681,6 +681,16 @@ SrsConfDirective* SrsConfig::get_pithy_print_publish()
681 return pithy->get("publish"); 681 return pithy->get("publish");
682 } 682 }
683 683
  684 +SrsConfDirective* SrsConfig::get_pithy_print_forwarder()
  685 +{
  686 + SrsConfDirective* pithy = root->get("pithy_print");
  687 + if (!pithy) {
  688 + return NULL;
  689 + }
  690 +
  691 + return pithy->get("forwarder");
  692 +}
  693 +
684 SrsConfDirective* SrsConfig::get_pithy_print_play() 694 SrsConfDirective* SrsConfig::get_pithy_print_play()
685 { 695 {
686 SrsConfDirective* pithy = root->get("pithy_print"); 696 SrsConfDirective* pithy = root->get("pithy_print");
@@ -126,6 +126,7 @@ public: @@ -126,6 +126,7 @@ public:
126 virtual SrsConfDirective* get_listen(); 126 virtual SrsConfDirective* get_listen();
127 virtual SrsConfDirective* get_chunk_size(); 127 virtual SrsConfDirective* get_chunk_size();
128 virtual SrsConfDirective* get_pithy_print_publish(); 128 virtual SrsConfDirective* get_pithy_print_publish();
  129 + virtual SrsConfDirective* get_pithy_print_forwarder();
129 virtual SrsConfDirective* get_pithy_print_play(); 130 virtual SrsConfDirective* get_pithy_print_play();
130 private: 131 private:
131 virtual int parse_file(const char* filename); 132 virtual int parse_file(const char* filename);
@@ -32,7 +32,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,7 +32,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 #include <srs_core_error.hpp> 32 #include <srs_core_error.hpp>
33 #include <srs_core_rtmp.hpp> 33 #include <srs_core_rtmp.hpp>
34 #include <srs_core_log.hpp> 34 #include <srs_core_log.hpp>
  35 +#include <srs_core_protocol.hpp>
  36 +#include <srs_core_pithy_print.hpp>
35 37
  38 +#define SRS_PULSE_TIMEOUT_MS 100
36 #define SRS_FORWARDER_SLEEP_MS 2000 39 #define SRS_FORWARDER_SLEEP_MS 2000
37 #define SRS_SEND_TIMEOUT_US 3000000L 40 #define SRS_SEND_TIMEOUT_US 3000000L
38 #define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US 41 #define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
@@ -49,6 +52,13 @@ SrsForwarder::SrsForwarder() @@ -49,6 +52,13 @@ SrsForwarder::SrsForwarder()
49 SrsForwarder::~SrsForwarder() 52 SrsForwarder::~SrsForwarder()
50 { 53 {
51 on_unpublish(); 54 on_unpublish();
  55 +
  56 + std::vector<SrsSharedPtrMessage*>::iterator it;
  57 + for (it = msgs.begin(); it != msgs.end(); ++it) {
  58 + SrsSharedPtrMessage* msg = *it;
  59 + srs_freep(msg);
  60 + }
  61 + msgs.clear();
52 } 62 }
53 63
54 int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server) 64 int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)
@@ -111,18 +121,27 @@ void SrsForwarder::on_unpublish() @@ -111,18 +121,27 @@ void SrsForwarder::on_unpublish()
111 int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) 121 int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
112 { 122 {
113 int ret = ERROR_SUCCESS; 123 int ret = ERROR_SUCCESS;
  124 +
  125 + msgs.push_back(metadata);
  126 +
114 return ret; 127 return ret;
115 } 128 }
116 129
117 int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) 130 int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
118 { 131 {
119 int ret = ERROR_SUCCESS; 132 int ret = ERROR_SUCCESS;
  133 +
  134 + msgs.push_back(msg);
  135 +
120 return ret; 136 return ret;
121 } 137 }
122 138
123 int SrsForwarder::on_video(SrsSharedPtrMessage* msg) 139 int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
124 { 140 {
125 int ret = ERROR_SUCCESS; 141 int ret = ERROR_SUCCESS;
  142 +
  143 + msgs.push_back(msg);
  144 +
126 return ret; 145 return ret;
127 } 146 }
128 147
@@ -232,6 +251,60 @@ int SrsForwarder::forward_cycle_imp() @@ -232,6 +251,60 @@ int SrsForwarder::forward_cycle_imp()
232 return ret; 251 return ret;
233 } 252 }
234 253
  254 + if ((ret = forward()) != ERROR_SUCCESS) {
  255 + return ret;
  256 + }
  257 +
  258 + return ret;
  259 +}
  260 +
  261 +int SrsForwarder::forward()
  262 +{
  263 + int ret = ERROR_SUCCESS;
  264 +
  265 + client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);
  266 +
  267 + SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
  268 +
  269 + while (loop) {
  270 + pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
  271 +
  272 + // switch to other st-threads.
  273 + st_usleep(0);
  274 +
  275 + // read from client.
  276 + if (true) {
  277 + SrsCommonMessage* msg = NULL;
  278 + ret = client->recv_message(&msg);
  279 +
  280 + srs_verbose("play loop recv message. ret=%d", ret);
  281 + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
  282 + srs_error("recv server control message failed. ret=%d", ret);
  283 + return ret;
  284 + }
  285 + }
  286 +
  287 + int count = (int)msgs.size();
  288 +
  289 + // reportable
  290 + if (pithy_print.can_print()) {
  291 + srs_trace("-> clock=%u, time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  292 + (int)(srs_get_system_time_ms()/1000), pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
  293 + }
  294 +
  295 + // all msgs to forward.
  296 + for (int i = 0; i < count; i++) {
  297 + SrsSharedPtrMessage* msg = msgs[i];
  298 + msgs[i] = NULL;
  299 +
  300 + if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {
  301 + srs_error("forwarder send message to server failed. ret=%d", ret);
  302 + return ret;
  303 + }
  304 + }
  305 + msgs.clear();
  306 + }
  307 +
235 return ret; 308 return ret;
236 } 309 }
237 310
@@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #include <srs_core.hpp> 30 #include <srs_core.hpp>
31 31
32 #include <string> 32 #include <string>
  33 +#include <vector>
33 34
34 #include <st.h> 35 #include <st.h>
35 36
@@ -55,6 +56,7 @@ private: @@ -55,6 +56,7 @@ private:
55 bool loop; 56 bool loop;
56 private: 57 private:
57 SrsRtmpClient* client; 58 SrsRtmpClient* client;
  59 + std::vector<SrsSharedPtrMessage*> msgs;
58 public: 60 public:
59 SrsForwarder(); 61 SrsForwarder();
60 virtual ~SrsForwarder(); 62 virtual ~SrsForwarder();
@@ -70,6 +72,7 @@ private: @@ -70,6 +72,7 @@ private:
70 std::string parse_server(std::string host); 72 std::string parse_server(std::string host);
71 private: 73 private:
72 virtual int forward_cycle_imp(); 74 virtual int forward_cycle_imp();
  75 + virtual int forward();
73 virtual void forward_cycle(); 76 virtual void forward_cycle();
74 static void* forward_thread(void* arg); 77 static void* forward_thread(void* arg);
75 }; 78 };
@@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 #define SRS_STAGE_DEFAULT_INTERVAL_MS 1200 34 #define SRS_STAGE_DEFAULT_INTERVAL_MS 1200
35 #define SRS_STAGE_PLAY_USER_INTERVAL_MS 1300 35 #define SRS_STAGE_PLAY_USER_INTERVAL_MS 1300
36 #define SRS_STAGE_PUBLISH_USER_INTERVAL_MS 1100 36 #define SRS_STAGE_PUBLISH_USER_INTERVAL_MS 1100
  37 +#define SRS_STAGE_FORWARDER_INTERVAL_MS 2000
37 38
38 struct SrsStageInfo : public SrsReloadHandler 39 struct SrsStageInfo : public SrsReloadHandler
39 { 40 {
@@ -73,6 +74,14 @@ struct SrsStageInfo : public SrsReloadHandler @@ -73,6 +74,14 @@ struct SrsStageInfo : public SrsReloadHandler
73 } 74 }
74 break; 75 break;
75 } 76 }
  77 + case SRS_STAGE_FORWARDER: {
  78 + pithy_print_time_ms = SRS_STAGE_FORWARDER_INTERVAL_MS;
  79 + SrsConfDirective* conf = config->get_pithy_print_forwarder();
  80 + if (conf && !conf->arg0().empty()) {
  81 + pithy_print_time_ms = ::atoi(conf->arg0().c_str());
  82 + }
  83 + break;
  84 + }
76 default: { 85 default: {
77 pithy_print_time_ms = SRS_STAGE_DEFAULT_INTERVAL_MS; 86 pithy_print_time_ms = SRS_STAGE_DEFAULT_INTERVAL_MS;
78 break; 87 break;
@@ -34,6 +34,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,6 +34,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 #define SRS_STAGE_PLAY_USER 1 34 #define SRS_STAGE_PLAY_USER 1
35 // the pithy stage for all publish clients. 35 // the pithy stage for all publish clients.
36 #define SRS_STAGE_PUBLISH_USER 2 36 #define SRS_STAGE_PUBLISH_USER 2
  37 +// the pithy stage for all forward clients.
  38 +#define SRS_STAGE_FORWARDER 3
37 39
38 /** 40 /**
39 * the stage is used for a collection of object to do print, 41 * the stage is used for a collection of object to do print,
@@ -184,6 +184,36 @@ void SrsRtmpClient::set_send_timeout(int64_t timeout_us) @@ -184,6 +184,36 @@ void SrsRtmpClient::set_send_timeout(int64_t timeout_us)
184 protocol->set_send_timeout(timeout_us); 184 protocol->set_send_timeout(timeout_us);
185 } 185 }
186 186
  187 +int64_t SrsRtmpClient::get_recv_bytes()
  188 +{
  189 + return protocol->get_recv_bytes();
  190 +}
  191 +
  192 +int64_t SrsRtmpClient::get_send_bytes()
  193 +{
  194 + return protocol->get_send_bytes();
  195 +}
  196 +
  197 +int SrsRtmpClient::get_recv_kbps()
  198 +{
  199 + return protocol->get_recv_kbps();
  200 +}
  201 +
  202 +int SrsRtmpClient::get_send_kbps()
  203 +{
  204 + return protocol->get_send_kbps();
  205 +}
  206 +
  207 +int SrsRtmpClient::recv_message(SrsCommonMessage** pmsg)
  208 +{
  209 + return protocol->recv_message(pmsg);
  210 +}
  211 +
  212 +int SrsRtmpClient::send_message(ISrsMessage* msg)
  213 +{
  214 + return protocol->send_message(msg);
  215 +}
  216 +
187 int SrsRtmpClient::handshake() 217 int SrsRtmpClient::handshake()
188 { 218 {
189 int ret = ERROR_SUCCESS; 219 int ret = ERROR_SUCCESS;
@@ -108,6 +108,12 @@ public: @@ -108,6 +108,12 @@ public:
108 public: 108 public:
109 virtual void set_recv_timeout(int64_t timeout_us); 109 virtual void set_recv_timeout(int64_t timeout_us);
110 virtual void set_send_timeout(int64_t timeout_us); 110 virtual void set_send_timeout(int64_t timeout_us);
  111 + virtual int64_t get_recv_bytes();
  112 + virtual int64_t get_send_bytes();
  113 + virtual int get_recv_kbps();
  114 + virtual int get_send_kbps();
  115 + virtual int recv_message(SrsCommonMessage** pmsg);
  116 + virtual int send_message(ISrsMessage* msg);
111 public: 117 public:
112 virtual int handshake(); 118 virtual int handshake();
113 virtual int connect_app(std::string app, std::string tc_url); 119 virtual int connect_app(std::string app, std::string tc_url);