winlin

refine edge ingester, use upstream adapter.

@@ -62,13 +62,100 @@ using namespace std; @@ -62,13 +62,100 @@ using namespace std;
62 // when edge error, wait for quit 62 // when edge error, wait for quit
63 #define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL) 63 #define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL)
64 64
  65 +SrsEdgeUpstream::SrsEdgeUpstream()
  66 +{
  67 +}
  68 +
  69 +SrsEdgeUpstream::~SrsEdgeUpstream()
  70 +{
  71 +}
  72 +
  73 +SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream()
  74 +{
  75 + sdk = new SrsSimpleRtmpClient();
  76 +}
  77 +
  78 +SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream()
  79 +{
  80 + close();
  81 +
  82 + srs_freep(sdk);
  83 +}
  84 +
  85 +int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
  86 +{
  87 + int ret = ERROR_SUCCESS;
  88 +
  89 + SrsRequest* req = r;
  90 +
  91 + std::string url;
  92 + if (true) {
  93 + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
  94 +
  95 + // @see https://github.com/ossrs/srs/issues/79
  96 + // when origin is error, for instance, server is shutdown,
  97 + // then user remove the vhost then reload, the conf is empty.
  98 + if (!conf) {
  99 + ret = ERROR_EDGE_VHOST_REMOVED;
  100 + srs_warn("vhost %s removed. ret=%d", req->vhost.c_str(), ret);
  101 + return ret;
  102 + }
  103 +
  104 + // select the origin.
  105 + std::string server = lb->select(conf->args);
  106 + int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
  107 + srs_parse_hostport(server, server, port);
  108 +
  109 + // support vhost tranform for edge,
  110 + // @see https://github.com/ossrs/srs/issues/372
  111 + std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
  112 + vhost = srs_string_replace(vhost, "[vhost]", req->vhost);
  113 +
  114 + url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream);
  115 + }
  116 +
  117 + int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US;
  118 + int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
  119 + if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) {
  120 + srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
  121 + return ret;
  122 + }
  123 +
  124 + if ((ret = sdk->play()) != ERROR_SUCCESS) {
  125 + srs_error("edge pull %s stream failed. ret=%d", url.c_str(), ret);
  126 + return ret;
  127 + }
  128 +
  129 + return ret;
  130 +}
  131 +
  132 +int SrsEdgeRtmpUpstream::recv_message(SrsCommonMessage** pmsg)
  133 +{
  134 + return sdk->recv_message(pmsg);
  135 +}
  136 +
  137 +int SrsEdgeRtmpUpstream::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
  138 +{
  139 + return sdk->decode_message(msg, ppacket);
  140 +}
  141 +
  142 +void SrsEdgeRtmpUpstream::close()
  143 +{
  144 + sdk->close();
  145 +}
  146 +
  147 +void SrsEdgeRtmpUpstream::kbps_sample(const char* label, int64_t age)
  148 +{
  149 + sdk->kbps_sample(label, age);
  150 +}
  151 +
65 SrsEdgeIngester::SrsEdgeIngester() 152 SrsEdgeIngester::SrsEdgeIngester()
66 { 153 {
67 source = NULL; 154 source = NULL;
68 edge = NULL; 155 edge = NULL;
69 req = NULL; 156 req = NULL;
70 157
71 - sdk = new SrsSimpleRtmpClient(); 158 + upstream = new SrsEdgeRtmpUpstream();
72 lb = new SrsLbRoundRobin(); 159 lb = new SrsLbRoundRobin();
73 pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); 160 pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
74 } 161 }
@@ -77,7 +164,7 @@ SrsEdgeIngester::~SrsEdgeIngester() @@ -77,7 +164,7 @@ SrsEdgeIngester::~SrsEdgeIngester()
77 { 164 {
78 stop(); 165 stop();
79 166
80 - srs_freep(sdk); 167 + srs_freep(upstream);
81 srs_freep(lb); 168 srs_freep(lb);
82 srs_freep(pthread); 169 srs_freep(pthread);
83 } 170 }
@@ -108,7 +195,7 @@ int SrsEdgeIngester::start() @@ -108,7 +195,7 @@ int SrsEdgeIngester::start()
108 void SrsEdgeIngester::stop() 195 void SrsEdgeIngester::stop()
109 { 196 {
110 pthread->stop(); 197 pthread->stop();
111 - sdk->close(); 198 + upstream->close();
112 199
113 // notice to unpublish. 200 // notice to unpublish.
114 source->on_unpublish(); 201 source->on_unpublish();
@@ -122,44 +209,12 @@ string SrsEdgeIngester::get_curr_origin() @@ -122,44 +209,12 @@ string SrsEdgeIngester::get_curr_origin()
122 int SrsEdgeIngester::cycle() 209 int SrsEdgeIngester::cycle()
123 { 210 {
124 int ret = ERROR_SUCCESS; 211 int ret = ERROR_SUCCESS;
125 -  
126 - source->on_source_id_changed(_srs_context->get_id());  
127 -  
128 - std::string url;  
129 - if (true) {  
130 - SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);  
131 -  
132 - // @see https://github.com/ossrs/srs/issues/79  
133 - // when origin is error, for instance, server is shutdown,  
134 - // then user remove the vhost then reload, the conf is empty.  
135 - if (!conf) {  
136 - ret = ERROR_EDGE_VHOST_REMOVED;  
137 - srs_warn("vhost %s removed. ret=%d", req->vhost.c_str(), ret);  
138 - return ret;  
139 - }  
140 -  
141 - // select the origin.  
142 - std::string server = lb->select(conf->args);  
143 - int port = SRS_CONSTS_RTMP_DEFAULT_PORT;  
144 - srs_parse_hostport(server, server, port);  
145 -  
146 - // support vhost tranform for edge,  
147 - // @see https://github.com/ossrs/srs/issues/372  
148 - std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);  
149 - vhost = srs_string_replace(vhost, "[vhost]", req->vhost);  
150 -  
151 - url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream);  
152 - }  
153 212
154 - int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US;  
155 - int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;  
156 - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) {  
157 - srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); 213 + if ((ret = source->on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) {
158 return ret; 214 return ret;
159 } 215 }
160 216
161 - if ((ret = sdk->play()) != ERROR_SUCCESS) {  
162 - srs_error("edge pull %s stream failed. ret=%d", url.c_str(), ret); 217 + if ((ret = upstream->connect(req, lb)) != ERROR_SUCCESS) {
163 return ret; 218 return ret;
164 } 219 }
165 220
@@ -188,12 +243,12 @@ int SrsEdgeIngester::ingest() @@ -188,12 +243,12 @@ int SrsEdgeIngester::ingest()
188 243
189 // pithy print 244 // pithy print
190 if (pprint->can_print()) { 245 if (pprint->can_print()) {
191 - sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age()); 246 + upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
192 } 247 }
193 248
194 // read from client. 249 // read from client.
195 SrsCommonMessage* msg = NULL; 250 SrsCommonMessage* msg = NULL;
196 - if ((ret = sdk->recv_message(&msg)) != ERROR_SUCCESS) { 251 + if ((ret = upstream->recv_message(&msg)) != ERROR_SUCCESS) {
197 if (!srs_is_client_gracefully_close(ret)) { 252 if (!srs_is_client_gracefully_close(ret)) {
198 srs_error("pull origin server message failed. ret=%d", ret); 253 srs_error("pull origin server message failed. ret=%d", ret);
199 } 254 }
@@ -244,7 +299,7 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) @@ -244,7 +299,7 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
244 // process onMetaData 299 // process onMetaData
245 if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { 300 if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
246 SrsPacket* pkt = NULL; 301 SrsPacket* pkt = NULL;
247 - if ((ret = sdk->decode_message(msg, &pkt)) != ERROR_SUCCESS) { 302 + if ((ret = upstream->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
248 srs_error("decode onMetaData message failed. ret=%d", ret); 303 srs_error("decode onMetaData message failed. ret=%d", ret);
249 return ret; 304 return ret;
250 } 305 }
@@ -49,6 +49,7 @@ class SrsKbps; @@ -49,6 +49,7 @@ class SrsKbps;
49 class SrsLbRoundRobin; 49 class SrsLbRoundRobin;
50 class SrsTcpClient; 50 class SrsTcpClient;
51 class SrsSimpleRtmpClient; 51 class SrsSimpleRtmpClient;
  52 +class SrsPacket;
52 53
53 /** 54 /**
54 * the state of edge, auto machine 55 * the state of edge, auto machine
@@ -76,6 +77,37 @@ enum SrsEdgeUserState @@ -76,6 +77,37 @@ enum SrsEdgeUserState
76 }; 77 };
77 78
78 /** 79 /**
  80 + * the upstream of edge, can be rtmp or http.
  81 + */
  82 +class SrsEdgeUpstream
  83 +{
  84 +public:
  85 + SrsEdgeUpstream();
  86 + virtual ~SrsEdgeUpstream();
  87 +public:
  88 + virtual int connect(SrsRequest* r, SrsLbRoundRobin* lb) = 0;
  89 + virtual int recv_message(SrsCommonMessage** pmsg) = 0;
  90 + virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) = 0;
  91 + virtual void close() = 0;
  92 + virtual void kbps_sample(const char* label, int64_t age) = 0;
  93 +};
  94 +
  95 +class SrsEdgeRtmpUpstream : public SrsEdgeUpstream
  96 +{
  97 +private:
  98 + SrsSimpleRtmpClient* sdk;
  99 +public:
  100 + SrsEdgeRtmpUpstream();
  101 + virtual ~SrsEdgeRtmpUpstream();
  102 +public:
  103 + virtual int connect(SrsRequest* r, SrsLbRoundRobin* lb);
  104 + virtual int recv_message(SrsCommonMessage** pmsg);
  105 + virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
  106 + virtual void close();
  107 + virtual void kbps_sample(const char* label, int64_t age);
  108 +};
  109 +
  110 +/**
79 * edge used to ingest stream from origin. 111 * edge used to ingest stream from origin.
80 */ 112 */
81 class SrsEdgeIngester : public ISrsReusableThread2Handler 113 class SrsEdgeIngester : public ISrsReusableThread2Handler
@@ -85,8 +117,8 @@ private: @@ -85,8 +117,8 @@ private:
85 SrsPlayEdge* edge; 117 SrsPlayEdge* edge;
86 SrsRequest* req; 118 SrsRequest* req;
87 SrsReusableThread2* pthread; 119 SrsReusableThread2* pthread;
88 - SrsSimpleRtmpClient* sdk;  
89 SrsLbRoundRobin* lb; 120 SrsLbRoundRobin* lb;
  121 + SrsEdgeUpstream* upstream;
90 public: 122 public:
91 SrsEdgeIngester(); 123 SrsEdgeIngester();
92 virtual ~SrsEdgeIngester(); 124 virtual ~SrsEdgeIngester();
@@ -2054,7 +2054,9 @@ int SrsSource::on_publish() @@ -2054,7 +2054,9 @@ int SrsSource::on_publish()
2054 2054
2055 // whatever, the publish thread is the source or edge source, 2055 // whatever, the publish thread is the source or edge source,
2056 // save its id to srouce id. 2056 // save its id to srouce id.
2057 - on_source_id_changed(_srs_context->get_id()); 2057 + if ((ret = on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) {
  2058 + return ret;
  2059 + }
2058 2060
2059 // reset the mix queue. 2061 // reset the mix queue.
2060 mix_queue->clear(); 2062 mix_queue->clear();