winlin

refine code, ignore client when no ip.

@@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 23
24 #include <srs_app_conn.hpp> 24 #include <srs_app_conn.hpp>
25 25
  26 +using namespace std;
  27 +
26 #include <srs_kernel_log.hpp> 28 #include <srs_kernel_log.hpp>
27 #include <srs_kernel_error.hpp> 29 #include <srs_kernel_error.hpp>
28 #include <srs_app_utility.hpp> 30 #include <srs_app_utility.hpp>
@@ -36,11 +38,12 @@ IConnectionManager::~IConnectionManager() @@ -36,11 +38,12 @@ IConnectionManager::~IConnectionManager()
36 { 38 {
37 } 39 }
38 40
39 -SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) 41 +SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip)
40 { 42 {
41 id = 0; 43 id = 0;
42 manager = cm; 44 manager = cm;
43 stfd = c; 45 stfd = c;
  46 + ip = cip;
44 disposed = false; 47 disposed = false;
45 expired = false; 48 expired = false;
46 create_time = srs_get_system_time_ms(); 49 create_time = srs_get_system_time_ms();
@@ -112,8 +115,6 @@ int SrsConnection::cycle() @@ -112,8 +115,6 @@ int SrsConnection::cycle()
112 _srs_context->generate_id(); 115 _srs_context->generate_id();
113 id = _srs_context->get_id(); 116 id = _srs_context->get_id();
114 117
115 - ip = srs_get_peer_ip(st_netfd_fileno(stfd));  
116 -  
117 int oret = ret = do_cycle(); 118 int oret = ret = do_cycle();
118 119
119 // if socket io error, set to closed. 120 // if socket io error, set to closed.
@@ -110,7 +110,7 @@ protected: @@ -110,7 +110,7 @@ protected:
110 */ 110 */
111 int64_t create_time; 111 int64_t create_time;
112 public: 112 public:
113 - SrsConnection(IConnectionManager* cm, st_netfd_t c); 113 + SrsConnection(IConnectionManager* cm, st_netfd_t c, std::string cip);
114 virtual ~SrsConnection(); 114 virtual ~SrsConnection();
115 // interface IKbpsDelta 115 // interface IKbpsDelta
116 public: 116 public:
@@ -1309,8 +1309,8 @@ int SrsGoApiError::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -1309,8 +1309,8 @@ int SrsGoApiError::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
1309 return srs_api_response_code(w, r, 100); 1309 return srs_api_response_code(w, r, 100);
1310 } 1310 }
1311 1311
1312 -SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)  
1313 - : SrsConnection(cm, fd) 1312 +SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip)
  1313 + : SrsConnection(cm, fd, cip)
1314 { 1314 {
1315 mux = m; 1315 mux = m;
1316 parser = new SrsHttpParser(); 1316 parser = new SrsHttpParser();
@@ -215,7 +215,7 @@ private: @@ -215,7 +215,7 @@ private:
215 bool crossdomain_required; 215 bool crossdomain_required;
216 bool crossdomain_enabled; 216 bool crossdomain_enabled;
217 public: 217 public:
218 - SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); 218 + SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, std::string cip);
219 virtual ~SrsHttpApi(); 219 virtual ~SrsHttpApi();
220 // interface IKbpsDelta 220 // interface IKbpsDelta
221 public: 221 public:
@@ -1063,8 +1063,8 @@ int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length) @@ -1063,8 +1063,8 @@ int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length)
1063 return 0; 1063 return 0;
1064 } 1064 }
1065 1065
1066 -SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m)  
1067 - : SrsConnection(cm, fd) 1066 +SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip)
  1067 + : SrsConnection(cm, fd, cip)
1068 { 1068 {
1069 parser = new SrsHttpParser(); 1069 parser = new SrsHttpParser();
1070 http_mux = m; 1070 http_mux = m;
@@ -1187,8 +1187,8 @@ int SrsHttpConn::on_disconnect(SrsRequest* req) @@ -1187,8 +1187,8 @@ int SrsHttpConn::on_disconnect(SrsRequest* req)
1187 return ret; 1187 return ret;
1188 } 1188 }
1189 1189
1190 -SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m)  
1191 - : SrsHttpConn(cm, fd, m) 1190 +SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip)
  1191 + : SrsHttpConn(cm, fd, m, cip)
1192 { 1192 {
1193 } 1193 }
1194 1194
@@ -350,7 +350,7 @@ private: @@ -350,7 +350,7 @@ private:
350 SrsHttpParser* parser; 350 SrsHttpParser* parser;
351 ISrsHttpServeMux* http_mux; 351 ISrsHttpServeMux* http_mux;
352 public: 352 public:
353 - SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m); 353 + SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
354 virtual ~SrsHttpConn(); 354 virtual ~SrsHttpConn();
355 // interface IKbpsDelta 355 // interface IKbpsDelta
356 public: 356 public:
@@ -381,7 +381,7 @@ private: @@ -381,7 +381,7 @@ private:
381 class SrsResponseOnlyHttpConn : public SrsHttpConn 381 class SrsResponseOnlyHttpConn : public SrsHttpConn
382 { 382 {
383 public: 383 public:
384 - SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m); 384 + SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
385 virtual ~SrsResponseOnlyHttpConn(); 385 virtual ~SrsResponseOnlyHttpConn();
386 public: 386 public:
387 virtual int on_got_http_message(ISrsHttpMessage* msg); 387 virtual int on_got_http_message(ISrsHttpMessage* msg);
@@ -312,11 +312,11 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) @@ -312,11 +312,11 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout)
312 } 312 }
313 313
314 #ifdef SRS_AUTO_KAFKA 314 #ifdef SRS_AUTO_KAFKA
315 -SrsRtmpConn::SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c) 315 +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c, string cip)
316 #else 316 #else
317 -SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) 317 +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip)
318 #endif 318 #endif
319 - : SrsConnection(svr, c) 319 + : SrsConnection(svr, c, cip)
320 { 320 {
321 server = svr; 321 server = svr;
322 #ifdef SRS_AUTO_KAFKA 322 #ifdef SRS_AUTO_KAFKA
@@ -373,7 +373,7 @@ int SrsRtmpConn::do_cycle() @@ -373,7 +373,7 @@ int SrsRtmpConn::do_cycle()
373 { 373 {
374 int ret = ERROR_SUCCESS; 374 int ret = ERROR_SUCCESS;
375 375
376 - srs_trace("RTMP client ip=%s", ip.c_str()); 376 + srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), st_netfd_fileno(stfd));
377 377
378 // notify kafka cluster. 378 // notify kafka cluster.
379 #ifdef SRS_AUTO_KAFKA 379 #ifdef SRS_AUTO_KAFKA
@@ -144,9 +144,9 @@ private: @@ -144,9 +144,9 @@ private:
144 #endif 144 #endif
145 public: 145 public:
146 #ifdef SRS_AUTO_KAFKA 146 #ifdef SRS_AUTO_KAFKA
147 - SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c); 147 + SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c, std::string cip);
148 #else 148 #else
149 - SrsRtmpConn(SrsServer* svr, st_netfd_t c); 149 + SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip);
150 #endif 150 #endif
151 virtual ~SrsRtmpConn(); 151 virtual ~SrsRtmpConn();
152 public: 152 public:
@@ -1241,25 +1241,55 @@ void SrsServer::resample_kbps() @@ -1241,25 +1241,55 @@ void SrsServer::resample_kbps()
1241 srs_update_rtmp_server((int)conns.size(), kbps); 1241 srs_update_rtmp_server((int)conns.size(), kbps);
1242 } 1242 }
1243 1243
1244 -int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) 1244 +int SrsServer::accept_client(SrsListenerType type, st_netfd_t stfd)
1245 { 1245 {
1246 int ret = ERROR_SUCCESS; 1246 int ret = ERROR_SUCCESS;
1247 1247
1248 - int fd = st_netfd_fileno(client_stfd); 1248 + SrsConnection* conn = fd2conn(type, stfd);
  1249 + if (conn == NULL) {
  1250 + srs_close_stfd(stfd);
  1251 + return ERROR_SUCCESS;
  1252 + }
  1253 + srs_assert(conn);
  1254 +
  1255 + // directly enqueue, the cycle thread will remove the client.
  1256 + conns.push_back(conn);
  1257 + srs_verbose("add conn to vector.");
  1258 +
  1259 + // cycle will start process thread and when finished remove the client.
  1260 + // @remark never use the conn, for it maybe destroyed.
  1261 + if ((ret = conn->start()) != ERROR_SUCCESS) {
  1262 + return ret;
  1263 + }
  1264 + srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
  1265 +
  1266 + return ret;
  1267 +}
  1268 +
  1269 +SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd)
  1270 +{
  1271 + int ret = ERROR_SUCCESS;
  1272 +
  1273 + int fd = st_netfd_fileno(stfd);
  1274 + string ip = srs_get_peer_ip(fd);
  1275 +
  1276 + // for some keep alive application, for example, the keepalived,
  1277 + // will send some tcp packet which we cann't got the ip,
  1278 + // we just ignore it.
  1279 + if (ip.empty()) {
  1280 + srs_info("ignore empty ip client, fd=%d.", fd);
  1281 + return NULL;
  1282 + }
1249 1283
1250 // check connection limitation. 1284 // check connection limitation.
1251 int max_connections = _srs_config->get_max_connections(); 1285 int max_connections = _srs_config->get_max_connections();
1252 if (handler && (ret = handler->on_accept_client(max_connections, (int)conns.size()) != ERROR_SUCCESS)) { 1286 if (handler && (ret = handler->on_accept_client(max_connections, (int)conns.size()) != ERROR_SUCCESS)) {
1253 - srs_error("handle accept client failed, drop client: "  
1254 - "clients=%d, max=%d, fd=%d. ret=%d", (int)conns.size(), max_connections, fd, ret);  
1255 - srs_close_stfd(client_stfd);  
1256 - return ERROR_SUCCESS; 1287 + srs_error("handle accept client failed, drop client: clients=%d, max=%d, fd=%d. ret=%d", (int)conns.size(), max_connections, fd, ret);
  1288 + return NULL;
1257 } 1289 }
1258 if ((int)conns.size() >= max_connections) { 1290 if ((int)conns.size() >= max_connections) {
1259 - srs_error("exceed the max connections, drop client: "  
1260 - "clients=%d, max=%d, fd=%d", (int)conns.size(), max_connections, fd);  
1261 - srs_close_stfd(client_stfd);  
1262 - return ERROR_SUCCESS; 1291 + srs_error("exceed the max connections, drop client: clients=%d, max=%d, fd=%d", (int)conns.size(), max_connections, fd);
  1292 + return NULL;
1263 } 1293 }
1264 1294
1265 // avoid fd leak when fork. 1295 // avoid fd leak when fork.
@@ -1269,56 +1299,40 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) @@ -1269,56 +1299,40 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
1269 if ((val = fcntl(fd, F_GETFD, 0)) < 0) { 1299 if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
1270 ret = ERROR_SYSTEM_PID_GET_FILE_INFO; 1300 ret = ERROR_SYSTEM_PID_GET_FILE_INFO;
1271 srs_error("fnctl F_GETFD error! fd=%d. ret=%#x", fd, ret); 1301 srs_error("fnctl F_GETFD error! fd=%d. ret=%#x", fd, ret);
1272 - srs_close_stfd(client_stfd);  
1273 - return ret; 1302 + return NULL;
1274 } 1303 }
1275 val |= FD_CLOEXEC; 1304 val |= FD_CLOEXEC;
1276 if (fcntl(fd, F_SETFD, val) < 0) { 1305 if (fcntl(fd, F_SETFD, val) < 0) {
1277 ret = ERROR_SYSTEM_PID_SET_FILE_INFO; 1306 ret = ERROR_SYSTEM_PID_SET_FILE_INFO;
1278 srs_error("fcntl F_SETFD error! fd=%d ret=%#x", fd, ret); 1307 srs_error("fcntl F_SETFD error! fd=%d ret=%#x", fd, ret);
1279 - srs_close_stfd(client_stfd);  
1280 - return ret; 1308 + return NULL;
1281 } 1309 }
1282 } 1310 }
1283 1311
1284 SrsConnection* conn = NULL; 1312 SrsConnection* conn = NULL;
  1313 +
1285 if (type == SrsListenerRtmpStream) { 1314 if (type == SrsListenerRtmpStream) {
1286 - conn = new SrsRtmpConn(this, kafka, client_stfd); 1315 + conn = new SrsRtmpConn(this, kafka, stfd, ip);
1287 } else if (type == SrsListenerHttpApi) { 1316 } else if (type == SrsListenerHttpApi) {
1288 #ifdef SRS_AUTO_HTTP_API 1317 #ifdef SRS_AUTO_HTTP_API
1289 - conn = new SrsHttpApi(this, client_stfd, http_api_mux); 1318 + conn = new SrsHttpApi(this, stfd, http_api_mux, ip);
1290 #else 1319 #else
1291 srs_warn("close http client for server not support http-api"); 1320 srs_warn("close http client for server not support http-api");
1292 - srs_close_stfd(client_stfd); 1321 + srs_close_stfd(stfd);
1293 return ret; 1322 return ret;
1294 #endif 1323 #endif
1295 } else if (type == SrsListenerHttpStream) { 1324 } else if (type == SrsListenerHttpStream) {
1296 #ifdef SRS_AUTO_HTTP_SERVER 1325 #ifdef SRS_AUTO_HTTP_SERVER
1297 - conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server); 1326 + conn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip);
1298 #else 1327 #else
1299 srs_warn("close http client for server not support http-server"); 1328 srs_warn("close http client for server not support http-server");
1300 - srs_close_stfd(client_stfd);  
1301 - return ret; 1329 + return NULL;
1302 #endif 1330 #endif
1303 } else { 1331 } else {
1304 // TODO: FIXME: handler others 1332 // TODO: FIXME: handler others
1305 } 1333 }
1306 - srs_assert(conn);  
1307 1334
1308 - // directly enqueue, the cycle thread will remove the client.  
1309 - conns.push_back(conn);  
1310 - srs_verbose("add conn to vector.");  
1311 -  
1312 - // cycle will start process thread and when finished remove the client.  
1313 - // @remark never use the conn, for it maybe destroyed.  
1314 - if ((ret = conn->start()) != ERROR_SUCCESS) {  
1315 - return ret;  
1316 - }  
1317 - srs_verbose("conn started success.");  
1318 -  
1319 - srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);  
1320 -  
1321 - return ret; 1335 + return conn;
1322 } 1336 }
1323 1337
1324 void SrsServer::remove(SrsConnection* conn) 1338 void SrsServer::remove(SrsConnection* conn)
@@ -365,9 +365,11 @@ public: @@ -365,9 +365,11 @@ public:
365 * when listener got a fd, notice server to accept it. 365 * when listener got a fd, notice server to accept it.
366 * @param type, the client type, used to create concrete connection, 366 * @param type, the client type, used to create concrete connection,
367 * for instance RTMP connection to serve client. 367 * for instance RTMP connection to serve client.
368 - * @param client_stfd, the client fd in st boxed, the underlayer fd. 368 + * @param stfd, the client fd in st boxed, the underlayer fd.
369 */ 369 */
370 - virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); 370 + virtual int accept_client(SrsListenerType type, st_netfd_t stfd);
  371 +private:
  372 + virtual SrsConnection* fd2conn(SrsListenerType type, st_netfd_t stfd);
371 // IConnectionManager 373 // IConnectionManager
372 public: 374 public:
373 /** 375 /**