winlin

rtmp edge support 302 redirect.

@@ -70,8 +70,9 @@ SrsEdgeUpstream::~SrsEdgeUpstream() @@ -70,8 +70,9 @@ SrsEdgeUpstream::~SrsEdgeUpstream()
70 { 70 {
71 } 71 }
72 72
73 -SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream() 73 +SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r)
74 { 74 {
  75 + redirect = r;
75 sdk = new SrsSimpleRtmpClient(); 76 sdk = new SrsSimpleRtmpClient();
76 } 77 }
77 78
@@ -106,6 +107,17 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) @@ -106,6 +107,17 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
106 int port = SRS_CONSTS_RTMP_DEFAULT_PORT; 107 int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
107 srs_parse_hostport(server, server, port); 108 srs_parse_hostport(server, server, port);
108 109
  110 + // override the origin info by redirect.
  111 + if (!redirect.empty()) {
  112 + int _port;
  113 + string _schema, _vhost, _app, _param, _host;
  114 + srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _port, _param);
  115 +
  116 + srs_warn("RTMP redirect %s:%d to %s:%d", server.c_str(), port, _host.c_str(), _port);
  117 + server = _host;
  118 + port = _port;
  119 + }
  120 +
109 // support vhost tranform for edge, 121 // support vhost tranform for edge,
110 // @see https://github.com/ossrs/srs/issues/372 122 // @see https://github.com/ossrs/srs/issues/372
111 std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); 123 std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
@@ -160,7 +172,7 @@ SrsEdgeIngester::SrsEdgeIngester() @@ -160,7 +172,7 @@ SrsEdgeIngester::SrsEdgeIngester()
160 edge = NULL; 172 edge = NULL;
161 req = NULL; 173 req = NULL;
162 174
163 - upstream = new SrsEdgeRtmpUpstream(); 175 + upstream = new SrsEdgeRtmpUpstream(redirect);
164 lb = new SrsLbRoundRobin(); 176 lb = new SrsLbRoundRobin();
165 pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); 177 pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
166 } 178 }
@@ -215,22 +227,39 @@ int SrsEdgeIngester::cycle() @@ -215,22 +227,39 @@ int SrsEdgeIngester::cycle()
215 { 227 {
216 int ret = ERROR_SUCCESS; 228 int ret = ERROR_SUCCESS;
217 229
218 - if ((ret = source->on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) {  
219 - return ret;  
220 - }  
221 -  
222 - if ((ret = upstream->connect(req, lb)) != ERROR_SUCCESS) {  
223 - return ret;  
224 - }  
225 -  
226 - if ((ret = edge->on_ingest_play()) != ERROR_SUCCESS) {  
227 - return ret;  
228 - }  
229 -  
230 - ret = ingest();  
231 - if (srs_is_client_gracefully_close(ret)) {  
232 - srs_warn("origin disconnected, retry. ret=%d", ret);  
233 - ret = ERROR_SUCCESS; 230 + for (;;) {
  231 + srs_freep(upstream);
  232 + upstream = new SrsEdgeRtmpUpstream(redirect);
  233 +
  234 + // we only use the redict once.
  235 + // reset the redirect to empty, for maybe the origin changed.
  236 + redirect = "";
  237 +
  238 + if ((ret = source->on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) {
  239 + return ret;
  240 + }
  241 +
  242 + if ((ret = upstream->connect(req, lb)) != ERROR_SUCCESS) {
  243 + return ret;
  244 + }
  245 +
  246 + if ((ret = edge->on_ingest_play()) != ERROR_SUCCESS) {
  247 + return ret;
  248 + }
  249 +
  250 + ret = ingest();
  251 +
  252 + // retry for rtmp 302 immediately.
  253 + if (ret == ERROR_CONTROL_REDIRECT) {
  254 + ret = ERROR_SUCCESS;
  255 + continue;
  256 + }
  257 +
  258 + if (srs_is_client_gracefully_close(ret)) {
  259 + srs_warn("origin disconnected, retry. ret=%d", ret);
  260 + ret = ERROR_SUCCESS;
  261 + }
  262 + break;
234 } 263 }
235 264
236 return ret; 265 return ret;
@@ -327,6 +356,47 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) @@ -327,6 +356,47 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
327 return ret; 356 return ret;
328 } 357 }
329 358
  359 + // call messages, for example, reject, redirect.
  360 + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
  361 + SrsPacket* pkt = NULL;
  362 + if ((ret = upstream->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
  363 + srs_error("decode call message failed. ret=%d", ret);
  364 + return ret;
  365 + }
  366 + SrsAutoFree(SrsPacket, pkt);
  367 +
  368 + // RTMP 302 redirect
  369 + if (dynamic_cast<SrsCallPacket*>(pkt)) {
  370 + SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
  371 + if (!call->arguments->is_object()) {
  372 + return ret;
  373 + }
  374 +
  375 + SrsAmf0Any* prop = NULL;
  376 + SrsAmf0Object* evt = call->arguments->to_object();
  377 +
  378 + if ((prop = evt->ensure_property_string("level")) == NULL) {
  379 + return ret;
  380 + } else if (prop->to_str() != StatusLevelError) {
  381 + return ret;
  382 + }
  383 +
  384 + if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) {
  385 + return ret;
  386 + }
  387 + SrsAmf0Object* ex = prop->to_object();
  388 +
  389 + if ((prop = ex->ensure_property_string("redirect")) == NULL) {
  390 + return ret;
  391 + }
  392 + redirect = prop->to_str();
  393 +
  394 + ret = ERROR_CONTROL_REDIRECT;
  395 + srs_info("RTMP 302 redirect to %s, ret=%d", redirect.c_str(), ret);
  396 + return ret;
  397 + }
  398 + }
  399 +
330 return ret; 400 return ret;
331 } 401 }
332 402
@@ -97,9 +97,13 @@ public: @@ -97,9 +97,13 @@ public:
97 class SrsEdgeRtmpUpstream : public SrsEdgeUpstream 97 class SrsEdgeRtmpUpstream : public SrsEdgeUpstream
98 { 98 {
99 private: 99 private:
  100 + // for RTMP 302, if not empty,
  101 + // use this <ip[:port]> as upstream.
  102 + std::string redirect;
100 SrsSimpleRtmpClient* sdk; 103 SrsSimpleRtmpClient* sdk;
101 public: 104 public:
102 - SrsEdgeRtmpUpstream(); 105 + // @param rediect, override the server. ignore if empty.
  106 + SrsEdgeRtmpUpstream(std::string r);
103 virtual ~SrsEdgeRtmpUpstream(); 107 virtual ~SrsEdgeRtmpUpstream();
104 public: 108 public:
105 virtual int connect(SrsRequest* r, SrsLbRoundRobin* lb); 109 virtual int connect(SrsRequest* r, SrsLbRoundRobin* lb);
@@ -123,6 +127,8 @@ private: @@ -123,6 +127,8 @@ private:
123 SrsReusableThread2* pthread; 127 SrsReusableThread2* pthread;
124 SrsLbRoundRobin* lb; 128 SrsLbRoundRobin* lb;
125 SrsEdgeUpstream* upstream; 129 SrsEdgeUpstream* upstream;
  130 + // for RTMP 302 redirect.
  131 + std::string redirect;
126 public: 132 public:
127 SrsEdgeIngester(); 133 SrsEdgeIngester();
128 virtual ~SrsEdgeIngester(); 134 virtual ~SrsEdgeIngester();