winlin

support basic edge(play/publish) RTMP server. 0.9.78

@@ -192,6 +192,7 @@ Supported operating systems and hardware: @@ -192,6 +192,7 @@ Supported operating systems and hardware:
192 * 2013-10-17, Created.<br/> 192 * 2013-10-17, Created.<br/>
193 193
194 ## History 194 ## History
  195 +* v1.0, 2014-04-27, support basic edge(play/publish) RTMP server. 0.9.78
195 * v1.0, 2014-04-25, add donation page. 0.9.76 196 * v1.0, 2014-04-25, add donation page. 0.9.76
196 * v1.0, 2014-04-24, support live flashP2P(integrated by chnvideo VDN). 0.9.75 197 * v1.0, 2014-04-24, support live flashP2P(integrated by chnvideo VDN). 0.9.75
197 * v1.0, 2014-04-21, support android app to start srs for internal edge. 0.9.72 198 * v1.0, 2014-04-21, support android app to start srs for internal edge. 0.9.72
@@ -49,6 +49,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -49,6 +49,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
49 // when edge timeout, retry next. 49 // when edge timeout, retry next.
50 #define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(3*1000*1000LL) 50 #define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(3*1000*1000LL)
51 51
  52 +// when error, edge ingester sleep for a while and retry.
  53 +#define SRS_EDGE_FORWARDER_SLEEP_US (int64_t)(1*1000*1000LL)
  54 +
  55 +// when edge timeout, retry next.
  56 +#define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(3*1000*1000LL)
  57 +
52 SrsEdgeIngester::SrsEdgeIngester() 58 SrsEdgeIngester::SrsEdgeIngester()
53 { 59 {
54 io = NULL; 60 io = NULL;
@@ -316,6 +322,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() @@ -316,6 +322,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
316 origin_index = 0; 322 origin_index = 0;
317 stream_id = 0; 323 stream_id = 0;
318 stfd = NULL; 324 stfd = NULL;
  325 + pthread = new SrsThread(this, SRS_EDGE_FORWARDER_SLEEP_US);
319 } 326 }
320 327
321 SrsEdgeForwarder::~SrsEdgeForwarder() 328 SrsEdgeForwarder::~SrsEdgeForwarder()
@@ -367,17 +374,54 @@ int SrsEdgeForwarder::start() @@ -367,17 +374,54 @@ int SrsEdgeForwarder::start()
367 return ret; 374 return ret;
368 } 375 }
369 376
370 - return ret; 377 + return pthread->start();
371 } 378 }
372 379
373 void SrsEdgeForwarder::stop() 380 void SrsEdgeForwarder::stop()
374 { 381 {
  382 + pthread->stop();
  383 +
375 close_underlayer_socket(); 384 close_underlayer_socket();
376 385
377 srs_freep(client); 386 srs_freep(client);
378 srs_freep(io); 387 srs_freep(io);
379 } 388 }
380 389
  390 +int SrsEdgeForwarder::cycle()
  391 +{
  392 + int ret = ERROR_SUCCESS;
  393 +
  394 + client->set_recv_timeout(SRS_EDGE_FORWARDER_TIMEOUT_US);
  395 +
  396 + SrsPithyPrint pithy_print(SRS_STAGE_EDGE);
  397 +
  398 + while (pthread->can_loop()) {
  399 + // switch to other st-threads.
  400 + st_usleep(0);
  401 +
  402 + pithy_print.elapse();
  403 +
  404 + // pithy print
  405 + if (pithy_print.can_print()) {
  406 + srs_trace("-> time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  407 + pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
  408 + }
  409 +
  410 + // read from client.
  411 + SrsCommonMessage* msg = NULL;
  412 + if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
  413 + srs_info("ignore forwarder recv origin server message failed. ret=%d", ret);
  414 + continue;
  415 + }
  416 + srs_verbose("edge loop recv message. ret=%d", ret);
  417 +
  418 + srs_assert(msg);
  419 + SrsAutoFree(SrsCommonMessage, msg, false);
  420 + }
  421 +
  422 + return ret;
  423 +}
  424 +
381 int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) 425 int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
382 { 426 {
383 int ret = ERROR_SUCCESS; 427 int ret = ERROR_SUCCESS;
@@ -104,7 +104,7 @@ private: @@ -104,7 +104,7 @@ private:
104 /** 104 /**
105 * edge used to forward stream to origin. 105 * edge used to forward stream to origin.
106 */ 106 */
107 -class SrsEdgeForwarder 107 +class SrsEdgeForwarder : public ISrsThreadHandler
108 { 108 {
109 private: 109 private:
110 int stream_id; 110 int stream_id;
@@ -112,6 +112,7 @@ private: @@ -112,6 +112,7 @@ private:
112 SrsSource* _source; 112 SrsSource* _source;
113 SrsPublishEdge* _edge; 113 SrsPublishEdge* _edge;
114 SrsRequest* _req; 114 SrsRequest* _req;
  115 + SrsThread* pthread;
115 st_netfd_t stfd; 116 st_netfd_t stfd;
116 ISrsProtocolReaderWriter* io; 117 ISrsProtocolReaderWriter* io;
117 SrsRtmpClient* client; 118 SrsRtmpClient* client;
@@ -123,6 +124,9 @@ public: @@ -123,6 +124,9 @@ public:
123 virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); 124 virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req);
124 virtual int start(); 125 virtual int start();
125 virtual void stop(); 126 virtual void stop();
  127 +// interface ISrsThreadHandler
  128 +public:
  129 + virtual int cycle();
126 public: 130 public:
127 virtual int proxy(SrsCommonMessage* msg); 131 virtual int proxy(SrsCommonMessage* msg);
128 private: 132 private:
@@ -134,6 +138,7 @@ private: @@ -134,6 +138,7 @@ private:
134 * play edge control service. 138 * play edge control service.
135 * downloading edge speed-up. 139 * downloading edge speed-up.
136 */ 140 */
  141 +// TODO: FIXME: support reload
137 class SrsPlayEdge 142 class SrsPlayEdge
138 { 143 {
139 private: 144 private:
@@ -164,6 +169,7 @@ public: @@ -164,6 +169,7 @@ public:
164 * publish edge control service. 169 * publish edge control service.
165 * uploading edge speed-up. 170 * uploading edge speed-up.
166 */ 171 */
  172 +// TODO: FIXME: support reload
167 class SrsPublishEdge 173 class SrsPublishEdge
168 { 174 {
169 private: 175 private:
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR "0" 32 #define VERSION_MAJOR "0"
33 #define VERSION_MINOR "9" 33 #define VERSION_MINOR "9"
34 -#define VERSION_REVISION "77" 34 +#define VERSION_REVISION "78"
35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION 35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "srs" 37 #define RTMP_SIG_SRS_KEY "srs"