From 1bf99e8f3e0d8249d4ba5780d6beab7b20a14f2d Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 30 May 2017 09:05:02 +0800 Subject: [PATCH] For #907, Wrap ST, only use in service ST. --- README.md | 1 + trunk/src/app/srs_app_async_call.cpp | 10 +- trunk/src/app/srs_app_async_call.hpp | 2 +- trunk/src/app/srs_app_bandwidth.cpp | 8 +- trunk/src/app/srs_app_caster_flv.cpp | 6 +- trunk/src/app/srs_app_caster_flv.hpp | 4 +- trunk/src/app/srs_app_conn.cpp | 2 +- trunk/src/app/srs_app_conn.hpp | 4 +- trunk/src/app/srs_app_edge.cpp | 6 +- trunk/src/app/srs_app_encoder.cpp | 2 +- trunk/src/app/srs_app_forward.cpp | 2 +- trunk/src/app/srs_app_hls.cpp | 2 +- trunk/src/app/srs_app_hourglass.cpp | 2 +- trunk/src/app/srs_app_http_api.cpp | 2 +- trunk/src/app/srs_app_http_api.hpp | 2 +- trunk/src/app/srs_app_http_conn.cpp | 4 +- trunk/src/app/srs_app_http_conn.hpp | 4 +- trunk/src/app/srs_app_http_stream.cpp | 6 +- trunk/src/app/srs_app_ingest.cpp | 2 +- trunk/src/app/srs_app_kafka.cpp | 22 +-- trunk/src/app/srs_app_kafka.hpp | 4 +- trunk/src/app/srs_app_listener.cpp | 17 +- trunk/src/app/srs_app_listener.hpp | 10 +- trunk/src/app/srs_app_log.cpp | 1 + trunk/src/app/srs_app_ng_exec.cpp | 2 +- trunk/src/app/srs_app_recv_thread.cpp | 15 +- trunk/src/app/srs_app_recv_thread.hpp | 2 +- trunk/src/app/srs_app_rtmp_conn.cpp | 18 +- trunk/src/app/srs_app_rtmp_conn.hpp | 2 +- trunk/src/app/srs_app_rtsp.cpp | 6 +- trunk/src/app/srs_app_rtsp.hpp | 6 +- trunk/src/app/srs_app_server.cpp | 22 +-- trunk/src/app/srs_app_server.hpp | 13 +- trunk/src/app/srs_app_source.cpp | 14 +- trunk/src/app/srs_app_source.hpp | 2 +- trunk/src/app/srs_app_st.cpp | 184 +------------------- trunk/src/app/srs_app_st.hpp | 130 +------------- trunk/src/app/srs_app_thread.cpp | 8 +- trunk/src/app/srs_app_thread.hpp | 2 +- trunk/src/app/srs_app_utility.cpp | 4 +- trunk/src/main/srs_main_ingest_hls.cpp | 2 +- trunk/src/main/srs_main_server.cpp | 1 + trunk/src/service/srs_service_log.cpp | 11 +- trunk/src/service/srs_service_log.hpp | 2 +- trunk/src/service/srs_service_rtmp_conn.cpp | 1 + trunk/src/service/srs_service_st.cpp | 168 ++++++++++++++++-- trunk/src/service/srs_service_st.hpp | 50 +++++- trunk/src/service/srs_service_utility.cpp | 59 ------- trunk/src/service/srs_service_utility.hpp | 4 - 49 files changed, 340 insertions(+), 513 deletions(-) diff --git a/README.md b/README.md index f608514e2..8f02ddb24 100755 --- a/README.md +++ b/README.md @@ -186,6 +186,7 @@ Please select your language: - [ ] Support HLS+, please read [#466][bug #466] and [#468][bug #468]. ### Change Logs + ### V3 changes diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index c3a30d288..e32b536ff 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -39,7 +39,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask() SrsAsyncCallWorker::SrsAsyncCallWorker() { trd = NULL; - wait = st_cond_new(); + wait = srs_cond_new(); } SrsAsyncCallWorker::~SrsAsyncCallWorker() @@ -53,7 +53,7 @@ SrsAsyncCallWorker::~SrsAsyncCallWorker() } tasks.clear(); - st_cond_destroy(wait); + srs_cond_destroy(wait); } int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t) @@ -61,7 +61,7 @@ int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t) int ret = ERROR_SUCCESS; tasks.push_back(t); - st_cond_signal(wait); + srs_cond_signal(wait); return ret; } @@ -80,7 +80,7 @@ int SrsAsyncCallWorker::start() void SrsAsyncCallWorker::stop() { - st_cond_signal(wait); + srs_cond_signal(wait); trd->stop(); } @@ -90,7 +90,7 @@ int SrsAsyncCallWorker::cycle() while (!trd->pull()) { if (tasks.empty()) { - st_cond_wait(wait); + srs_cond_wait(wait); } std::vector copy = tasks; diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp index 0d303dfc5..14fcce6f1 100644 --- a/trunk/src/app/srs_app_async_call.hpp +++ b/trunk/src/app/srs_app_async_call.hpp @@ -69,7 +69,7 @@ private: SrsCoroutine* trd; protected: std::vector tasks; - st_cond_t wait; + srs_cond_t wait; public: SrsAsyncCallWorker(); virtual ~SrsAsyncCallWorker(); diff --git a/trunk/src/app/srs_app_bandwidth.cpp b/trunk/src/app/srs_app_bandwidth.cpp index 5f828e7b6..529e85e18 100644 --- a/trunk/src/app/srs_app_bandwidth.cpp +++ b/trunk/src/app/srs_app_bandwidth.cpp @@ -245,7 +245,7 @@ int SrsBandwidth::do_bandwidth_check(SrsKbpsLimit* limit) return ret; } - st_usleep(_SRS_BANDWIDTH_FINAL_WAIT_MS * 1000); + srs_usleep(_SRS_BANDWIDTH_FINAL_WAIT_MS * 1000); srs_info("BW check finished."); return ret; @@ -291,7 +291,7 @@ int SrsBandwidth::play_checking(SrsBandwidthSample* sample, SrsKbpsLimit* limit) srs_update_system_time_ms(); int64_t starttime = srs_get_system_time_ms(); while ((srs_get_system_time_ms() - starttime) < sample->duration_ms) { - st_usleep(sample->interval_ms); + srs_usleep(sample->interval_ms); // TODO: FIXME: use shared ptr message. SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_playing(); @@ -499,7 +499,7 @@ void SrsKbpsLimit::recv_limit() while (_kbps->get_recv_kbps() > _limit_kbps) { _kbps->sample(); - st_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000); + srs_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000); } } @@ -510,7 +510,7 @@ void SrsKbpsLimit::send_limit() while (_kbps->get_send_kbps() > _limit_kbps) { _kbps->sample(); - st_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000); + srs_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000); } } diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 64f57835d..2dbc34f4d 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -75,11 +75,11 @@ int SrsAppCasterFlv::initialize() return ret; } -int SrsAppCasterFlv::on_tcp_client(st_netfd_t stfd) +int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) { int ret = ERROR_SUCCESS; - string ip = srs_get_peer_ip(st_netfd_fileno(stfd)); + string ip = srs_get_peer_ip(srs_netfd_fileno(stfd)); SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip); conns.push_back(conn); @@ -131,7 +131,7 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) return conn->proxy(w, r, o); } -SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip) +SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip) : SrsHttpConn(cm, fd, m, cip) { sdk = NULL; diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index dcdcee8d4..f22e074a6 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -66,7 +66,7 @@ public: virtual int initialize(); // ISrsTcpHandler public: - virtual int on_tcp_client(st_netfd_t stfd); + virtual int on_tcp_client(srs_netfd_t stfd); // IConnectionManager public: virtual void remove(ISrsConnection* c); @@ -85,7 +85,7 @@ private: SrsPithyPrint* pprint; SrsSimpleRtmpClient* sdk; public: - SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, std::string cip); + SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip); virtual ~SrsDynamicHttpConn(); public: virtual int on_got_http_message(ISrsHttpMessage* msg); diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 340a3aaa9..fb215454c 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -30,7 +30,7 @@ using namespace std; #include #include -SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip) +SrsConnection::SrsConnection(IConnectionManager* cm, srs_netfd_t c, string cip) { manager = cm; stfd = c; diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 00b447a68..9e9c502dd 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -55,7 +55,7 @@ protected: /** * the underlayer st fd handler. */ - st_netfd_t stfd; + srs_netfd_t stfd; /** * the ip of client. */ @@ -77,7 +77,7 @@ protected: */ int64_t create_time; public: - SrsConnection(IConnectionManager* cm, st_netfd_t c, std::string cip); + SrsConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip); virtual ~SrsConnection(); // interface IKbpsDelta public: diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 74cbb18ae..efe649ef9 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -232,7 +232,7 @@ int SrsEdgeIngester::cycle() } if (!trd->pull()) { - st_usleep(SRS_EDGE_INGESTER_CIMS * 1000); + srs_usleep(SRS_EDGE_INGESTER_CIMS * 1000); } } return ret; @@ -517,7 +517,7 @@ int SrsEdgeForwarder::cycle() } if (!trd->pull()) { - st_usleep(SRS_EDGE_FORWARDER_CIMS * 1000); + srs_usleep(SRS_EDGE_FORWARDER_CIMS * 1000); } } return ret; @@ -538,7 +538,7 @@ int SrsEdgeForwarder::do_cycle() while (!trd->pull()) { if (send_error_code != ERROR_SUCCESS) { - st_usleep(SRS_EDGE_FORWARDER_TMMS * 1000); + srs_usleep(SRS_EDGE_FORWARDER_TMMS * 1000); continue; } diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index a2c5be72c..39ba45876 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -102,7 +102,7 @@ int SrsEncoder::cycle() } if (!trd->pull()) { - st_usleep(SRS_RTMP_ENCODER_CIMS * 1000); + srs_usleep(SRS_RTMP_ENCODER_CIMS * 1000); } } diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 0fe4b0a2c..165c5e36d 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -231,7 +231,7 @@ int SrsForwarder::cycle() } if (!trd->pull()) { - st_usleep(SRS_FORWARDER_CIMS * 1000); + srs_usleep(SRS_FORWARDER_CIMS * 1000); } } diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 7b9e80bf3..c6ffca683 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -29,7 +29,7 @@ #include #include #include - +#include #include #include using namespace std; diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp index a48dafb0a..8a3f6913d 100644 --- a/trunk/src/app/srs_app_hourglass.cpp +++ b/trunk/src/app/srs_app_hourglass.cpp @@ -80,7 +80,7 @@ int SrsHourGlass::cycle() } total_elapse += resolution; - st_usleep(resolution * 1000); + srs_usleep(resolution * 1000); return ret; } diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index c4a654cb5..90bcea87c 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1290,7 +1290,7 @@ int SrsGoApiError::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) return srs_api_response_code(w, r, 100); } -SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip) +SrsHttpApi::SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip) : SrsConnection(cm, fd, cip) { mux = m; diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index ac9273048..d577d1b93 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -208,7 +208,7 @@ private: SrsHttpCorsMux* cors; SrsHttpServeMux* mux; public: - SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, std::string cip); + SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip); virtual ~SrsHttpApi(); // interface IKbpsDelta public: diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 89b065206..001395c78 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -59,7 +59,7 @@ using namespace std; #include #include -SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip) +SrsHttpConn::SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip) : SrsConnection(cm, fd, cip) { parser = new SrsHttpParser(); @@ -204,7 +204,7 @@ int SrsHttpConn::on_reload_http_stream_crossdomain() return ret; } -SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip) +SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip) : SrsHttpConn(cm, fd, m, cip) { } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index bd4d4d4aa..6802095c1 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -65,7 +65,7 @@ protected: ISrsHttpServeMux* http_mux; SrsHttpCorsMux* cors; public: - SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip); + SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip); virtual ~SrsHttpConn(); // interface IKbpsDelta public: @@ -99,7 +99,7 @@ public: class SrsResponseOnlyHttpConn : public SrsHttpConn { public: - SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip); + SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip); virtual ~SrsResponseOnlyHttpConn(); public: // Directly read a HTTP request message. diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 7696f2d66..f85ea1e86 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -116,7 +116,7 @@ int SrsBufferCache::cycle() // TODO: FIXME: support reload. if (fast_cache <= 0) { - st_sleep(SRS_STREAM_CACHE_CYCLE_SECONDS); + srs_usleep(SRS_STREAM_CACHE_CYCLE_SECONDS * 1000 * 1000); return ret; } @@ -152,7 +152,7 @@ int SrsBufferCache::cycle() if (count <= 0) { srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS); // directly use sleep, donot use consumer wait. - st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000); + srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000); // ignore when nothing got. continue; @@ -572,7 +572,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) if (count <= 0) { srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS); // directly use sleep, donot use consumer wait. - st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000); + srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000); // ignore when nothing got. continue; diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index bbc51b204..0565445f3 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -183,7 +183,7 @@ int SrsIngester::cycle() } if (!trd->pull()) { - st_usleep(SRS_AUTO_INGESTER_CIMS * 1000); + srs_usleep(SRS_AUTO_INGESTER_CIMS * 1000); } } diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 3c33ae6cc..6f435cc2b 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -362,9 +362,9 @@ void srs_dispose_kafka() SrsKafkaProducer::SrsKafkaProducer() { metadata_ok = false; - metadata_expired = st_cond_new(); + metadata_expired = srs_cond_new(); - lock = st_mutex_new(); + lock = srs_mutex_new(); trd = NULL; worker = new SrsAsyncCallWorker(); cache = new SrsKafkaCache(); @@ -382,8 +382,8 @@ SrsKafkaProducer::~SrsKafkaProducer() srs_freep(trd); srs_freep(cache); - st_mutex_destroy(lock); - st_cond_destroy(metadata_expired); + srs_mutex_destroy(lock); + srs_cond_destroy(metadata_expired); } int SrsKafkaProducer::initialize() @@ -448,14 +448,14 @@ int SrsKafkaProducer::send(int key, SrsJsonObject* obj) } // sync with backgound metadata worker. - st_mutex_lock(lock); + srs_mutex_lock(lock); // flush message when metadata is ok. if (metadata_ok) { ret = flush(); } - st_mutex_unlock(lock); + srs_mutex_unlock(lock); return ret; } @@ -503,7 +503,7 @@ int SrsKafkaProducer::cycle() } if (!trd->pull()) { - st_usleep(SRS_KAKFA_CIMS * 1000); + srs_usleep(SRS_KAKFA_CIMS * 1000); } } @@ -515,18 +515,18 @@ int SrsKafkaProducer::on_before_cycle() // wait for the metadata expired. // when metadata is ok, wait for it expired. if (metadata_ok) { - st_cond_wait(metadata_expired); + srs_cond_wait(metadata_expired); } // request to lock to acquire the socket. - st_mutex_lock(lock); + srs_mutex_lock(lock); return ERROR_SUCCESS; } int SrsKafkaProducer::on_end_cycle() { - st_mutex_unlock(lock); + srs_mutex_unlock(lock); return ERROR_SUCCESS; } @@ -644,7 +644,7 @@ void SrsKafkaProducer::refresh_metadata() clear_metadata(); metadata_ok = false; - st_cond_signal(metadata_expired); + srs_cond_signal(metadata_expired); srs_trace("kafka async refresh metadata in background"); } diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index ef2dcec1a..72b126c1e 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -163,11 +163,11 @@ class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISr private: // TODO: FIXME: support reload. bool enabled; - st_mutex_t lock; + srs_mutex_t lock; SrsCoroutine* trd; private: bool metadata_ok; - st_cond_t metadata_expired; + srs_cond_t metadata_expired; public: std::vector partitions; SrsKafkaCache* cache; diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 5b047f2a4..d2ef6512a 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -30,6 +30,7 @@ #include #include #include +#include using namespace std; #include @@ -54,7 +55,7 @@ ISrsUdpHandler::~ISrsUdpHandler() { } -int ISrsUdpHandler::on_stfd_change(st_netfd_t /*fd*/) +int ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/) { return ERROR_SUCCESS; } @@ -101,7 +102,7 @@ int SrsUdpListener::fd() return _fd; } -st_netfd_t SrsUdpListener::stfd() +srs_netfd_t SrsUdpListener::stfd() { return _stfd; } @@ -131,7 +132,7 @@ int SrsUdpListener::listen() } srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); - if ((_stfd = st_netfd_open_socket(_fd)) == NULL){ + if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){ ret = ERROR_ST_OPEN_SOCKET; srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; @@ -159,7 +160,7 @@ int SrsUdpListener::cycle() int nb_from = sizeof(sockaddr_in); int nread = 0; - if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { + if ((nread = srs_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) { srs_warn("ignore recv udp packet failed, nread=%d", nread); return ret; } @@ -170,7 +171,7 @@ int SrsUdpListener::cycle() } if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { - st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); + srs_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); } } @@ -233,7 +234,7 @@ int SrsTcpListener::listen() } srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); - if ((_stfd = st_netfd_open_socket(_fd)) == NULL){ + if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){ ret = ERROR_ST_OPEN_SOCKET; srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; @@ -256,8 +257,8 @@ int SrsTcpListener::cycle() int ret = ERROR_SUCCESS; while (!trd->pull()) { - st_netfd_t stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); - int fd = st_netfd_fileno(stfd); + srs_netfd_t stfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); + int fd = srs_netfd_fileno(stfd); srs_fd_close_exec(fd); diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index cad00c87e..39af63c40 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -46,7 +46,7 @@ public: * when fd changed, for instance, reload the listen port, * notify the handler and user can do something. */ - virtual int on_stfd_change(st_netfd_t fd); + virtual int on_stfd_change(srs_netfd_t fd); public: /** * when udp listener got a udp packet, notice server to process it. @@ -72,7 +72,7 @@ public: /** * when got tcp client. */ - virtual int on_tcp_client(st_netfd_t stfd) = 0; + virtual int on_tcp_client(srs_netfd_t stfd) = 0; }; /** @@ -82,7 +82,7 @@ class SrsUdpListener : public ISrsCoroutineHandler { private: int _fd; - st_netfd_t _stfd; + srs_netfd_t _stfd; SrsCoroutine* trd; private: char* buf; @@ -96,7 +96,7 @@ public: virtual ~SrsUdpListener(); public: virtual int fd(); - virtual st_netfd_t stfd(); + virtual srs_netfd_t stfd(); public: virtual int listen(); // interface ISrsReusableThreadHandler. @@ -111,7 +111,7 @@ class SrsTcpListener : public ISrsCoroutineHandler { private: int _fd; - st_netfd_t _stfd; + srs_netfd_t _stfd; SrsCoroutine* trd; private: ISrsTcpHandler* handler; diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index 7180011b0..78149a186 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include diff --git a/trunk/src/app/srs_app_ng_exec.cpp b/trunk/src/app/srs_app_ng_exec.cpp index 8674bc4b5..e132b2f2f 100644 --- a/trunk/src/app/srs_app_ng_exec.cpp +++ b/trunk/src/app/srs_app_ng_exec.cpp @@ -88,7 +88,7 @@ int SrsNgExec::cycle() } if (!trd->pull()) { - st_usleep(SRS_RTMP_EXEC_CIMS * 1000); + srs_usleep(SRS_RTMP_EXEC_CIMS * 1000); } } diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 62e87cf0f..b24ffd1f9 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -34,6 +34,7 @@ #include #include +#include using namespace std; // the max small bytes to group @@ -120,7 +121,7 @@ int SrsRecvThread::do_cycle() while (!trd->pull()) { // When the pumper is interrupted, wait then retry. if (pumper->interrupted()) { - st_usleep(timeout * 1000); + srs_usleep(timeout * 1000); continue; } @@ -265,7 +266,7 @@ SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* recv_error_code = ERROR_SUCCESS; _nb_msgs = 0; video_frames = 0; - error = st_cond_new(); + error = srs_cond_new(); ncid = cid = 0; req = _req; @@ -286,7 +287,7 @@ SrsPublishRecvThread::~SrsPublishRecvThread() _srs_config->unsubscribe(this); trd.stop(); - st_cond_destroy(error); + srs_cond_destroy(error); } int SrsPublishRecvThread::wait(uint64_t timeout_ms) @@ -296,7 +297,7 @@ int SrsPublishRecvThread::wait(uint64_t timeout_ms) } // ignore any return of cond wait. - st_cond_timedwait(error, timeout_ms * 1000); + srs_cond_timedwait(error, timeout_ms * 1000); return ERROR_SUCCESS; } @@ -380,7 +381,7 @@ void SrsPublishRecvThread::interrupt(int ret) // when recv thread error, signal the conn thread to process it. // @see https://github.com/ossrs/srs/issues/244 - st_cond_signal(error); + srs_cond_signal(error); } void SrsPublishRecvThread::on_start() @@ -407,7 +408,7 @@ void SrsPublishRecvThread::on_stop() // when thread stop, signal the conn thread which wait. // @see https://github.com/ossrs/srs/issues/244 - st_cond_signal(error); + srs_cond_signal(error); #ifdef SRS_PERF_MERGED_READ if (mr) { @@ -436,7 +437,7 @@ void SrsPublishRecvThread::on_read(ssize_t nread) * @see https://github.com/ossrs/srs/issues/241 */ if (nread < SRS_MR_SMALL_BYTES) { - st_usleep(mr_sleep * 1000); + srs_usleep(mr_sleep * 1000); } } #endif diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index c84173039..3061a3018 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -183,7 +183,7 @@ private: SrsSource* _source; // the error timeout cond // @see https://github.com/ossrs/srs/issues/244 - st_cond_t error; + srs_cond_t error; // merged context id. int cid; int ncid; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 8c827352f..77cdb61c2 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -28,7 +28,7 @@ #include #include #include - +#include using namespace std; #include @@ -110,7 +110,7 @@ SrsClientInfo::~SrsClientInfo() srs_freep(res); } -SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip) +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip) : SrsConnection(svr, c, cip) { server = svr; @@ -161,7 +161,7 @@ int SrsRtmpConn::do_cycle() { int ret = ERROR_SUCCESS; - srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), st_netfd_fileno(stfd)); + srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd)); // notify kafka cluster. #ifdef SRS_AUTO_KAFKA @@ -407,7 +407,7 @@ int SrsRtmpConn::service_cycle() srs_verbose("set peer bandwidth success"); // get the ip which client connected. - std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd)); + std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd)); // do bandwidth test if connect to the vhost which is for bandwidth check. if (_srs_config->get_bw_check_enabled(req->vhost)) { @@ -818,7 +818,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe if (count <= 0) { #ifndef SRS_PERF_QUEUE_COND_WAIT srs_info("mw sleep %dms for no msg", mw_sleep); - st_usleep(mw_sleep * 1000); + srs_usleep(mw_sleep * 1000); #else srs_verbose("mw wait %dms and got nothing.", mw_sleep); #endif @@ -864,7 +864,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe // apply the minimal interval for delivery stream in ms. if (send_min_interval > 0) { - st_usleep((int64_t)(send_min_interval * 1000)); + srs_usleep((int64_t)(send_min_interval * 1000)); } } @@ -893,7 +893,7 @@ int SrsRtmpConn::publishing(SrsSource* source) if ((ret = acquire_publish(source)) == ERROR_SUCCESS) { // use isolate thread to recv, // @see: https://github.com/ossrs/srs/issues/237 - SrsPublishRecvThread rtrd(rtmp, req, st_netfd_fileno(stfd), 0, this, source); + SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source); srs_info("start to publish stream %s success", req->stream.c_str()); ret = do_publishing(source, &rtrd); @@ -1243,7 +1243,7 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) } // get the sock buffer size. - int fd = st_netfd_fileno(stfd); + int fd = srs_netfd_fileno(stfd); int onb_sbuf = 0; socklen_t sock_buf_size = sizeof(int); getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size); @@ -1295,7 +1295,7 @@ void SrsRtmpConn::set_sock_options() if (nvalue != tcp_nodelay) { tcp_nodelay = nvalue; #ifdef SRS_PERF_TCP_NODELAY - int fd = st_netfd_fileno(stfd); + int fd = srs_netfd_fileno(stfd); socklen_t nb_v = sizeof(int); diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 312d68718..9c37bcd20 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -127,7 +127,7 @@ private: // About the rtmp client. SrsClientInfo* info; public: - SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip); + SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip); virtual ~SrsRtmpConn(); public: virtual void dispose(); diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index c2fdfedeb..d0f83b2cb 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -183,7 +183,7 @@ int SrsRtspJitter::correct(int64_t& ts) return ret; } -SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) +SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o) { output_template = o; @@ -245,7 +245,7 @@ int SrsRtspConn::do_cycle() int ret = ERROR_SUCCESS; // retrieve ip of client. - std::string ip = srs_get_peer_ip(st_netfd_fileno(stfd)); + std::string ip = srs_get_peer_ip(srs_netfd_fileno(stfd)); srs_trace("rtsp: serve %s", ip.c_str()); // consume all rtsp messages. @@ -746,7 +746,7 @@ void SrsRtspCaster::free_port(int lpmin, int lpmax) srs_trace("rtsp: free rtp port=%d-%d", lpmin, lpmax); } -int SrsRtspCaster::on_tcp_client(st_netfd_t stfd) +int SrsRtspCaster::on_tcp_client(srs_netfd_t stfd) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index e40aeee84..7eef307f8 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -129,7 +129,7 @@ private: int audio_channel; SrsRtpConn* audio_rtp; private: - st_netfd_t stfd; + srs_netfd_t stfd; SrsStSocket* skt; SrsRtspStack* rtsp; SrsRtspCaster* caster; @@ -149,7 +149,7 @@ private: std::string aac_specific_config; SrsRtspAudioCache* acache; public: - SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o); + SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o); virtual ~SrsRtspConn(); public: virtual int serve(); @@ -206,7 +206,7 @@ public: virtual void free_port(int lpmin, int lpmax); // interface ISrsTcpHandler public: - virtual int on_tcp_client(st_netfd_t stfd); + virtual int on_tcp_client(srs_netfd_t stfd); // internal methods. public: virtual void remove(SrsRtspConn* conn); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 192192245..f54e1e310 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -28,7 +28,7 @@ #include #include #include - +#include #include using namespace std; @@ -163,7 +163,7 @@ int SrsBufferListener::listen(string i, int p) return ret; } -int SrsBufferListener::on_tcp_client(st_netfd_t stfd) +int SrsBufferListener::on_tcp_client(srs_netfd_t stfd) { int ret = ERROR_SUCCESS; @@ -219,7 +219,7 @@ int SrsRtspListener::listen(string i, int p) return ret; } -int SrsRtspListener::on_tcp_client(st_netfd_t stfd) +int SrsRtspListener::on_tcp_client(srs_netfd_t stfd) { int ret = ERROR_SUCCESS; @@ -279,7 +279,7 @@ int SrsHttpFlvListener::listen(string i, int p) return ret; } -int SrsHttpFlvListener::on_tcp_client(st_netfd_t stfd) +int SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd) { int ret = ERROR_SUCCESS; @@ -391,7 +391,7 @@ int SrsSignalManager::initialize() return ret; } - if ((signal_read_stfd = st_netfd_open(sig_pipe[0])) == NULL) { + if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) { ret = ERROR_SYSTEM_CREATE_PIPE; srs_error("create signal manage st pipe failed. ret=%d", ret); return ret; @@ -444,7 +444,7 @@ int SrsSignalManager::cycle() int signo; /* Read the next signal from the pipe */ - st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT); + srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT); /* Process signal synchronously */ server->on_signal(signo); @@ -863,7 +863,7 @@ int SrsServer::cycle() // remark, for gmc, never invoke the exit(). srs_warn("sleep a long time for system st-threads to cleanup."); - st_usleep(3 * 1000 * 1000); + srs_usleep(3 * 1000 * 1000); srs_warn("system quit"); #else // normally quit with neccessary cleanup by dispose(). @@ -966,7 +966,7 @@ int SrsServer::do_cycle() int dynamic_max = srs_max(max, heartbeat_max_resolution); for (int i = 0; i < dynamic_max; i++) { - st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000); + srs_usleep(SRS_SYS_CYCLE_INTERVAL * 1000); // asprocess check. if (asprocess && ::getppid() != ppid) { @@ -1235,7 +1235,7 @@ void SrsServer::resample_kbps() srs_update_rtmp_server((int)conns.size(), kbps); } -int SrsServer::accept_client(SrsListenerType type, st_netfd_t stfd) +int SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) { int ret = ERROR_SUCCESS; @@ -1260,11 +1260,11 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t stfd) return ret; } -SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd) +SrsConnection* SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd) { int ret = ERROR_SUCCESS; - int fd = st_netfd_fileno(stfd); + int fd = srs_netfd_fileno(stfd); string ip = srs_get_peer_ip(fd); // for some keep alive application, for example, the keepalived, diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 93c01e61b..dd8b13a8d 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -35,6 +35,7 @@ #include #include #include +#include class SrsServer; class SrsConnection; @@ -107,7 +108,7 @@ public: virtual int listen(std::string ip, int port); // ISrsTcpHandler public: - virtual int on_tcp_client(st_netfd_t stfd); + virtual int on_tcp_client(srs_netfd_t stfd); }; #ifdef SRS_AUTO_STREAM_CASTER @@ -126,7 +127,7 @@ public: virtual int listen(std::string i, int p); // ISrsTcpHandler public: - virtual int on_tcp_client(st_netfd_t stfd); + virtual int on_tcp_client(srs_netfd_t stfd); }; /** @@ -144,7 +145,7 @@ public: virtual int listen(std::string i, int p); // ISrsTcpHandler public: - virtual int on_tcp_client(st_netfd_t stfd); + virtual int on_tcp_client(srs_netfd_t stfd); }; #endif @@ -185,7 +186,7 @@ private: /* Per-process pipe which is used as a signal queue. */ /* Up to PIPE_BUF/sizeof(int) signals can be queued up. */ int sig_pipe[2]; - st_netfd_t signal_read_stfd; + srs_netfd_t signal_read_stfd; private: SrsServer* server; SrsCoroutine* trd; @@ -357,9 +358,9 @@ public: * for instance RTMP connection to serve client. * @param stfd, the client fd in st boxed, the underlayer fd. */ - virtual int accept_client(SrsListenerType type, st_netfd_t stfd); + virtual int accept_client(SrsListenerType type, srs_netfd_t stfd); private: - virtual SrsConnection* fd2conn(SrsListenerType type, st_netfd_t stfd); + virtual SrsConnection* fd2conn(SrsListenerType type, srs_netfd_t stfd); // IConnectionManager public: /** diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d4c979ee8..a695c0af8 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -434,7 +434,7 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) should_update_source_id = false; #ifdef SRS_PERF_QUEUE_COND_WAIT - mw_wait = st_cond_new(); + mw_wait = srs_cond_new(); mw_min_msgs = 0; mw_duration = 0; mw_waiting = false; @@ -448,7 +448,7 @@ SrsConsumer::~SrsConsumer() srs_freep(queue); #ifdef SRS_PERF_QUEUE_COND_WAIT - st_cond_destroy(mw_wait); + srs_cond_destroy(mw_wait); #endif } @@ -497,14 +497,14 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitte // when encoder republish or overflow. // @see https://github.com/ossrs/srs/pull/749 if (atc && duration_ms < 0) { - st_cond_signal(mw_wait); + srs_cond_signal(mw_wait); mw_waiting = false; return ret; } // when duration ok, signal to flush. if (match_min_msgs && duration_ms > mw_duration) { - st_cond_signal(mw_wait); + srs_cond_signal(mw_wait); mw_waiting = false; return ret; } @@ -550,7 +550,7 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) void SrsConsumer::wait(int nb_msgs, int duration) { if (paused) { - st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000); + srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000); return; } @@ -569,7 +569,7 @@ void SrsConsumer::wait(int nb_msgs, int duration) mw_waiting = true; // use cond block wait for high performance mode. - st_cond_wait(mw_wait); + srs_cond_wait(mw_wait); } #endif @@ -587,7 +587,7 @@ void SrsConsumer::wakeup() { #ifdef SRS_PERF_QUEUE_COND_WAIT if (mw_waiting) { - st_cond_signal(mw_wait); + srs_cond_signal(mw_wait); mw_waiting = false; } #endif diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index e97494b5e..495023c9f 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -228,7 +228,7 @@ private: #ifdef SRS_PERF_QUEUE_COND_WAIT // the cond wait for mw. // @see https://github.com/ossrs/srs/issues/251 - st_cond_t mw_wait; + srs_cond_t mw_wait; bool mw_waiting; int mw_min_msgs; int mw_duration; diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 3713d8251..c3e34e3cc 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -23,6 +23,7 @@ #include +#include #include using namespace std; @@ -65,7 +66,7 @@ int SrsCoroutine::start() return ret; } - if((trd = st_thread_create(pfn, this, 1, 0)) == NULL){ + if((trd = (srs_thread_t)st_thread_create(pfn, this, 1, 0)) == NULL){ ret = ERROR_ST_CREATE_CYCLE_THREAD; srs_error("Thread.start: Create thread failed. ret=%d", ret); return ret; @@ -86,7 +87,7 @@ void SrsCoroutine::stop() interrupt(); void* res = NULL; - int ret = st_thread_join(trd, &res); + int ret = st_thread_join((st_thread_t)trd, &res); srs_info("Thread.stop: Terminated, ret=%d, err=%d", ret, err); srs_assert(!ret); @@ -109,7 +110,7 @@ void SrsCoroutine::interrupt() srs_info("Thread.interrupt: Interrupt thread, err=%d", err); err = (err == ERROR_SUCCESS? ERROR_THREAD_INTERRUPED:err); - st_thread_interrupt(trd); + st_thread_interrupt((st_thread_t)trd); } int SrsCoroutine::pull() @@ -145,180 +146,3 @@ void* SrsCoroutine::pfn(void* arg) return res; } -namespace internal -{ - ISrsThreadHandler::ISrsThreadHandler() - { - } - - ISrsThreadHandler::~ISrsThreadHandler() - { - } - - void ISrsThreadHandler::on_thread_start() - { - } - - int ISrsThreadHandler::on_before_cycle() - { - int ret = ERROR_SUCCESS; - return ret; - } - - int ISrsThreadHandler::on_end_cycle() - { - int ret = ERROR_SUCCESS; - return ret; - } - - void ISrsThreadHandler::on_thread_stop() - { - } - - SrsThread::SrsThread(const char* n, ISrsThreadHandler* h, int64_t ims, bool j) - { - name = n; - handler = h; - cims = ims; - - trd = NULL; - loop = false; - context_id = -1; - joinable = j; - } - - SrsThread::~SrsThread() - { - stop(); - } - - int SrsThread::cid() - { - return context_id; - } - - int SrsThread::start() - { - int ret = ERROR_SUCCESS; - - if(trd) { - srs_info("thread %s already running.", name); - return ret; - } - - loop = true; - - if((trd = st_thread_create(pfn, this, (joinable? 1:0), 0)) == NULL){ - ret = ERROR_ST_CREATE_CYCLE_THREAD; - srs_error("st_thread_create failed. ret=%d", ret); - return ret; - } - - return ret; - } - - void SrsThread::stop() - { - if (!trd) { - return; - } - - // notify the cycle to stop loop. - loop = false; - - // the interrupt will cause the socket to read/write error, - // which will terminate the cycle thread. - st_thread_interrupt(trd); - - // when joinable, wait util quit. - if (joinable) { - // wait the thread to exit. - int ret = st_thread_join(trd, NULL); - srs_assert(ret == ERROR_SUCCESS); - } - - trd = NULL; - } - - bool SrsThread::can_loop() - { - return loop; - } - - void SrsThread::stop_loop() - { - loop = false; - } - - void SrsThread::cycle() - { - int ret = ERROR_SUCCESS; - - // TODO: FIXME: it's better for user to specifies the cid, - // because sometimes we need to merge cid, for example, - // the publish thread should use the same cid of connection. - _srs_context->generate_id(); - srs_info("thread %s cycle start", name); - context_id = _srs_context->get_id(); - - srs_assert(handler); - handler->on_thread_start(); - - while (loop) { - if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) { - srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", name, ret); - goto failed; - } - srs_info("thread %s on before cycle success", name); - - if ((ret = handler->cycle()) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { - srs_warn("thread %s cycle failed, ignored and retry, ret=%d", name, ret); - } - goto failed; - } - srs_info("thread %s cycle success", name); - - if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) { - srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", name, ret); - goto failed; - } - srs_info("thread %s on end cycle success", name); - - failed: - if (!loop) { - break; - } - - // Should never use no timeout, just ignore it. - // to improve performance, donot sleep when interval is zero. - // @see: https://github.com/ossrs/srs/issues/237 - if (cims != 0 && cims != SRS_CONSTS_NO_TMMS) { - st_usleep(cims * 1000); - } - } - - srs_info("thread %s cycle finished", name); - // @remark in this callback, user may delete this, so never use this->xxx anymore. - handler->on_thread_stop(); - } - - void* SrsThread::pfn(void* arg) - { - SrsThread* obj = (SrsThread*)arg; - srs_assert(obj); - - obj->cycle(); - - // delete cid for valgrind to detect memory leak. - SrsThreadContext* ctx = dynamic_cast(_srs_context); - if (ctx) { - ctx->clear_cid(); - } - - st_thread_exit(NULL); - - return NULL; - } -} - diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 9bf67502e..2d151f06e 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -87,7 +87,7 @@ private: std::string name; ISrsCoroutineHandler* handler; private: - st_thread_t trd; + srs_thread_t trd; int context; int err; private: @@ -135,133 +135,5 @@ private: static void* pfn(void* arg); }; -// the internal classes, user should never use it. -// user should use the public classes at the bellow: -// @see SrsEndlessThread, SrsOneCycleThread, SrsReusableThread -namespace internal -{ - /** - * the handler for the thread, callback interface. - * the thread model defines as: - * handler->on_thread_start() - * while loop: - * handler->on_before_cycle() - * handler->cycle() - * handler->on_end_cycle() - * if !loop then break for user stop thread. - * sleep(CycleIntervalMilliseconds) - * handler->on_thread_stop() - * when stop, the thread will interrupt the st_thread, - * which will cause the socket to return error and - * terminate the cycle thread. - * - * @remark why should check can_loop() in cycle method? - * when thread interrupt, the socket maybe not got EINT, - * espectially on st_usleep(), so the cycle must check the loop, - * when handler->cycle() has loop itself, for example: - * while (true): - * if (read_from_socket(skt) < 0) break; - * if thread stop when read_from_socket, it's ok, the loop will break, - * but when thread stop interrupt the s_usleep(0), then the loop is - * death loop. - * in a word, the handler->cycle() must: - * while (pthread->can_loop()): - * if (read_from_socket(skt) < 0) break; - * check the loop, then it works. - * - * @remark why should use stop_loop() to terminate thread in itself? - * in the thread itself, that is the cycle method, - * if itself want to terminate the thread, should never use stop(), - * but use stop_loop() to set the loop to false and terminate normally. - * - * @remark when should set the interval_us, and when not? - * the cycle will invoke util cannot loop, eventhough the return code of cycle is error, - * so the interval_us used to sleep for each cycle. - */ - class ISrsThreadHandler - { - public: - ISrsThreadHandler(); - virtual ~ISrsThreadHandler(); - public: - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int cycle() = 0; - virtual int on_end_cycle(); - virtual void on_thread_stop(); - }; - - /** - * provides servies from st_thread_t, - * for common thread usage. - */ - class SrsThread - { - private: - st_thread_t trd; - int context_id; - bool loop; - bool joinable; - const char* name; - private: - ISrsThreadHandler* handler; - // The cycle interval in ms. - int64_t cims; - public: - /** - * initialize the thread. - * @param n, human readable name for st debug. - * @param h, the cycle handler for the thread. - * @param ims, the sleep interval in ms when cycle finished. - * @param j, if joinable, other thread must stop the thread. - * @remark if joinable, thread never quit itself, or memory leak. - * @see: https://github.com/ossrs/srs/issues/78 - * @remark about st debug, see st-1.9/README, _st_iterate_threads_flag - */ - /** - * TODO: FIXME: maybe all thread must be reap by others threads, - * @see: https://github.com/ossrs/srs/issues/77 - */ - SrsThread(const char* n, ISrsThreadHandler* h, int64_t ims, bool j); - virtual ~SrsThread(); - public: - /** - * get the context id. @see: ISrsThreadContext.get_id(). - * used for parent thread to get the id. - * @remark when start thread, parent thread will block and wait for this id ready. - */ - virtual int cid(); - /** - * start the thread, invoke the cycle of handler util - * user stop the thread. - * @remark ignore any error of cycle of handler. - * @remark user can start multiple times, ignore if already started. - * @remark wait for the cid is set by thread pfn. - */ - virtual int start(); - /** - * stop the thread, wait for the thread to terminate. - * @remark user can stop multiple times, ignore if already stopped. - */ - virtual void stop(); - public: - /** - * whether the thread should loop, - * used for handler->cycle() which has a loop method, - * to check this method, break if false. - */ - virtual bool can_loop(); - /** - * for the loop thread to stop the loop. - * other thread can directly use stop() to stop loop and wait for quit. - * this stop loop method only set loop to false. - */ - virtual void stop_loop(); - private: - virtual void cycle(); - static void* pfn(void* arg); - }; -} - #endif diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index 91bf3a778..4839c3586 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -31,14 +31,14 @@ using namespace std; SrsCoroutineManager::SrsCoroutineManager() { - cond = st_cond_new(); + cond = srs_cond_new(); trd = new SrsCoroutine("manager", this); } SrsCoroutineManager::~SrsCoroutineManager() { srs_freep(trd); - st_cond_destroy(cond); + srs_cond_destroy(cond); clear(); } @@ -51,7 +51,7 @@ int SrsCoroutineManager::start() int SrsCoroutineManager::cycle() { while (!trd->pull()) { - st_cond_wait(cond); + srs_cond_wait(cond); clear(); } @@ -61,7 +61,7 @@ int SrsCoroutineManager::cycle() void SrsCoroutineManager::remove(ISrsConnection* c) { conns.push_back(c); - st_cond_signal(cond); + srs_cond_signal(cond); } void SrsCoroutineManager::clear() diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index d1c009118..5458b7e0e 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -42,7 +42,7 @@ class SrsCoroutineManager : virtual public ISrsCoroutineHandler, virtual public private: SrsCoroutine* trd; std::vector conns; - st_cond_t cond; + srs_cond_t cond; public: SrsCoroutineManager(); virtual ~SrsCoroutineManager(); diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp index 5e71f2cc2..b4da46fda 100644 --- a/trunk/src/app/srs_app_utility.cpp +++ b/trunk/src/app/srs_app_utility.cpp @@ -179,7 +179,7 @@ int srs_kill_forced(int& pid) // 0 is not quit yet. if (qpid == 0) { - st_usleep(10 * 1000); + srs_usleep(10 * 1000); continue; } @@ -204,7 +204,7 @@ int srs_kill_forced(int& pid) // @remark when we use SIGKILL to kill process, it must be killed, // so we always wait it to quit by infinite loop. while (waitpid(pid, &status, 0) < 0) { - st_usleep(10 * 1000); + srs_usleep(10 * 1000); continue; } diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp index 11759fe0f..db5dc8419 100644 --- a/trunk/src/main/srs_main_ingest_hls.cpp +++ b/trunk/src/main/srs_main_ingest_hls.cpp @@ -216,7 +216,7 @@ int SrsIngestHlsInput::connect() int64_t now = srs_update_system_time_ms(); if (now < next_connect_time) { srs_trace("input hls wait for %dms", next_connect_time - now); - st_usleep((next_connect_time - now) * 1000); + srs_usleep((next_connect_time - now) * 1000); } // set all ts to dirty. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index d054d50e1..dbcd3eac4 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -37,6 +37,7 @@ using namespace std; #include #endif +#include using namespace std; #include diff --git a/trunk/src/service/srs_service_log.cpp b/trunk/src/service/srs_service_log.cpp index c71958342..ec7bb5d1a 100644 --- a/trunk/src/service/srs_service_log.cpp +++ b/trunk/src/service/srs_service_log.cpp @@ -25,6 +25,7 @@ #include #include +#include using namespace std; #include @@ -45,18 +46,18 @@ int SrsThreadContext::generate_id() static int id = 100; int gid = id++; - cache[st_thread_self()] = gid; + cache[srs_thread_self()] = gid; return gid; } int SrsThreadContext::get_id() { - return cache[st_thread_self()]; + return cache[srs_thread_self()]; } int SrsThreadContext::set_id(int v) { - st_thread_t self = st_thread_self(); + srs_thread_t self = srs_thread_self(); int ov = 0; if (cache.find(self) != cache.end()) { @@ -70,8 +71,8 @@ int SrsThreadContext::set_id(int v) void SrsThreadContext::clear_cid() { - st_thread_t self = st_thread_self(); - std::map::iterator it = cache.find(self); + srs_thread_t self = srs_thread_self(); + std::map::iterator it = cache.find(self); if (it != cache.end()) { cache.erase(it); } diff --git a/trunk/src/service/srs_service_log.hpp b/trunk/src/service/srs_service_log.hpp index 97376402d..f13e94f5c 100644 --- a/trunk/src/service/srs_service_log.hpp +++ b/trunk/src/service/srs_service_log.hpp @@ -38,7 +38,7 @@ class SrsThreadContext : public ISrsThreadContext { private: - std::map cache; + std::map cache; public: SrsThreadContext(); virtual ~SrsThreadContext(); diff --git a/trunk/src/service/srs_service_rtmp_conn.cpp b/trunk/src/service/srs_service_rtmp_conn.cpp index f0fd0716c..d806742d7 100644 --- a/trunk/src/service/srs_service_rtmp_conn.cpp +++ b/trunk/src/service/srs_service_rtmp_conn.cpp @@ -23,6 +23,7 @@ #include +#include using namespace std; #include diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index d921cf54f..858fe2312 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -23,6 +23,7 @@ #include +#include #include #include using namespace std; @@ -30,6 +31,7 @@ using namespace std; #include #include #include +#include #ifdef __linux__ #include @@ -80,11 +82,11 @@ int srs_st_init() return ret; } -void srs_close_stfd(st_netfd_t& stfd) +void srs_close_stfd(srs_netfd_t& stfd) { if (stfd) { // we must ensure the close is ok. - int err = st_netfd_close(stfd); + int err = st_netfd_close((st_netfd_t)stfd); srs_assert(err != -1); stfd = NULL; } @@ -103,6 +105,150 @@ void srs_socket_reuse_addr(int fd) setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int)); } +srs_thread_t srs_thread_self() +{ + return (srs_thread_t)st_thread_self(); +} + +int srs_socket_connect(string server, int port, int64_t tm, srs_netfd_t* pstfd) +{ + int ret = ERROR_SUCCESS; + + st_utime_t timeout = ST_UTIME_NO_TIMEOUT; + if (tm != SRS_CONSTS_NO_TMMS) { + timeout = (st_utime_t)(tm * 1000); + } + + *pstfd = NULL; + srs_netfd_t stfd = NULL; + sockaddr_in addr; + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + srs_fd_close_exec(sock); + + srs_assert(!stfd); + stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + // connect to server. + std::string ip = srs_dns_resolve(server); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + goto failed; + } + + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect((st_netfd_t)stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); + goto failed; + } + srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); + + *pstfd = stfd; + return ret; + +failed: + if (stfd) { + srs_close_stfd(stfd); + } + return ret; +} + +srs_cond_t srs_cond_new() +{ + return (srs_cond_t)st_cond_new(); +} + +int srs_cond_destroy(srs_cond_t cond) +{ + return st_cond_destroy((st_cond_t)cond); +} + +int srs_cond_wait(srs_cond_t cond) +{ + return st_cond_wait((st_cond_t)cond); +} + +int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout) +{ + return st_cond_timedwait((st_cond_t)cond, (st_utime_t)timeout); +} + +int srs_cond_signal(srs_cond_t cond) +{ + return st_cond_signal((st_cond_t)cond); +} + +srs_mutex_t srs_mutex_new() +{ + return (srs_mutex_t)st_mutex_new(); +} + +int srs_mutex_destroy(srs_mutex_t mutex) +{ + return st_mutex_destroy((st_mutex_t)mutex); +} + +int srs_mutex_lock(srs_mutex_t mutex) +{ + return st_mutex_lock((st_mutex_t)mutex); +} + +int srs_mutex_unlock(srs_mutex_t mutex) +{ + return st_mutex_unlock((st_mutex_t)mutex); +} + +int srs_netfd_fileno(srs_netfd_t stfd) +{ + return st_netfd_fileno((st_netfd_t)stfd); +} + +int srs_usleep(srs_utime_t usecs) +{ + return st_usleep((st_utime_t)usecs); +} + +srs_netfd_t srs_netfd_open_socket(int osfd) +{ + return (srs_netfd_t)st_netfd_open_socket(osfd); +} + +srs_netfd_t srs_netfd_open(int osfd) +{ + return (srs_netfd_t)st_netfd_open(osfd); +} + +int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout) +{ + return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout); +} + +srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout) +{ + return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout); +} + +ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout) +{ + return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout); +} + SrsStSocket::SrsStSocket() { stfd = NULL; @@ -114,7 +260,7 @@ SrsStSocket::~SrsStSocket() { } -int SrsStSocket::initialize(st_netfd_t fd) +int SrsStSocket::initialize(srs_netfd_t fd) { stfd = fd; return ERROR_SUCCESS; @@ -161,9 +307,9 @@ int SrsStSocket::read(void* buf, size_t size, ssize_t* nread) ssize_t nb_read; if (rtm == SRS_CONSTS_NO_TMMS) { - nb_read = st_read(stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_read = st_read(stfd, buf, size, rtm * 1000); + nb_read = st_read((st_netfd_t)stfd, buf, size, rtm * 1000); } if (nread) { @@ -197,9 +343,9 @@ int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) ssize_t nb_read; if (rtm == SRS_CONSTS_NO_TMMS) { - nb_read = st_read_fully(stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_read = st_read_fully(stfd, buf, size, rtm * 1000); + nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000); } if (nread) { @@ -233,9 +379,9 @@ int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) ssize_t nb_write; if (stm == SRS_CONSTS_NO_TMMS) { - nb_write = st_write(stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_write = st_write(stfd, buf, size, stm * 1000); + nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000); } if (nwrite) { @@ -264,9 +410,9 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) ssize_t nb_write; if (stm == SRS_CONSTS_NO_TMMS) { - nb_write = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); + nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); } else { - nb_write = st_writev(stfd, iov, iov_size, stm * 1000); + nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000); } if (nwrite) { diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp index 1faf95373..5d6cc50e9 100644 --- a/trunk/src/service/srs_service_st.hpp +++ b/trunk/src/service/srs_service_st.hpp @@ -27,16 +27,24 @@ #include #include -#include #include +// Wrap for coroutine. +typedef void* srs_netfd_t; +typedef void* srs_thread_t; +typedef void* srs_cond_t; +typedef void* srs_mutex_t; +typedef uint64_t srs_utime_t; + +#define SRS_UTIME_NO_TIMEOUT ((srs_utime_t) -1LL) + // initialize st, requires epoll. extern int srs_st_init(); // close the netfd, and close the underlayer fd. // @remark when close, user must ensure io completed. -extern void srs_close_stfd(st_netfd_t& stfd); +extern void srs_close_stfd(srs_netfd_t& stfd); // Set the FD_CLOEXEC of FD. extern void srs_fd_close_exec(int fd); @@ -44,6 +52,38 @@ extern void srs_fd_close_exec(int fd); // Set the SO_REUSEADDR of socket. extern void srs_socket_reuse_addr(int fd); +// Get current coroutine/thread. +extern srs_thread_t srs_thread_self(); + +// client open socket and connect to server. +// @param tm The timeout in ms. +extern int srs_socket_connect(std::string server, int port, int64_t tm, srs_netfd_t* pstfd); + +// Wrap for coroutine. +extern srs_cond_t srs_cond_new(); +extern int srs_cond_destroy(srs_cond_t cond); +extern int srs_cond_wait(srs_cond_t cond); +extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout); +extern int srs_cond_signal(srs_cond_t cond); + +extern srs_mutex_t srs_mutex_new(); +extern int srs_mutex_destroy(srs_mutex_t mutex); +extern int srs_mutex_lock(srs_mutex_t mutex); +extern int srs_mutex_unlock(srs_mutex_t mutex); + +extern int srs_netfd_fileno(srs_netfd_t stfd); + +extern int srs_usleep(srs_utime_t usecs); + +extern srs_netfd_t srs_netfd_open_socket(int osfd); +extern srs_netfd_t srs_netfd_open(int osfd); + +extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout); + +extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout); + +extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout); + /** * the socket provides TCP socket over st, * that is, the sync socket mechanism. @@ -59,13 +99,13 @@ private: int64_t rbytes; int64_t sbytes; // The underlayer st fd. - st_netfd_t stfd; + srs_netfd_t stfd; public: SrsStSocket(); virtual ~SrsStSocket(); public: // Initialize the socket with stfd, user must manage it. - virtual int initialize(st_netfd_t fd); + virtual int initialize(srs_netfd_t fd); public: virtual bool is_never_timeout(int64_t tm); virtual void set_recv_timeout(int64_t tm); @@ -100,7 +140,7 @@ public: class SrsTcpClient : public ISrsProtocolReaderWriter { private: - st_netfd_t stfd; + srs_netfd_t stfd; SrsStSocket* io; private: std::string host; diff --git a/trunk/src/service/srs_service_utility.cpp b/trunk/src/service/srs_service_utility.cpp index 2d36d2387..5b255c967 100644 --- a/trunk/src/service/srs_service_utility.cpp +++ b/trunk/src/service/srs_service_utility.cpp @@ -36,65 +36,6 @@ using namespace std; #include #include -int srs_socket_connect(string server, int port, int64_t tm, st_netfd_t* pstfd) -{ - int ret = ERROR_SUCCESS; - - st_utime_t timeout = ST_UTIME_NO_TIMEOUT; - if (tm != SRS_CONSTS_NO_TMMS) { - timeout = (st_utime_t)(tm * 1000); - } - - *pstfd = NULL; - st_netfd_t stfd = NULL; - sockaddr_in addr; - - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1){ - ret = ERROR_SOCKET_CREATE; - srs_error("create socket error. ret=%d", ret); - return ret; - } - - srs_fd_close_exec(sock); - - srs_assert(!stfd); - stfd = st_netfd_open_socket(sock); - if(stfd == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket failed. ret=%d", ret); - return ret; - } - - // connect to server. - std::string ip = srs_dns_resolve(server); - if (ip.empty()) { - ret = ERROR_SYSTEM_IP_INVALID; - srs_error("dns resolve server error, ip empty. ret=%d", ret); - goto failed; - } - - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){ - ret = ERROR_ST_CONNECT; - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); - goto failed; - } - srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); - - *pstfd = stfd; - return ret; - -failed: - if (stfd) { - srs_close_stfd(stfd); - } - return ret; -} - bool srs_string_is_http(string url) { return srs_string_starts_with(url, "http://", "https://"); diff --git a/trunk/src/service/srs_service_utility.hpp b/trunk/src/service/srs_service_utility.hpp index 6e85a141d..d9b613b9c 100644 --- a/trunk/src/service/srs_service_utility.hpp +++ b/trunk/src/service/srs_service_utility.hpp @@ -32,10 +32,6 @@ #include -// client open socket and connect to server. -// @param tm The timeout in ms. -extern int srs_socket_connect(std::string server, int port, int64_t tm, st_netfd_t* pstfd); - // whether the url is starts with http:// or https:// extern bool srs_string_is_http(std::string url); extern bool srs_string_is_rtmp(std::string url);