winlin

refine the http message, set the connection if required.

@@ -37,6 +37,12 @@ using namespace std; @@ -37,6 +37,12 @@ using namespace std;
37 #include <srs_app_http_conn.hpp> 37 #include <srs_app_http_conn.hpp>
38 #include <srs_core_autofree.hpp> 38 #include <srs_core_autofree.hpp>
39 #include <srs_kernel_flv.hpp> 39 #include <srs_kernel_flv.hpp>
  40 +#include <srs_rtmp_sdk.hpp>
  41 +#include <srs_rtmp_utility.hpp>
  42 +#include <srs_app_st_socket.hpp>
  43 +#include <srs_app_utility.hpp>
  44 +#include <srs_rtmp_amf0.hpp>
  45 +#include <srs_kernel_utility.hpp>
40 46
41 #define SRS_HTTP_FLV_STREAM_BUFFER 4096 47 #define SRS_HTTP_FLV_STREAM_BUFFER 4096
42 48
@@ -85,9 +91,53 @@ void SrsAppCasterFlv::remove(SrsConnection* c) @@ -85,9 +91,53 @@ void SrsAppCasterFlv::remove(SrsConnection* c)
85 91
86 int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) 92 int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
87 { 93 {
  94 + SrsDynamicHttpConn* conn = dynamic_cast<SrsDynamicHttpConn*>(r->connection());
  95 + srs_assert(conn);
  96 +
  97 + std::string app = srs_path_dirname(r->path());
  98 + std::string stream = srs_path_basename(r->path());
  99 +
  100 + std::string o = output;
  101 + if (!app.empty() && app != "/") {
  102 + o = srs_string_replace(o, "[app]", app);
  103 + }
  104 + o = srs_string_replace(o, "[stream]", stream);
  105 +
  106 + // remove the extension.
  107 + if (srs_string_ends_with(o, ".flv")) {
  108 + o = o.substr(0, o.length() - 4);
  109 + }
  110 +
  111 + return conn->proxy(w, r, o);
  112 +}
  113 +
  114 +SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
  115 + : SrsHttpConn(cm, fd, m)
  116 +{
  117 +
  118 + req = NULL;
  119 + io = NULL;
  120 + client = NULL;
  121 + stfd = NULL;
  122 + stream_id = 0;
  123 +}
  124 +
  125 +SrsDynamicHttpConn::~SrsDynamicHttpConn()
  126 +{
  127 +}
  128 +
  129 +int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg)
  130 +{
  131 + int ret = ERROR_SUCCESS;
  132 + return ret;
  133 +}
  134 +
  135 +int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, SrsHttpMessage* r, std::string o)
  136 +{
88 int ret = ERROR_SUCCESS; 137 int ret = ERROR_SUCCESS;
89 138
90 - srs_info("flv: handle request at %s", r->path().c_str()); 139 + output = o;
  140 + srs_trace("flv: proxy %s to %s", r->uri().c_str(), output.c_str());
91 141
92 char* buffer = new char[SRS_HTTP_FLV_STREAM_BUFFER]; 142 char* buffer = new char[SRS_HTTP_FLV_STREAM_BUFFER];
93 SrsAutoFree(char, buffer); 143 SrsAutoFree(char, buffer);
@@ -111,19 +161,126 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) @@ -111,19 +161,126 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
111 return ret; 161 return ret;
112 } 162 }
113 163
114 -SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)  
115 - : SrsHttpConn(cm, fd, m) 164 +int SrsDynamicHttpConn::connect()
116 { 165 {
  166 + int ret = ERROR_SUCCESS;
  167 +
  168 + // when ok, ignore.
  169 + // TODO: FIXME: should reconnect when disconnected.
  170 + if (io || client) {
  171 + return ret;
  172 + }
  173 +
  174 + // parse uri
  175 + if (!req) {
  176 + req = new SrsRequest();
  177 +
  178 + size_t pos = string::npos;
  179 + string uri = req->tcUrl = output;
  180 +
  181 + // tcUrl, stream
  182 + if ((pos = uri.rfind("/")) != string::npos) {
  183 + req->stream = uri.substr(pos + 1);
  184 + req->tcUrl = uri = uri.substr(0, pos);
  185 + }
  186 +
  187 + srs_discovery_tc_url(req->tcUrl,
  188 + req->schema, req->host, req->vhost, req->app, req->port,
  189 + req->param);
  190 + }
  191 +
  192 + // connect host.
  193 + if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) {
  194 + srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret);
  195 + return ret;
  196 + }
  197 + io = new SrsStSocket(stfd);
  198 + client = new SrsRtmpClient(io);
  199 +
  200 + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
  201 + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
  202 +
  203 + // connect to vhost/app
  204 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  205 + srs_error("mpegts: handshake with server failed. ret=%d", ret);
  206 + return ret;
  207 + }
  208 + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) {
  209 + srs_error("mpegts: connect with server failed. ret=%d", ret);
  210 + return ret;
  211 + }
  212 + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
  213 + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
  214 + return ret;
  215 + }
  216 +
  217 + // publish.
  218 + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
  219 + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d",
  220 + req->stream.c_str(), stream_id, ret);
  221 + return ret;
  222 + }
  223 +
  224 + return ret;
117 } 225 }
118 226
119 -SrsDynamicHttpConn::~SrsDynamicHttpConn() 227 +// TODO: FIXME: refine the connect_app.
  228 +int SrsDynamicHttpConn::connect_app(string ep_server, string ep_port)
120 { 229 {
  230 + int ret = ERROR_SUCCESS;
  231 +
  232 + // args of request takes the srs info.
  233 + if (req->args == NULL) {
  234 + req->args = SrsAmf0Any::object();
  235 + }
  236 +
  237 + // notify server the edge identity,
  238 + // @see https://github.com/simple-rtmp-server/srs/issues/147
  239 + SrsAmf0Object* data = req->args;
  240 + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
  241 + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
  242 + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
  243 + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
  244 + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
  245 + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
  246 + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
  247 + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
  248 + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
  249 + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
  250 + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
  251 + // for edge to directly get the id of client.
  252 + data->set("srs_pid", SrsAmf0Any::number(getpid()));
  253 + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
  254 +
  255 + // local ip of edge
  256 + std::vector<std::string> ips = srs_get_local_ipv4_ips();
  257 + assert(_srs_config->get_stats_network() < (int)ips.size());
  258 + std::string local_ip = ips[_srs_config->get_stats_network()];
  259 + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
  260 +
  261 + // generate the tcUrl
  262 + std::string param = "";
  263 + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);
  264 +
  265 + // upnode server identity will show in the connect_app of client.
  266 + // @see https://github.com/simple-rtmp-server/srs/issues/160
  267 + // the debug_srs_upnode is config in vhost and default to true.
  268 + bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
  269 + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {
  270 + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
  271 + tc_url.c_str(), debug_srs_upnode, ret);
  272 + return ret;
  273 + }
  274 +
  275 + return ret;
121 } 276 }
122 277
123 -int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg) 278 +void SrsDynamicHttpConn::close()
124 { 279 {
125 - int ret = ERROR_SUCCESS;  
126 - return ret; 280 + srs_freep(client);
  281 + srs_freep(io);
  282 + srs_freep(req);
  283 + srs_close_stfd(stfd);
127 } 284 }
128 285
129 SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) 286 SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h)
@@ -146,7 +303,7 @@ void SrsHttpFileReader::close() @@ -146,7 +303,7 @@ void SrsHttpFileReader::close()
146 303
147 bool SrsHttpFileReader::is_open() 304 bool SrsHttpFileReader::is_open()
148 { 305 {
149 - return false; 306 + return true;
150 } 307 }
151 308
152 int64_t SrsHttpFileReader::tellg() 309 int64_t SrsHttpFileReader::tellg()
@@ -38,6 +38,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -38,6 +38,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
38 class SrsConfDirective; 38 class SrsConfDirective;
39 class SrsHttpServeMux; 39 class SrsHttpServeMux;
40 class SrsHttpConn; 40 class SrsHttpConn;
  41 +class SrsRtmpClient;
  42 +class SrsStSocket;
  43 +class SrsRequest;
41 44
42 #include <srs_app_st.hpp> 45 #include <srs_app_st.hpp>
43 #include <srs_app_listener.hpp> 46 #include <srs_app_listener.hpp>
@@ -77,11 +80,28 @@ public: @@ -77,11 +80,28 @@ public:
77 */ 80 */
78 class SrsDynamicHttpConn : public SrsHttpConn 81 class SrsDynamicHttpConn : public SrsHttpConn
79 { 82 {
  83 +private:
  84 + std::string output;
  85 +private:
  86 + SrsRequest* req;
  87 + st_netfd_t stfd;
  88 + SrsStSocket* io;
  89 + SrsRtmpClient* client;
  90 + int stream_id;
80 public: 91 public:
81 SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); 92 SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m);
82 virtual ~SrsDynamicHttpConn(); 93 virtual ~SrsDynamicHttpConn();
83 public: 94 public:
84 virtual int on_got_http_message(SrsHttpMessage* msg); 95 virtual int on_got_http_message(SrsHttpMessage* msg);
  96 +public:
  97 + virtual int proxy(ISrsHttpResponseWriter* w, SrsHttpMessage* r, std::string o);
  98 +private:
  99 + // connect to rtmp output url.
  100 + // @remark ignore when not connected, reconnect when disconnected.
  101 + virtual int connect();
  102 + virtual int connect_app(std::string ep_server, std::string ep_port);
  103 + // close the connected io and rtmp to ready to be re-connect.
  104 + virtual void close();
85 }; 105 };
86 106
87 /** 107 /**
@@ -1078,8 +1078,9 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) @@ -1078,8 +1078,9 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read)
1078 return ret; 1078 return ret;
1079 } 1079 }
1080 1080
1081 -SrsHttpMessage::SrsHttpMessage(SrsStSocket* io) 1081 +SrsHttpMessage::SrsHttpMessage(SrsStSocket* io, SrsConnection* c)
1082 { 1082 {
  1083 + conn = c;
1083 chunked = false; 1084 chunked = false;
1084 _uri = new SrsHttpUri(); 1085 _uri = new SrsHttpUri();
1085 _body = new SrsHttpResponseReader(this, io); 1086 _body = new SrsHttpResponseReader(this, io);
@@ -1165,6 +1166,11 @@ char* SrsHttpMessage::http_ts_send_buffer() @@ -1165,6 +1166,11 @@ char* SrsHttpMessage::http_ts_send_buffer()
1165 return _http_ts_send_buffer; 1166 return _http_ts_send_buffer;
1166 } 1167 }
1167 1168
  1169 +SrsConnection* SrsHttpMessage::connection()
  1170 +{
  1171 + return conn;
  1172 +}
  1173 +
1168 u_int8_t SrsHttpMessage::method() 1174 u_int8_t SrsHttpMessage::method()
1169 { 1175 {
1170 return (u_int8_t)_header.method; 1176 return (u_int8_t)_header.method;
@@ -1230,8 +1236,9 @@ string SrsHttpMessage::uri() @@ -1230,8 +1236,9 @@ string SrsHttpMessage::uri()
1230 { 1236 {
1231 std::string uri = _uri->get_schema(); 1237 std::string uri = _uri->get_schema();
1232 if (uri.empty()) { 1238 if (uri.empty()) {
1233 - uri += "http://"; 1239 + uri += "http";
1234 } 1240 }
  1241 + uri += "://";
1235 1242
1236 uri += host(); 1243 uri += host();
1237 uri += path(); 1244 uri += path();
@@ -1393,7 +1400,7 @@ int SrsHttpParser::initialize(enum http_parser_type type) @@ -1393,7 +1400,7 @@ int SrsHttpParser::initialize(enum http_parser_type type)
1393 return ret; 1400 return ret;
1394 } 1401 }
1395 1402
1396 -int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) 1403 +int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, SrsHttpMessage** ppmsg)
1397 { 1404 {
1398 *ppmsg = NULL; 1405 *ppmsg = NULL;
1399 1406
@@ -1418,7 +1425,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) @@ -1418,7 +1425,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg)
1418 } 1425 }
1419 1426
1420 // create msg 1427 // create msg
1421 - SrsHttpMessage* msg = new SrsHttpMessage(skt); 1428 + SrsHttpMessage* msg = new SrsHttpMessage(skt, conn);
1422 1429
1423 // initalize http msg, parse url. 1430 // initalize http msg, parse url.
1424 if ((ret = msg->update(url, &header, buffer, headers)) != ERROR_SUCCESS) { 1431 if ((ret = msg->update(url, &header, buffer, headers)) != ERROR_SUCCESS) {
@@ -50,6 +50,7 @@ class SrsSimpleBuffer; @@ -50,6 +50,7 @@ class SrsSimpleBuffer;
50 class SrsHttpMuxEntry; 50 class SrsHttpMuxEntry;
51 class ISrsHttpResponseWriter; 51 class ISrsHttpResponseWriter;
52 class SrsFastBuffer; 52 class SrsFastBuffer;
  53 +class SrsConnection;
53 54
54 // http specification 55 // http specification
55 // CR = <US-ASCII CR, carriage return (13)> 56 // CR = <US-ASCII CR, carriage return (13)>
@@ -506,8 +507,10 @@ private: @@ -506,8 +507,10 @@ private:
506 std::vector<SrsHttpHeaderField> _headers; 507 std::vector<SrsHttpHeaderField> _headers;
507 // the query map 508 // the query map
508 std::map<std::string, std::string> _query; 509 std::map<std::string, std::string> _query;
  510 + // the transport connection, can be NULL.
  511 + SrsConnection* conn;
509 public: 512 public:
510 - SrsHttpMessage(SrsStSocket* io); 513 + SrsHttpMessage(SrsStSocket* io, SrsConnection* c);
511 virtual ~SrsHttpMessage(); 514 virtual ~SrsHttpMessage();
512 public: 515 public:
513 /** 516 /**
@@ -518,6 +521,7 @@ public: @@ -518,6 +521,7 @@ public:
518 ); 521 );
519 public: 522 public:
520 virtual char* http_ts_send_buffer(); 523 virtual char* http_ts_send_buffer();
  524 + virtual SrsConnection* connection();
521 public: 525 public:
522 virtual u_int8_t method(); 526 virtual u_int8_t method();
523 virtual u_int16_t status_code(); 527 virtual u_int16_t status_code();
@@ -617,7 +621,7 @@ public: @@ -617,7 +621,7 @@ public:
617 * or error and *ppmsg must be NULL. 621 * or error and *ppmsg must be NULL.
618 * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete(). 622 * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete().
619 */ 623 */
620 - virtual int parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg); 624 + virtual int parse_message(SrsStSocket* skt, SrsConnection* conn, SrsHttpMessage** ppmsg);
621 private: 625 private:
622 /** 626 /**
623 * parse the HTTP message to member field: msg. 627 * parse the HTTP message to member field: msg.
@@ -528,7 +528,7 @@ int SrsHttpApi::do_cycle() @@ -528,7 +528,7 @@ int SrsHttpApi::do_cycle()
528 SrsHttpMessage* req = NULL; 528 SrsHttpMessage* req = NULL;
529 529
530 // get a http message 530 // get a http message
531 - if ((ret = parser->parse_message(&skt, &req)) != ERROR_SUCCESS) { 531 + if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) {
532 return ret; 532 return ret;
533 } 533 }
534 534
@@ -105,7 +105,7 @@ int SrsHttpClient::post(string path, string req, SrsHttpMessage** ppmsg) @@ -105,7 +105,7 @@ int SrsHttpClient::post(string path, string req, SrsHttpMessage** ppmsg)
105 } 105 }
106 106
107 SrsHttpMessage* msg = NULL; 107 SrsHttpMessage* msg = NULL;
108 - if ((ret = parser->parse_message(skt, &msg)) != ERROR_SUCCESS) { 108 + if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) {
109 srs_error("parse http post response failed. ret=%d", ret); 109 srs_error("parse http post response failed. ret=%d", ret);
110 return ret; 110 return ret;
111 } 111 }
@@ -151,7 +151,7 @@ int SrsHttpClient::get(string path, std::string req, SrsHttpMessage** ppmsg) @@ -151,7 +151,7 @@ int SrsHttpClient::get(string path, std::string req, SrsHttpMessage** ppmsg)
151 } 151 }
152 152
153 SrsHttpMessage* msg = NULL; 153 SrsHttpMessage* msg = NULL;
154 - if ((ret = parser->parse_message(skt, &msg)) != ERROR_SUCCESS) { 154 + if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) {
155 srs_error("parse http post response failed. ret=%d", ret); 155 srs_error("parse http post response failed. ret=%d", ret);
156 return ret; 156 return ret;
157 } 157 }
@@ -1388,7 +1388,7 @@ int SrsHttpConn::do_cycle() @@ -1388,7 +1388,7 @@ int SrsHttpConn::do_cycle()
1388 SrsHttpMessage* req = NULL; 1388 SrsHttpMessage* req = NULL;
1389 1389
1390 // get a http message 1390 // get a http message
1391 - if ((ret = parser->parse_message(&skt, &req)) != ERROR_SUCCESS) { 1391 + if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) {
1392 return ret; 1392 return ret;
1393 } 1393 }
1394 1394