diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index a03a17489..8cedcfec3 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -40,6 +40,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) id = 0; manager = cm; stfd = c; + disposed = false; // the client thread should reap itself, // so we never use joinable. @@ -50,12 +51,24 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) SrsConnection::~SrsConnection() { + dispose(); + + srs_freep(pthread); +} + +void SrsConnection::dispose() +{ + if (disposed) { + return; + } + + disposed = true; + /** * when delete the connection, stop the connection, * close the underlayer socket, delete the thread. */ srs_close_stfd(stfd); - srs_freep(pthread); } int SrsConnection::start() diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index e1845406c..e9426c395 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -83,10 +83,19 @@ protected: * the ip of client. */ std::string ip; + /** + * whether the connection is disposed, + * when disposed, connection should stop cycle and cleanup itself. + */; + bool disposed; public: SrsConnection(IConnectionManager* cm, st_netfd_t c); virtual ~SrsConnection(); public: + /** + * to dipose the connection. + */ + virtual void dispose(); /** * start the client green thread. * when server get a client from listener, diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 393943f34..6b25c9544 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -529,7 +529,7 @@ int SrsHttpApi::do_cycle() skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US); // process http messages. - for (;;) { + while(!disposed) { ISrsHttpMessage* req = NULL; // get a http message diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 3caf7f28a..4f7cac5bd 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -2533,7 +2533,7 @@ int SrsHttpConn::do_cycle() skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US); // process http messages. - for (;;) { + while (!disposed) { ISrsHttpMessage* req = NULL; // get a http message diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index a1e9815db..78710d34d 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -320,7 +320,7 @@ int SrsRtmpConn::service_cycle() } srs_verbose("on_bw_done success"); - while (true) { + while (!disposed) { ret = stream_service_cycle(); // stream service must terminated with error, never success. @@ -361,6 +361,8 @@ int SrsRtmpConn::service_cycle() srs_error("control message(%d) reject as error. ret=%d", ret, ret); return ret; } + + return ret; } int SrsRtmpConn::stream_service_cycle() @@ -635,7 +637,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe // set the sock options. play_set_sock_options(); - while (true) { + while (!disposed) { // collect elapse for pithy print. pprint->elapse(); @@ -865,7 +867,7 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) } int64_t nb_msgs = 0; - while (true) { + while (!disposed) { pprint->elapse(); // cond wait for error. diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index d1c64b50b..575372d83 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -514,17 +514,7 @@ void SrsServer::destroy() { srs_warn("start destroy server"); - _srs_config->unsubscribe(this); - - close_listeners(SrsListenerRtmpStream); - close_listeners(SrsListenerHttpApi); - close_listeners(SrsListenerHttpStream); - -#ifdef SRS_AUTO_INGEST - ingester->dispose(); -#endif - - SrsSource::dispose_all(); + dispose(); #ifdef SRS_AUTO_HTTP_API srs_freep(http_api_mux); @@ -550,32 +540,35 @@ void SrsServer::destroy() srs_freep(signal_manager); srs_freep(handler); - - // @remark never destroy the connections, - // for it's still alive. - - // @remark never destroy the source, - // when we free all sources, the fmle publish may retry - // and segment fault. - -#ifdef SRS_MEM_WATCH - srs_memory_report(); -#endif } void SrsServer::dispose() { _srs_config->unsubscribe(this); + // prevent fresh clients. + close_listeners(SrsListenerRtmpStream); + close_listeners(SrsListenerHttpApi); + close_listeners(SrsListenerHttpStream); + close_listeners(SrsListenerMpegTsOverUdp); + close_listeners(SrsListenerRtsp); + close_listeners(SrsListenerFlv); + #ifdef SRS_AUTO_INGEST ingester->dispose(); - srs_trace("gracefully dispose ingesters"); #endif SrsSource::dispose_all(); - srs_trace("gracefully dispose sources"); - srs_trace("terminate server"); + while (!conns.empty()) { + std::vector::iterator it; + for (it = conns.begin(); it != conns.end(); ++it) { + SrsConnection* conn = *it; + conn->dispose(); + } + + st_usleep(100 * 1000); + } #ifdef SRS_MEM_WATCH srs_memory_report();