winlin

support edge token traverse, fix #104. 0.9.129

@@ -240,6 +240,7 @@ Supported operating systems and hardware: @@ -240,6 +240,7 @@ Supported operating systems and hardware:
240 * 2013-10-17, Created.<br/> 240 * 2013-10-17, Created.<br/>
241 241
242 ## History 242 ## History
  243 +* v1.0, 2014-06-21, support edge token traverse, fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129
243 * v1.0, 2014-06-19, add connections count to api summaries. 0.9.127 244 * v1.0, 2014-06-19, add connections count to api summaries. 0.9.127
244 * v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126 245 * v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126
245 * v1.0, 2014-06-18, add network bytes to api summaries. 0.9.125 246 * v1.0, 2014-06-18, add network bytes to api summaries. 0.9.125
@@ -124,8 +124,15 @@ vhost same.edge.srs.com { @@ -124,8 +124,15 @@ vhost same.edge.srs.com {
124 # @remark user can specifies multiple origin for error backup, by space, 124 # @remark user can specifies multiple origin for error backup, by space,
125 # for example, 192.168.1.100:1935 192.168.1.101:1935 192.168.1.102:1935 125 # for example, 192.168.1.100:1935 192.168.1.101:1935 192.168.1.102:1935
126 origin 127.0.0.1:1935 localhost:1935; 126 origin 127.0.0.1:1935 localhost:1935;
  127 + # for edge, whether open the token traverse mode,
  128 + # if token traverse on, all connections of edge will forward to origin to check(auth),
  129 + # it's very important for the edge to do the token auth.
  130 + # the better way is use http callback to do the token auth by the edge,
  131 + # but if user prefer origin check(auth), the token_traverse if better solution.
  132 + # default: off
  133 + token_traverse off;
127 } 134 }
128 -# vhost for edge, chnage vhost. 135 +# vhost for edge, change vhost.
129 vhost change.edge.srs.com { 136 vhost change.edge.srs.com {
130 mode remote; 137 mode remote;
131 # TODO: FIXME: support extra params. 138 # TODO: FIXME: support extra params.
@@ -509,7 +509,8 @@ int SrsConfig::reload() @@ -509,7 +509,8 @@ int SrsConfig::reload()
509 // 509 //
510 // always support reload without additional code: 510 // always support reload without additional code:
511 // chunk_size, ff_log_dir, max_connections, 511 // chunk_size, ff_log_dir, max_connections,
512 - // bandcheck, http_hooks, heartbeat 512 + // bandcheck, http_hooks, heartbeat,
  513 + // token_traverse
513 514
514 // merge config: listen 515 // merge config: listen
515 if (!srs_directive_equals(root->get("listen"), old_root->get("listen"))) { 516 if (!srs_directive_equals(root->get("listen"), old_root->get("listen"))) {
@@ -1987,6 +1988,22 @@ SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost) @@ -1987,6 +1988,22 @@ SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
1987 return conf->get("origin"); 1988 return conf->get("origin");
1988 } 1989 }
1989 1990
  1991 +bool SrsConfig::get_vhost_edge_token_traverse(std::string vhost)
  1992 +{
  1993 + SrsConfDirective* conf = get_vhost(vhost);
  1994 +
  1995 + if (!conf) {
  1996 + return false;
  1997 + }
  1998 +
  1999 + conf = conf->get("token_traverse");
  2000 + if (!conf || conf->arg0() != "on") {
  2001 + return false;
  2002 + }
  2003 +
  2004 + return true;
  2005 +}
  2006 +
1990 SrsConfDirective* SrsConfig::get_transcode(string vhost, string scope) 2007 SrsConfDirective* SrsConfig::get_transcode(string vhost, string scope)
1991 { 2008 {
1992 SrsConfDirective* conf = get_vhost(vhost); 2009 SrsConfDirective* conf = get_vhost(vhost);
@@ -214,6 +214,7 @@ public: @@ -214,6 +214,7 @@ public:
214 virtual bool get_vhost_is_edge(std::string vhost); 214 virtual bool get_vhost_is_edge(std::string vhost);
215 virtual bool get_vhost_is_edge(SrsConfDirective* vhost); 215 virtual bool get_vhost_is_edge(SrsConfDirective* vhost);
216 virtual SrsConfDirective* get_vhost_edge_origin(std::string vhost); 216 virtual SrsConfDirective* get_vhost_edge_origin(std::string vhost);
  217 + virtual bool get_vhost_edge_token_traverse(std::string vhost);
217 // vhost transcode section 218 // vhost transcode section
218 public: 219 public:
219 virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope); 220 virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope);
@@ -24,6 +24,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -24,6 +24,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 #include <srs_app_rtmp_conn.hpp> 24 #include <srs_app_rtmp_conn.hpp>
25 25
26 #include <stdlib.h> 26 #include <stdlib.h>
  27 +#include <sys/socket.h>
  28 +#include <netinet/in.h>
  29 +#include <arpa/inet.h>
27 30
28 using namespace std; 31 using namespace std;
29 32
@@ -45,6 +48,8 @@ using namespace std; @@ -45,6 +48,8 @@ using namespace std;
45 #include <srs_app_edge.hpp> 48 #include <srs_app_edge.hpp>
46 #include <srs_app_kbps.hpp> 49 #include <srs_app_kbps.hpp>
47 #include <srs_app_utility.hpp> 50 #include <srs_app_utility.hpp>
  51 +#include <srs_protocol_utility.hpp>
  52 +#include <srs_kernel_utility.hpp>
48 53
49 // when stream is busy, for example, streaming is already 54 // when stream is busy, for example, streaming is already
50 // publishing, when a new client to request to publish, 55 // publishing, when a new client to request to publish,
@@ -63,6 +68,9 @@ using namespace std; @@ -63,6 +68,9 @@ using namespace std;
63 // if timeout, close the connection. 68 // if timeout, close the connection.
64 #define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL) 69 #define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL)
65 70
  71 +// when edge timeout, retry next.
  72 +#define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL)
  73 +
66 SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd) 74 SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd)
67 : SrsConnection(srs_server, client_stfd) 75 : SrsConnection(srs_server, client_stfd)
68 { 76 {
@@ -290,6 +298,16 @@ int SrsRtmpConn::stream_service_cycle() @@ -290,6 +298,16 @@ int SrsRtmpConn::stream_service_cycle()
290 } 298 }
291 srs_info("set chunk_size=%d success", chunk_size); 299 srs_info("set chunk_size=%d success", chunk_size);
292 300
  301 + // do token traverse before serve it.
  302 + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
  303 + bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
  304 + if (vhost_is_edge && edge_traverse) {
  305 + if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
  306 + srs_warn("token auth failed, ret=%d", ret);
  307 + return ret;
  308 + }
  309 + }
  310 +
293 // find a source to serve. 311 // find a source to serve.
294 SrsSource* source = NULL; 312 SrsSource* source = NULL;
295 if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) { 313 if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) {
@@ -297,8 +315,6 @@ int SrsRtmpConn::stream_service_cycle() @@ -297,8 +315,6 @@ int SrsRtmpConn::stream_service_cycle()
297 } 315 }
298 srs_assert(source != NULL); 316 srs_assert(source != NULL);
299 317
300 - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);  
301 -  
302 // check publish available 318 // check publish available
303 // for edge, never check it, for edge use proxy mode. 319 // for edge, never check it, for edge use proxy mode.
304 if (!vhost_is_edge) { 320 if (!vhost_is_edge) {
@@ -846,6 +862,122 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg @@ -846,6 +862,122 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg
846 return ret; 862 return ret;
847 } 863 }
848 864
  865 +int SrsRtmpConn::check_edge_token_traverse_auth()
  866 +{
  867 + int ret = ERROR_SUCCESS;
  868 +
  869 + srs_assert(req);
  870 +
  871 + st_netfd_t stsock = NULL;
  872 + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
  873 + for (int i = 0; i < (int)conf->args.size(); i++) {
  874 + if ((ret = connect_server(i, &stsock)) == ERROR_SUCCESS) {
  875 + break;
  876 + }
  877 + }
  878 + if (ret != ERROR_SUCCESS) {
  879 + srs_warn("token traverse connect failed. ret=%d", ret);
  880 + return ret;
  881 + }
  882 +
  883 + srs_assert(stsock);
  884 + SrsSocket* io = new SrsSocket(stsock);
  885 + SrsRtmpClient* client = new SrsRtmpClient(io);
  886 +
  887 + ret = do_token_traverse_auth(io, client);
  888 +
  889 + srs_freep(client);
  890 + srs_freep(io);
  891 + srs_close_stfd(stsock);
  892 +
  893 + return ret;
  894 +}
  895 +
  896 +// TODO: FIXME: refine the connect server serials functions.
  897 +int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock)
  898 +{
  899 + int ret = ERROR_SUCCESS;
  900 +
  901 + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
  902 + srs_assert(conf);
  903 +
  904 + // select the origin.
  905 + std::string server = conf->args.at(origin_index % conf->args.size());
  906 + origin_index = (origin_index + 1) % conf->args.size();
  907 +
  908 + std::string s_port = RTMP_DEFAULT_PORT;
  909 + int port = ::atoi(RTMP_DEFAULT_PORT);
  910 + size_t pos = server.find(":");
  911 + if (pos != std::string::npos) {
  912 + s_port = server.substr(pos + 1);
  913 + server = server.substr(0, pos);
  914 + port = ::atoi(s_port.c_str());
  915 + }
  916 +
  917 + // connect to server.
  918 + std::string ip = srs_dns_resolve(server);
  919 + if (ip.empty()) {
  920 + ret = ERROR_SYSTEM_IP_INVALID;
  921 + srs_error("dns resolve server error, ip empty. ret=%d", ret);
  922 + return ret;
  923 + }
  924 +
  925 + // open socket.
  926 + // TODO: FIXME: extract utility method
  927 + int sock = socket(AF_INET, SOCK_STREAM, 0);
  928 + if(sock == -1){
  929 + ret = ERROR_SOCKET_CREATE;
  930 + srs_error("create socket error. ret=%d", ret);
  931 + return ret;
  932 + }
  933 +
  934 + st_netfd_t stsock = st_netfd_open_socket(sock);
  935 + if(stsock == NULL){
  936 + ret = ERROR_ST_OPEN_SOCKET;
  937 + srs_error("st_netfd_open_socket failed. ret=%d", ret);
  938 + return ret;
  939 + }
  940 +
  941 + sockaddr_in addr;
  942 + addr.sin_family = AF_INET;
  943 + addr.sin_port = htons(port);
  944 + addr.sin_addr.s_addr = inet_addr(ip.c_str());
  945 +
  946 + if (st_connect(stsock, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US) == -1){
  947 + ret = ERROR_ST_CONNECT;
  948 + srs_close_stfd(stsock);
  949 + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
  950 + return ret;
  951 + }
  952 + srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port);
  953 +
  954 + *pstsock = stsock;
  955 + return ret;
  956 +}
  957 +
  958 +int SrsRtmpConn::do_token_traverse_auth(SrsSocket* io, SrsRtmpClient* client)
  959 +{
  960 + int ret = ERROR_SUCCESS;
  961 +
  962 + srs_assert(client);
  963 +
  964 + client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
  965 + client->set_send_timeout(SRS_SEND_TIMEOUT_US);
  966 +
  967 + if ((ret = client->handshake()) != ERROR_SUCCESS) {
  968 + srs_error("handshake with server failed. ret=%d", ret);
  969 + return ret;
  970 + }
  971 + if ((ret = client->connect_app(req->app, req->tcUrl, req)) != ERROR_SUCCESS) {
  972 + srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret);
  973 + return ret;
  974 + }
  975 +
  976 + srs_trace("edge token auth ok, tcUrl=%s", req->tcUrl.c_str());
  977 +
  978 + return ret;
  979 +}
  980 +
849 int SrsRtmpConn::http_hooks_on_connect() 981 int SrsRtmpConn::http_hooks_on_connect()
850 { 982 {
851 int ret = ERROR_SUCCESS; 983 int ret = ERROR_SUCCESS;
@@ -47,6 +47,7 @@ class SrsHttpHooks; @@ -47,6 +47,7 @@ class SrsHttpHooks;
47 #endif 47 #endif
48 class SrsBandwidth; 48 class SrsBandwidth;
49 class SrsKbps; 49 class SrsKbps;
  50 +class SrsRtmpClient;
50 51
51 /** 52 /**
52 * the client provides the main logic control for RTMP clients. 53 * the client provides the main logic control for RTMP clients.
@@ -91,6 +92,10 @@ private: @@ -91,6 +92,10 @@ private:
91 virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge); 92 virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
92 virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg); 93 virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
93 private: 94 private:
  95 + virtual int check_edge_token_traverse_auth();
  96 + virtual int connect_server(int origin_index, st_netfd_t* pstsock);
  97 + virtual int do_token_traverse_auth(SrsSocket* io, SrsRtmpClient* client);
  98 +private:
94 virtual int http_hooks_on_connect(); 99 virtual int http_hooks_on_connect();
95 virtual void http_hooks_on_close(); 100 virtual void http_hooks_on_close();
96 virtual int http_hooks_on_publish(); 101 virtual int http_hooks_on_publish();
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR "0" 32 #define VERSION_MAJOR "0"
33 #define VERSION_MINOR "9" 33 #define VERSION_MINOR "9"
34 -#define VERSION_REVISION "128" 34 +#define VERSION_REVISION "129"
35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION 35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"