winlin

cleanup connections when terminate server.

@@ -40,6 +40,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) @@ -40,6 +40,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
40 id = 0; 40 id = 0;
41 manager = cm; 41 manager = cm;
42 stfd = c; 42 stfd = c;
  43 + disposed = false;
43 44
44 // the client thread should reap itself, 45 // the client thread should reap itself,
45 // so we never use joinable. 46 // so we never use joinable.
@@ -50,12 +51,24 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) @@ -50,12 +51,24 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
50 51
51 SrsConnection::~SrsConnection() 52 SrsConnection::~SrsConnection()
52 { 53 {
  54 + dispose();
  55 +
  56 + srs_freep(pthread);
  57 +}
  58 +
  59 +void SrsConnection::dispose()
  60 +{
  61 + if (disposed) {
  62 + return;
  63 + }
  64 +
  65 + disposed = true;
  66 +
53 /** 67 /**
54 * when delete the connection, stop the connection, 68 * when delete the connection, stop the connection,
55 * close the underlayer socket, delete the thread. 69 * close the underlayer socket, delete the thread.
56 */ 70 */
57 srs_close_stfd(stfd); 71 srs_close_stfd(stfd);
58 - srs_freep(pthread);  
59 } 72 }
60 73
61 int SrsConnection::start() 74 int SrsConnection::start()
@@ -83,11 +83,20 @@ protected: @@ -83,11 +83,20 @@ protected:
83 * the ip of client. 83 * the ip of client.
84 */ 84 */
85 std::string ip; 85 std::string ip;
  86 + /**
  87 + * whether the connection is disposed,
  88 + * when disposed, connection should stop cycle and cleanup itself.
  89 + */;
  90 + bool disposed;
86 public: 91 public:
87 SrsConnection(IConnectionManager* cm, st_netfd_t c); 92 SrsConnection(IConnectionManager* cm, st_netfd_t c);
88 virtual ~SrsConnection(); 93 virtual ~SrsConnection();
89 public: 94 public:
90 /** 95 /**
  96 + * to dipose the connection.
  97 + */
  98 + virtual void dispose();
  99 + /**
91 * start the client green thread. 100 * start the client green thread.
92 * when server get a client from listener, 101 * when server get a client from listener,
93 * 1. server will create an concrete connection(for instance, RTMP connection), 102 * 1. server will create an concrete connection(for instance, RTMP connection),
@@ -529,7 +529,7 @@ int SrsHttpApi::do_cycle() @@ -529,7 +529,7 @@ int SrsHttpApi::do_cycle()
529 skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US); 529 skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
530 530
531 // process http messages. 531 // process http messages.
532 - for (;;) { 532 + while(!disposed) {
533 ISrsHttpMessage* req = NULL; 533 ISrsHttpMessage* req = NULL;
534 534
535 // get a http message 535 // get a http message
@@ -2533,7 +2533,7 @@ int SrsHttpConn::do_cycle() @@ -2533,7 +2533,7 @@ int SrsHttpConn::do_cycle()
2533 skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US); 2533 skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
2534 2534
2535 // process http messages. 2535 // process http messages.
2536 - for (;;) { 2536 + while (!disposed) {
2537 ISrsHttpMessage* req = NULL; 2537 ISrsHttpMessage* req = NULL;
2538 2538
2539 // get a http message 2539 // get a http message
@@ -320,7 +320,7 @@ int SrsRtmpConn::service_cycle() @@ -320,7 +320,7 @@ int SrsRtmpConn::service_cycle()
320 } 320 }
321 srs_verbose("on_bw_done success"); 321 srs_verbose("on_bw_done success");
322 322
323 - while (true) { 323 + while (!disposed) {
324 ret = stream_service_cycle(); 324 ret = stream_service_cycle();
325 325
326 // stream service must terminated with error, never success. 326 // stream service must terminated with error, never success.
@@ -361,6 +361,8 @@ int SrsRtmpConn::service_cycle() @@ -361,6 +361,8 @@ int SrsRtmpConn::service_cycle()
361 srs_error("control message(%d) reject as error. ret=%d", ret, ret); 361 srs_error("control message(%d) reject as error. ret=%d", ret, ret);
362 return ret; 362 return ret;
363 } 363 }
  364 +
  365 + return ret;
364 } 366 }
365 367
366 int SrsRtmpConn::stream_service_cycle() 368 int SrsRtmpConn::stream_service_cycle()
@@ -635,7 +637,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe @@ -635,7 +637,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
635 // set the sock options. 637 // set the sock options.
636 play_set_sock_options(); 638 play_set_sock_options();
637 639
638 - while (true) { 640 + while (!disposed) {
639 // collect elapse for pithy print. 641 // collect elapse for pithy print.
640 pprint->elapse(); 642 pprint->elapse();
641 643
@@ -865,7 +867,7 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -865,7 +867,7 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
865 } 867 }
866 868
867 int64_t nb_msgs = 0; 869 int64_t nb_msgs = 0;
868 - while (true) { 870 + while (!disposed) {
869 pprint->elapse(); 871 pprint->elapse();
870 872
871 // cond wait for error. 873 // cond wait for error.
@@ -514,17 +514,7 @@ void SrsServer::destroy() @@ -514,17 +514,7 @@ void SrsServer::destroy()
514 { 514 {
515 srs_warn("start destroy server"); 515 srs_warn("start destroy server");
516 516
517 - _srs_config->unsubscribe(this);  
518 -  
519 - close_listeners(SrsListenerRtmpStream);  
520 - close_listeners(SrsListenerHttpApi);  
521 - close_listeners(SrsListenerHttpStream);  
522 -  
523 -#ifdef SRS_AUTO_INGEST  
524 - ingester->dispose();  
525 -#endif  
526 -  
527 - SrsSource::dispose_all(); 517 + dispose();
528 518
529 #ifdef SRS_AUTO_HTTP_API 519 #ifdef SRS_AUTO_HTTP_API
530 srs_freep(http_api_mux); 520 srs_freep(http_api_mux);
@@ -550,32 +540,35 @@ void SrsServer::destroy() @@ -550,32 +540,35 @@ void SrsServer::destroy()
550 srs_freep(signal_manager); 540 srs_freep(signal_manager);
551 541
552 srs_freep(handler); 542 srs_freep(handler);
553 -  
554 - // @remark never destroy the connections,  
555 - // for it's still alive.  
556 -  
557 - // @remark never destroy the source,  
558 - // when we free all sources, the fmle publish may retry  
559 - // and segment fault.  
560 -  
561 -#ifdef SRS_MEM_WATCH  
562 - srs_memory_report();  
563 -#endif  
564 } 543 }
565 544
566 void SrsServer::dispose() 545 void SrsServer::dispose()
567 { 546 {
568 _srs_config->unsubscribe(this); 547 _srs_config->unsubscribe(this);
569 548
  549 + // prevent fresh clients.
  550 + close_listeners(SrsListenerRtmpStream);
  551 + close_listeners(SrsListenerHttpApi);
  552 + close_listeners(SrsListenerHttpStream);
  553 + close_listeners(SrsListenerMpegTsOverUdp);
  554 + close_listeners(SrsListenerRtsp);
  555 + close_listeners(SrsListenerFlv);
  556 +
570 #ifdef SRS_AUTO_INGEST 557 #ifdef SRS_AUTO_INGEST
571 ingester->dispose(); 558 ingester->dispose();
572 - srs_trace("gracefully dispose ingesters");  
573 #endif 559 #endif
574 560
575 SrsSource::dispose_all(); 561 SrsSource::dispose_all();
576 - srs_trace("gracefully dispose sources");  
577 562
578 - srs_trace("terminate server"); 563 + while (!conns.empty()) {
  564 + std::vector<SrsConnection*>::iterator it;
  565 + for (it = conns.begin(); it != conns.end(); ++it) {
  566 + SrsConnection* conn = *it;
  567 + conn->dispose();
  568 + }
  569 +
  570 + st_usleep(100 * 1000);
  571 + }
579 572
580 #ifdef SRS_MEM_WATCH 573 #ifdef SRS_MEM_WATCH
581 srs_memory_report(); 574 srs_memory_report();