diff --git a/README.md b/README.md
index f608514e2..8f02ddb24 100755
--- a/README.md
+++ b/README.md
@@ -186,6 +186,7 @@ Please select your language:
- [ ] Support HLS+, please read [#466][bug #466] and [#468][bug #468].
### Change Logs
+
### V3 changes
diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp
index c3a30d288..e32b536ff 100644
--- a/trunk/src/app/srs_app_async_call.cpp
+++ b/trunk/src/app/srs_app_async_call.cpp
@@ -39,7 +39,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()
SrsAsyncCallWorker::SrsAsyncCallWorker()
{
trd = NULL;
- wait = st_cond_new();
+ wait = srs_cond_new();
}
SrsAsyncCallWorker::~SrsAsyncCallWorker()
@@ -53,7 +53,7 @@ SrsAsyncCallWorker::~SrsAsyncCallWorker()
}
tasks.clear();
- st_cond_destroy(wait);
+ srs_cond_destroy(wait);
}
int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t)
@@ -61,7 +61,7 @@ int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t)
int ret = ERROR_SUCCESS;
tasks.push_back(t);
- st_cond_signal(wait);
+ srs_cond_signal(wait);
return ret;
}
@@ -80,7 +80,7 @@ int SrsAsyncCallWorker::start()
void SrsAsyncCallWorker::stop()
{
- st_cond_signal(wait);
+ srs_cond_signal(wait);
trd->stop();
}
@@ -90,7 +90,7 @@ int SrsAsyncCallWorker::cycle()
while (!trd->pull()) {
if (tasks.empty()) {
- st_cond_wait(wait);
+ srs_cond_wait(wait);
}
std::vector copy = tasks;
diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp
index 0d303dfc5..14fcce6f1 100644
--- a/trunk/src/app/srs_app_async_call.hpp
+++ b/trunk/src/app/srs_app_async_call.hpp
@@ -69,7 +69,7 @@ private:
SrsCoroutine* trd;
protected:
std::vector tasks;
- st_cond_t wait;
+ srs_cond_t wait;
public:
SrsAsyncCallWorker();
virtual ~SrsAsyncCallWorker();
diff --git a/trunk/src/app/srs_app_bandwidth.cpp b/trunk/src/app/srs_app_bandwidth.cpp
index 5f828e7b6..529e85e18 100644
--- a/trunk/src/app/srs_app_bandwidth.cpp
+++ b/trunk/src/app/srs_app_bandwidth.cpp
@@ -245,7 +245,7 @@ int SrsBandwidth::do_bandwidth_check(SrsKbpsLimit* limit)
return ret;
}
- st_usleep(_SRS_BANDWIDTH_FINAL_WAIT_MS * 1000);
+ srs_usleep(_SRS_BANDWIDTH_FINAL_WAIT_MS * 1000);
srs_info("BW check finished.");
return ret;
@@ -291,7 +291,7 @@ int SrsBandwidth::play_checking(SrsBandwidthSample* sample, SrsKbpsLimit* limit)
srs_update_system_time_ms();
int64_t starttime = srs_get_system_time_ms();
while ((srs_get_system_time_ms() - starttime) < sample->duration_ms) {
- st_usleep(sample->interval_ms);
+ srs_usleep(sample->interval_ms);
// TODO: FIXME: use shared ptr message.
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_playing();
@@ -499,7 +499,7 @@ void SrsKbpsLimit::recv_limit()
while (_kbps->get_recv_kbps() > _limit_kbps) {
_kbps->sample();
- st_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000);
+ srs_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000);
}
}
@@ -510,7 +510,7 @@ void SrsKbpsLimit::send_limit()
while (_kbps->get_send_kbps() > _limit_kbps) {
_kbps->sample();
- st_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000);
+ srs_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000);
}
}
diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp
index 64f57835d..2dbc34f4d 100644
--- a/trunk/src/app/srs_app_caster_flv.cpp
+++ b/trunk/src/app/srs_app_caster_flv.cpp
@@ -75,11 +75,11 @@ int SrsAppCasterFlv::initialize()
return ret;
}
-int SrsAppCasterFlv::on_tcp_client(st_netfd_t stfd)
+int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
- string ip = srs_get_peer_ip(st_netfd_fileno(stfd));
+ string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip);
conns.push_back(conn);
@@ -131,7 +131,7 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
return conn->proxy(w, r, o);
}
-SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip)
+SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip)
: SrsHttpConn(cm, fd, m, cip)
{
sdk = NULL;
diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp
index dcdcee8d4..f22e074a6 100644
--- a/trunk/src/app/srs_app_caster_flv.hpp
+++ b/trunk/src/app/srs_app_caster_flv.hpp
@@ -66,7 +66,7 @@ public:
virtual int initialize();
// ISrsTcpHandler
public:
- virtual int on_tcp_client(st_netfd_t stfd);
+ virtual int on_tcp_client(srs_netfd_t stfd);
// IConnectionManager
public:
virtual void remove(ISrsConnection* c);
@@ -85,7 +85,7 @@ private:
SrsPithyPrint* pprint;
SrsSimpleRtmpClient* sdk;
public:
- SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, std::string cip);
+ SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip);
virtual ~SrsDynamicHttpConn();
public:
virtual int on_got_http_message(ISrsHttpMessage* msg);
diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp
index 340a3aaa9..fb215454c 100644
--- a/trunk/src/app/srs_app_conn.cpp
+++ b/trunk/src/app/srs_app_conn.cpp
@@ -30,7 +30,7 @@ using namespace std;
#include
#include
-SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip)
+SrsConnection::SrsConnection(IConnectionManager* cm, srs_netfd_t c, string cip)
{
manager = cm;
stfd = c;
diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp
index 00b447a68..9e9c502dd 100644
--- a/trunk/src/app/srs_app_conn.hpp
+++ b/trunk/src/app/srs_app_conn.hpp
@@ -55,7 +55,7 @@ protected:
/**
* the underlayer st fd handler.
*/
- st_netfd_t stfd;
+ srs_netfd_t stfd;
/**
* the ip of client.
*/
@@ -77,7 +77,7 @@ protected:
*/
int64_t create_time;
public:
- SrsConnection(IConnectionManager* cm, st_netfd_t c, std::string cip);
+ SrsConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip);
virtual ~SrsConnection();
// interface IKbpsDelta
public:
diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp
index 74cbb18ae..efe649ef9 100644
--- a/trunk/src/app/srs_app_edge.cpp
+++ b/trunk/src/app/srs_app_edge.cpp
@@ -232,7 +232,7 @@ int SrsEdgeIngester::cycle()
}
if (!trd->pull()) {
- st_usleep(SRS_EDGE_INGESTER_CIMS * 1000);
+ srs_usleep(SRS_EDGE_INGESTER_CIMS * 1000);
}
}
return ret;
@@ -517,7 +517,7 @@ int SrsEdgeForwarder::cycle()
}
if (!trd->pull()) {
- st_usleep(SRS_EDGE_FORWARDER_CIMS * 1000);
+ srs_usleep(SRS_EDGE_FORWARDER_CIMS * 1000);
}
}
return ret;
@@ -538,7 +538,7 @@ int SrsEdgeForwarder::do_cycle()
while (!trd->pull()) {
if (send_error_code != ERROR_SUCCESS) {
- st_usleep(SRS_EDGE_FORWARDER_TMMS * 1000);
+ srs_usleep(SRS_EDGE_FORWARDER_TMMS * 1000);
continue;
}
diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp
index a2c5be72c..39ba45876 100644
--- a/trunk/src/app/srs_app_encoder.cpp
+++ b/trunk/src/app/srs_app_encoder.cpp
@@ -102,7 +102,7 @@ int SrsEncoder::cycle()
}
if (!trd->pull()) {
- st_usleep(SRS_RTMP_ENCODER_CIMS * 1000);
+ srs_usleep(SRS_RTMP_ENCODER_CIMS * 1000);
}
}
diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp
index 0fe4b0a2c..165c5e36d 100755
--- a/trunk/src/app/srs_app_forward.cpp
+++ b/trunk/src/app/srs_app_forward.cpp
@@ -231,7 +231,7 @@ int SrsForwarder::cycle()
}
if (!trd->pull()) {
- st_usleep(SRS_FORWARDER_CIMS * 1000);
+ srs_usleep(SRS_FORWARDER_CIMS * 1000);
}
}
diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp
index 7b9e80bf3..c6ffca683 100644
--- a/trunk/src/app/srs_app_hls.cpp
+++ b/trunk/src/app/srs_app_hls.cpp
@@ -29,7 +29,7 @@
#include
#include
#include
-
+#include
#include
#include
using namespace std;
diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp
index a48dafb0a..8a3f6913d 100644
--- a/trunk/src/app/srs_app_hourglass.cpp
+++ b/trunk/src/app/srs_app_hourglass.cpp
@@ -80,7 +80,7 @@ int SrsHourGlass::cycle()
}
total_elapse += resolution;
- st_usleep(resolution * 1000);
+ srs_usleep(resolution * 1000);
return ret;
}
diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp
index c4a654cb5..90bcea87c 100644
--- a/trunk/src/app/srs_app_http_api.cpp
+++ b/trunk/src/app/srs_app_http_api.cpp
@@ -1290,7 +1290,7 @@ int SrsGoApiError::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
return srs_api_response_code(w, r, 100);
}
-SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip)
+SrsHttpApi::SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip)
: SrsConnection(cm, fd, cip)
{
mux = m;
diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp
index ac9273048..d577d1b93 100644
--- a/trunk/src/app/srs_app_http_api.hpp
+++ b/trunk/src/app/srs_app_http_api.hpp
@@ -208,7 +208,7 @@ private:
SrsHttpCorsMux* cors;
SrsHttpServeMux* mux;
public:
- SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, std::string cip);
+ SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip);
virtual ~SrsHttpApi();
// interface IKbpsDelta
public:
diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp
index 89b065206..001395c78 100644
--- a/trunk/src/app/srs_app_http_conn.cpp
+++ b/trunk/src/app/srs_app_http_conn.cpp
@@ -59,7 +59,7 @@ using namespace std;
#include
#include
-SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip)
+SrsHttpConn::SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip)
: SrsConnection(cm, fd, cip)
{
parser = new SrsHttpParser();
@@ -204,7 +204,7 @@ int SrsHttpConn::on_reload_http_stream_crossdomain()
return ret;
}
-SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip)
+SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip)
: SrsHttpConn(cm, fd, m, cip)
{
}
diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp
index bd4d4d4aa..6802095c1 100644
--- a/trunk/src/app/srs_app_http_conn.hpp
+++ b/trunk/src/app/srs_app_http_conn.hpp
@@ -65,7 +65,7 @@ protected:
ISrsHttpServeMux* http_mux;
SrsHttpCorsMux* cors;
public:
- SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
+ SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
virtual ~SrsHttpConn();
// interface IKbpsDelta
public:
@@ -99,7 +99,7 @@ public:
class SrsResponseOnlyHttpConn : public SrsHttpConn
{
public:
- SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
+ SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
virtual ~SrsResponseOnlyHttpConn();
public:
// Directly read a HTTP request message.
diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp
index 7696f2d66..f85ea1e86 100755
--- a/trunk/src/app/srs_app_http_stream.cpp
+++ b/trunk/src/app/srs_app_http_stream.cpp
@@ -116,7 +116,7 @@ int SrsBufferCache::cycle()
// TODO: FIXME: support reload.
if (fast_cache <= 0) {
- st_sleep(SRS_STREAM_CACHE_CYCLE_SECONDS);
+ srs_usleep(SRS_STREAM_CACHE_CYCLE_SECONDS * 1000 * 1000);
return ret;
}
@@ -152,7 +152,7 @@ int SrsBufferCache::cycle()
if (count <= 0) {
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS);
// directly use sleep, donot use consumer wait.
- st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
+ srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
// ignore when nothing got.
continue;
@@ -572,7 +572,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
if (count <= 0) {
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS);
// directly use sleep, donot use consumer wait.
- st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
+ srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
// ignore when nothing got.
continue;
diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp
index bbc51b204..0565445f3 100644
--- a/trunk/src/app/srs_app_ingest.cpp
+++ b/trunk/src/app/srs_app_ingest.cpp
@@ -183,7 +183,7 @@ int SrsIngester::cycle()
}
if (!trd->pull()) {
- st_usleep(SRS_AUTO_INGESTER_CIMS * 1000);
+ srs_usleep(SRS_AUTO_INGESTER_CIMS * 1000);
}
}
diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp
index 3c33ae6cc..6f435cc2b 100644
--- a/trunk/src/app/srs_app_kafka.cpp
+++ b/trunk/src/app/srs_app_kafka.cpp
@@ -362,9 +362,9 @@ void srs_dispose_kafka()
SrsKafkaProducer::SrsKafkaProducer()
{
metadata_ok = false;
- metadata_expired = st_cond_new();
+ metadata_expired = srs_cond_new();
- lock = st_mutex_new();
+ lock = srs_mutex_new();
trd = NULL;
worker = new SrsAsyncCallWorker();
cache = new SrsKafkaCache();
@@ -382,8 +382,8 @@ SrsKafkaProducer::~SrsKafkaProducer()
srs_freep(trd);
srs_freep(cache);
- st_mutex_destroy(lock);
- st_cond_destroy(metadata_expired);
+ srs_mutex_destroy(lock);
+ srs_cond_destroy(metadata_expired);
}
int SrsKafkaProducer::initialize()
@@ -448,14 +448,14 @@ int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
}
// sync with backgound metadata worker.
- st_mutex_lock(lock);
+ srs_mutex_lock(lock);
// flush message when metadata is ok.
if (metadata_ok) {
ret = flush();
}
- st_mutex_unlock(lock);
+ srs_mutex_unlock(lock);
return ret;
}
@@ -503,7 +503,7 @@ int SrsKafkaProducer::cycle()
}
if (!trd->pull()) {
- st_usleep(SRS_KAKFA_CIMS * 1000);
+ srs_usleep(SRS_KAKFA_CIMS * 1000);
}
}
@@ -515,18 +515,18 @@ int SrsKafkaProducer::on_before_cycle()
// wait for the metadata expired.
// when metadata is ok, wait for it expired.
if (metadata_ok) {
- st_cond_wait(metadata_expired);
+ srs_cond_wait(metadata_expired);
}
// request to lock to acquire the socket.
- st_mutex_lock(lock);
+ srs_mutex_lock(lock);
return ERROR_SUCCESS;
}
int SrsKafkaProducer::on_end_cycle()
{
- st_mutex_unlock(lock);
+ srs_mutex_unlock(lock);
return ERROR_SUCCESS;
}
@@ -644,7 +644,7 @@ void SrsKafkaProducer::refresh_metadata()
clear_metadata();
metadata_ok = false;
- st_cond_signal(metadata_expired);
+ srs_cond_signal(metadata_expired);
srs_trace("kafka async refresh metadata in background");
}
diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp
index ef2dcec1a..72b126c1e 100644
--- a/trunk/src/app/srs_app_kafka.hpp
+++ b/trunk/src/app/srs_app_kafka.hpp
@@ -163,11 +163,11 @@ class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISr
private:
// TODO: FIXME: support reload.
bool enabled;
- st_mutex_t lock;
+ srs_mutex_t lock;
SrsCoroutine* trd;
private:
bool metadata_ok;
- st_cond_t metadata_expired;
+ srs_cond_t metadata_expired;
public:
std::vector partitions;
SrsKafkaCache* cache;
diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp
index 5b047f2a4..d2ef6512a 100755
--- a/trunk/src/app/srs_app_listener.cpp
+++ b/trunk/src/app/srs_app_listener.cpp
@@ -30,6 +30,7 @@
#include
#include
#include
+#include
using namespace std;
#include
@@ -54,7 +55,7 @@ ISrsUdpHandler::~ISrsUdpHandler()
{
}
-int ISrsUdpHandler::on_stfd_change(st_netfd_t /*fd*/)
+int ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/)
{
return ERROR_SUCCESS;
}
@@ -101,7 +102,7 @@ int SrsUdpListener::fd()
return _fd;
}
-st_netfd_t SrsUdpListener::stfd()
+srs_netfd_t SrsUdpListener::stfd()
{
return _stfd;
}
@@ -131,7 +132,7 @@ int SrsUdpListener::listen()
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
- if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
+ if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
@@ -159,7 +160,7 @@ int SrsUdpListener::cycle()
int nb_from = sizeof(sockaddr_in);
int nread = 0;
- if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
+ if ((nread = srs_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
srs_warn("ignore recv udp packet failed, nread=%d", nread);
return ret;
}
@@ -170,7 +171,7 @@ int SrsUdpListener::cycle()
}
if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) {
- st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000);
+ srs_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000);
}
}
@@ -233,7 +234,7 @@ int SrsTcpListener::listen()
}
srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
- if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
+ if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
@@ -256,8 +257,8 @@ int SrsTcpListener::cycle()
int ret = ERROR_SUCCESS;
while (!trd->pull()) {
- st_netfd_t stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
- int fd = st_netfd_fileno(stfd);
+ srs_netfd_t stfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
+ int fd = srs_netfd_fileno(stfd);
srs_fd_close_exec(fd);
diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp
index cad00c87e..39af63c40 100644
--- a/trunk/src/app/srs_app_listener.hpp
+++ b/trunk/src/app/srs_app_listener.hpp
@@ -46,7 +46,7 @@ public:
* when fd changed, for instance, reload the listen port,
* notify the handler and user can do something.
*/
- virtual int on_stfd_change(st_netfd_t fd);
+ virtual int on_stfd_change(srs_netfd_t fd);
public:
/**
* when udp listener got a udp packet, notice server to process it.
@@ -72,7 +72,7 @@ public:
/**
* when got tcp client.
*/
- virtual int on_tcp_client(st_netfd_t stfd) = 0;
+ virtual int on_tcp_client(srs_netfd_t stfd) = 0;
};
/**
@@ -82,7 +82,7 @@ class SrsUdpListener : public ISrsCoroutineHandler
{
private:
int _fd;
- st_netfd_t _stfd;
+ srs_netfd_t _stfd;
SrsCoroutine* trd;
private:
char* buf;
@@ -96,7 +96,7 @@ public:
virtual ~SrsUdpListener();
public:
virtual int fd();
- virtual st_netfd_t stfd();
+ virtual srs_netfd_t stfd();
public:
virtual int listen();
// interface ISrsReusableThreadHandler.
@@ -111,7 +111,7 @@ class SrsTcpListener : public ISrsCoroutineHandler
{
private:
int _fd;
- st_netfd_t _stfd;
+ srs_netfd_t _stfd;
SrsCoroutine* trd;
private:
ISrsTcpHandler* handler;
diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp
index 7180011b0..78149a186 100644
--- a/trunk/src/app/srs_app_log.cpp
+++ b/trunk/src/app/srs_app_log.cpp
@@ -29,6 +29,7 @@
#include
#include
#include
+#include
#include
#include
diff --git a/trunk/src/app/srs_app_ng_exec.cpp b/trunk/src/app/srs_app_ng_exec.cpp
index 8674bc4b5..e132b2f2f 100644
--- a/trunk/src/app/srs_app_ng_exec.cpp
+++ b/trunk/src/app/srs_app_ng_exec.cpp
@@ -88,7 +88,7 @@ int SrsNgExec::cycle()
}
if (!trd->pull()) {
- st_usleep(SRS_RTMP_EXEC_CIMS * 1000);
+ srs_usleep(SRS_RTMP_EXEC_CIMS * 1000);
}
}
diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp
index 62e87cf0f..b24ffd1f9 100644
--- a/trunk/src/app/srs_app_recv_thread.cpp
+++ b/trunk/src/app/srs_app_recv_thread.cpp
@@ -34,6 +34,7 @@
#include
#include
+#include
using namespace std;
// the max small bytes to group
@@ -120,7 +121,7 @@ int SrsRecvThread::do_cycle()
while (!trd->pull()) {
// When the pumper is interrupted, wait then retry.
if (pumper->interrupted()) {
- st_usleep(timeout * 1000);
+ srs_usleep(timeout * 1000);
continue;
}
@@ -265,7 +266,7 @@ SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest*
recv_error_code = ERROR_SUCCESS;
_nb_msgs = 0;
video_frames = 0;
- error = st_cond_new();
+ error = srs_cond_new();
ncid = cid = 0;
req = _req;
@@ -286,7 +287,7 @@ SrsPublishRecvThread::~SrsPublishRecvThread()
_srs_config->unsubscribe(this);
trd.stop();
- st_cond_destroy(error);
+ srs_cond_destroy(error);
}
int SrsPublishRecvThread::wait(uint64_t timeout_ms)
@@ -296,7 +297,7 @@ int SrsPublishRecvThread::wait(uint64_t timeout_ms)
}
// ignore any return of cond wait.
- st_cond_timedwait(error, timeout_ms * 1000);
+ srs_cond_timedwait(error, timeout_ms * 1000);
return ERROR_SUCCESS;
}
@@ -380,7 +381,7 @@ void SrsPublishRecvThread::interrupt(int ret)
// when recv thread error, signal the conn thread to process it.
// @see https://github.com/ossrs/srs/issues/244
- st_cond_signal(error);
+ srs_cond_signal(error);
}
void SrsPublishRecvThread::on_start()
@@ -407,7 +408,7 @@ void SrsPublishRecvThread::on_stop()
// when thread stop, signal the conn thread which wait.
// @see https://github.com/ossrs/srs/issues/244
- st_cond_signal(error);
+ srs_cond_signal(error);
#ifdef SRS_PERF_MERGED_READ
if (mr) {
@@ -436,7 +437,7 @@ void SrsPublishRecvThread::on_read(ssize_t nread)
* @see https://github.com/ossrs/srs/issues/241
*/
if (nread < SRS_MR_SMALL_BYTES) {
- st_usleep(mr_sleep * 1000);
+ srs_usleep(mr_sleep * 1000);
}
}
#endif
diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp
index c84173039..3061a3018 100644
--- a/trunk/src/app/srs_app_recv_thread.hpp
+++ b/trunk/src/app/srs_app_recv_thread.hpp
@@ -183,7 +183,7 @@ private:
SrsSource* _source;
// the error timeout cond
// @see https://github.com/ossrs/srs/issues/244
- st_cond_t error;
+ srs_cond_t error;
// merged context id.
int cid;
int ncid;
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index 8c827352f..77cdb61c2 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -28,7 +28,7 @@
#include
#include
#include
-
+#include
using namespace std;
#include
@@ -110,7 +110,7 @@ SrsClientInfo::~SrsClientInfo()
srs_freep(res);
}
-SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip)
+SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip)
: SrsConnection(svr, c, cip)
{
server = svr;
@@ -161,7 +161,7 @@ int SrsRtmpConn::do_cycle()
{
int ret = ERROR_SUCCESS;
- srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), st_netfd_fileno(stfd));
+ srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
// notify kafka cluster.
#ifdef SRS_AUTO_KAFKA
@@ -407,7 +407,7 @@ int SrsRtmpConn::service_cycle()
srs_verbose("set peer bandwidth success");
// get the ip which client connected.
- std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd));
+ std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd));
// do bandwidth test if connect to the vhost which is for bandwidth check.
if (_srs_config->get_bw_check_enabled(req->vhost)) {
@@ -818,7 +818,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
srs_info("mw sleep %dms for no msg", mw_sleep);
- st_usleep(mw_sleep * 1000);
+ srs_usleep(mw_sleep * 1000);
#else
srs_verbose("mw wait %dms and got nothing.", mw_sleep);
#endif
@@ -864,7 +864,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
// apply the minimal interval for delivery stream in ms.
if (send_min_interval > 0) {
- st_usleep((int64_t)(send_min_interval * 1000));
+ srs_usleep((int64_t)(send_min_interval * 1000));
}
}
@@ -893,7 +893,7 @@ int SrsRtmpConn::publishing(SrsSource* source)
if ((ret = acquire_publish(source)) == ERROR_SUCCESS) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
- SrsPublishRecvThread rtrd(rtmp, req, st_netfd_fileno(stfd), 0, this, source);
+ SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source);
srs_info("start to publish stream %s success", req->stream.c_str());
ret = do_publishing(source, &rtrd);
@@ -1243,7 +1243,7 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
}
// get the sock buffer size.
- int fd = st_netfd_fileno(stfd);
+ int fd = srs_netfd_fileno(stfd);
int onb_sbuf = 0;
socklen_t sock_buf_size = sizeof(int);
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size);
@@ -1295,7 +1295,7 @@ void SrsRtmpConn::set_sock_options()
if (nvalue != tcp_nodelay) {
tcp_nodelay = nvalue;
#ifdef SRS_PERF_TCP_NODELAY
- int fd = st_netfd_fileno(stfd);
+ int fd = srs_netfd_fileno(stfd);
socklen_t nb_v = sizeof(int);
diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp
index 312d68718..9c37bcd20 100644
--- a/trunk/src/app/srs_app_rtmp_conn.hpp
+++ b/trunk/src/app/srs_app_rtmp_conn.hpp
@@ -127,7 +127,7 @@ private:
// About the rtmp client.
SrsClientInfo* info;
public:
- SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip);
+ SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip);
virtual ~SrsRtmpConn();
public:
virtual void dispose();
diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp
index c2fdfedeb..d0f83b2cb 100644
--- a/trunk/src/app/srs_app_rtsp.cpp
+++ b/trunk/src/app/srs_app_rtsp.cpp
@@ -183,7 +183,7 @@ int SrsRtspJitter::correct(int64_t& ts)
return ret;
}
-SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
+SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o)
{
output_template = o;
@@ -245,7 +245,7 @@ int SrsRtspConn::do_cycle()
int ret = ERROR_SUCCESS;
// retrieve ip of client.
- std::string ip = srs_get_peer_ip(st_netfd_fileno(stfd));
+ std::string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
srs_trace("rtsp: serve %s", ip.c_str());
// consume all rtsp messages.
@@ -746,7 +746,7 @@ void SrsRtspCaster::free_port(int lpmin, int lpmax)
srs_trace("rtsp: free rtp port=%d-%d", lpmin, lpmax);
}
-int SrsRtspCaster::on_tcp_client(st_netfd_t stfd)
+int SrsRtspCaster::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp
index e40aeee84..7eef307f8 100644
--- a/trunk/src/app/srs_app_rtsp.hpp
+++ b/trunk/src/app/srs_app_rtsp.hpp
@@ -129,7 +129,7 @@ private:
int audio_channel;
SrsRtpConn* audio_rtp;
private:
- st_netfd_t stfd;
+ srs_netfd_t stfd;
SrsStSocket* skt;
SrsRtspStack* rtsp;
SrsRtspCaster* caster;
@@ -149,7 +149,7 @@ private:
std::string aac_specific_config;
SrsRtspAudioCache* acache;
public:
- SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o);
+ SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o);
virtual ~SrsRtspConn();
public:
virtual int serve();
@@ -206,7 +206,7 @@ public:
virtual void free_port(int lpmin, int lpmax);
// interface ISrsTcpHandler
public:
- virtual int on_tcp_client(st_netfd_t stfd);
+ virtual int on_tcp_client(srs_netfd_t stfd);
// internal methods.
public:
virtual void remove(SrsRtspConn* conn);
diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp
index 192192245..f54e1e310 100644
--- a/trunk/src/app/srs_app_server.cpp
+++ b/trunk/src/app/srs_app_server.cpp
@@ -28,7 +28,7 @@
#include
#include
#include
-
+#include
#include
using namespace std;
@@ -163,7 +163,7 @@ int SrsBufferListener::listen(string i, int p)
return ret;
}
-int SrsBufferListener::on_tcp_client(st_netfd_t stfd)
+int SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
@@ -219,7 +219,7 @@ int SrsRtspListener::listen(string i, int p)
return ret;
}
-int SrsRtspListener::on_tcp_client(st_netfd_t stfd)
+int SrsRtspListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
@@ -279,7 +279,7 @@ int SrsHttpFlvListener::listen(string i, int p)
return ret;
}
-int SrsHttpFlvListener::on_tcp_client(st_netfd_t stfd)
+int SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
@@ -391,7 +391,7 @@ int SrsSignalManager::initialize()
return ret;
}
- if ((signal_read_stfd = st_netfd_open(sig_pipe[0])) == NULL) {
+ if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) {
ret = ERROR_SYSTEM_CREATE_PIPE;
srs_error("create signal manage st pipe failed. ret=%d", ret);
return ret;
@@ -444,7 +444,7 @@ int SrsSignalManager::cycle()
int signo;
/* Read the next signal from the pipe */
- st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT);
+ srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT);
/* Process signal synchronously */
server->on_signal(signo);
@@ -863,7 +863,7 @@ int SrsServer::cycle()
// remark, for gmc, never invoke the exit().
srs_warn("sleep a long time for system st-threads to cleanup.");
- st_usleep(3 * 1000 * 1000);
+ srs_usleep(3 * 1000 * 1000);
srs_warn("system quit");
#else
// normally quit with neccessary cleanup by dispose().
@@ -966,7 +966,7 @@ int SrsServer::do_cycle()
int dynamic_max = srs_max(max, heartbeat_max_resolution);
for (int i = 0; i < dynamic_max; i++) {
- st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
+ srs_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
// asprocess check.
if (asprocess && ::getppid() != ppid) {
@@ -1235,7 +1235,7 @@ void SrsServer::resample_kbps()
srs_update_rtmp_server((int)conns.size(), kbps);
}
-int SrsServer::accept_client(SrsListenerType type, st_netfd_t stfd)
+int SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
@@ -1260,11 +1260,11 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t stfd)
return ret;
}
-SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd)
+SrsConnection* SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
- int fd = st_netfd_fileno(stfd);
+ int fd = srs_netfd_fileno(stfd);
string ip = srs_get_peer_ip(fd);
// for some keep alive application, for example, the keepalived,
diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp
index 93c01e61b..dd8b13a8d 100644
--- a/trunk/src/app/srs_app_server.hpp
+++ b/trunk/src/app/srs_app_server.hpp
@@ -35,6 +35,7 @@
#include
#include
#include
+#include
class SrsServer;
class SrsConnection;
@@ -107,7 +108,7 @@ public:
virtual int listen(std::string ip, int port);
// ISrsTcpHandler
public:
- virtual int on_tcp_client(st_netfd_t stfd);
+ virtual int on_tcp_client(srs_netfd_t stfd);
};
#ifdef SRS_AUTO_STREAM_CASTER
@@ -126,7 +127,7 @@ public:
virtual int listen(std::string i, int p);
// ISrsTcpHandler
public:
- virtual int on_tcp_client(st_netfd_t stfd);
+ virtual int on_tcp_client(srs_netfd_t stfd);
};
/**
@@ -144,7 +145,7 @@ public:
virtual int listen(std::string i, int p);
// ISrsTcpHandler
public:
- virtual int on_tcp_client(st_netfd_t stfd);
+ virtual int on_tcp_client(srs_netfd_t stfd);
};
#endif
@@ -185,7 +186,7 @@ private:
/* Per-process pipe which is used as a signal queue. */
/* Up to PIPE_BUF/sizeof(int) signals can be queued up. */
int sig_pipe[2];
- st_netfd_t signal_read_stfd;
+ srs_netfd_t signal_read_stfd;
private:
SrsServer* server;
SrsCoroutine* trd;
@@ -357,9 +358,9 @@ public:
* for instance RTMP connection to serve client.
* @param stfd, the client fd in st boxed, the underlayer fd.
*/
- virtual int accept_client(SrsListenerType type, st_netfd_t stfd);
+ virtual int accept_client(SrsListenerType type, srs_netfd_t stfd);
private:
- virtual SrsConnection* fd2conn(SrsListenerType type, st_netfd_t stfd);
+ virtual SrsConnection* fd2conn(SrsListenerType type, srs_netfd_t stfd);
// IConnectionManager
public:
/**
diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp
index d4c979ee8..a695c0af8 100755
--- a/trunk/src/app/srs_app_source.cpp
+++ b/trunk/src/app/srs_app_source.cpp
@@ -434,7 +434,7 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
should_update_source_id = false;
#ifdef SRS_PERF_QUEUE_COND_WAIT
- mw_wait = st_cond_new();
+ mw_wait = srs_cond_new();
mw_min_msgs = 0;
mw_duration = 0;
mw_waiting = false;
@@ -448,7 +448,7 @@ SrsConsumer::~SrsConsumer()
srs_freep(queue);
#ifdef SRS_PERF_QUEUE_COND_WAIT
- st_cond_destroy(mw_wait);
+ srs_cond_destroy(mw_wait);
#endif
}
@@ -497,14 +497,14 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitte
// when encoder republish or overflow.
// @see https://github.com/ossrs/srs/pull/749
if (atc && duration_ms < 0) {
- st_cond_signal(mw_wait);
+ srs_cond_signal(mw_wait);
mw_waiting = false;
return ret;
}
// when duration ok, signal to flush.
if (match_min_msgs && duration_ms > mw_duration) {
- st_cond_signal(mw_wait);
+ srs_cond_signal(mw_wait);
mw_waiting = false;
return ret;
}
@@ -550,7 +550,7 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
void SrsConsumer::wait(int nb_msgs, int duration)
{
if (paused) {
- st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
+ srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
return;
}
@@ -569,7 +569,7 @@ void SrsConsumer::wait(int nb_msgs, int duration)
mw_waiting = true;
// use cond block wait for high performance mode.
- st_cond_wait(mw_wait);
+ srs_cond_wait(mw_wait);
}
#endif
@@ -587,7 +587,7 @@ void SrsConsumer::wakeup()
{
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (mw_waiting) {
- st_cond_signal(mw_wait);
+ srs_cond_signal(mw_wait);
mw_waiting = false;
}
#endif
diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp
index e97494b5e..495023c9f 100644
--- a/trunk/src/app/srs_app_source.hpp
+++ b/trunk/src/app/srs_app_source.hpp
@@ -228,7 +228,7 @@ private:
#ifdef SRS_PERF_QUEUE_COND_WAIT
// the cond wait for mw.
// @see https://github.com/ossrs/srs/issues/251
- st_cond_t mw_wait;
+ srs_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
int mw_duration;
diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp
index 3713d8251..c3e34e3cc 100755
--- a/trunk/src/app/srs_app_st.cpp
+++ b/trunk/src/app/srs_app_st.cpp
@@ -23,6 +23,7 @@
#include
+#include
#include
using namespace std;
@@ -65,7 +66,7 @@ int SrsCoroutine::start()
return ret;
}
- if((trd = st_thread_create(pfn, this, 1, 0)) == NULL){
+ if((trd = (srs_thread_t)st_thread_create(pfn, this, 1, 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("Thread.start: Create thread failed. ret=%d", ret);
return ret;
@@ -86,7 +87,7 @@ void SrsCoroutine::stop()
interrupt();
void* res = NULL;
- int ret = st_thread_join(trd, &res);
+ int ret = st_thread_join((st_thread_t)trd, &res);
srs_info("Thread.stop: Terminated, ret=%d, err=%d", ret, err);
srs_assert(!ret);
@@ -109,7 +110,7 @@ void SrsCoroutine::interrupt()
srs_info("Thread.interrupt: Interrupt thread, err=%d", err);
err = (err == ERROR_SUCCESS? ERROR_THREAD_INTERRUPED:err);
- st_thread_interrupt(trd);
+ st_thread_interrupt((st_thread_t)trd);
}
int SrsCoroutine::pull()
@@ -145,180 +146,3 @@ void* SrsCoroutine::pfn(void* arg)
return res;
}
-namespace internal
-{
- ISrsThreadHandler::ISrsThreadHandler()
- {
- }
-
- ISrsThreadHandler::~ISrsThreadHandler()
- {
- }
-
- void ISrsThreadHandler::on_thread_start()
- {
- }
-
- int ISrsThreadHandler::on_before_cycle()
- {
- int ret = ERROR_SUCCESS;
- return ret;
- }
-
- int ISrsThreadHandler::on_end_cycle()
- {
- int ret = ERROR_SUCCESS;
- return ret;
- }
-
- void ISrsThreadHandler::on_thread_stop()
- {
- }
-
- SrsThread::SrsThread(const char* n, ISrsThreadHandler* h, int64_t ims, bool j)
- {
- name = n;
- handler = h;
- cims = ims;
-
- trd = NULL;
- loop = false;
- context_id = -1;
- joinable = j;
- }
-
- SrsThread::~SrsThread()
- {
- stop();
- }
-
- int SrsThread::cid()
- {
- return context_id;
- }
-
- int SrsThread::start()
- {
- int ret = ERROR_SUCCESS;
-
- if(trd) {
- srs_info("thread %s already running.", name);
- return ret;
- }
-
- loop = true;
-
- if((trd = st_thread_create(pfn, this, (joinable? 1:0), 0)) == NULL){
- ret = ERROR_ST_CREATE_CYCLE_THREAD;
- srs_error("st_thread_create failed. ret=%d", ret);
- return ret;
- }
-
- return ret;
- }
-
- void SrsThread::stop()
- {
- if (!trd) {
- return;
- }
-
- // notify the cycle to stop loop.
- loop = false;
-
- // the interrupt will cause the socket to read/write error,
- // which will terminate the cycle thread.
- st_thread_interrupt(trd);
-
- // when joinable, wait util quit.
- if (joinable) {
- // wait the thread to exit.
- int ret = st_thread_join(trd, NULL);
- srs_assert(ret == ERROR_SUCCESS);
- }
-
- trd = NULL;
- }
-
- bool SrsThread::can_loop()
- {
- return loop;
- }
-
- void SrsThread::stop_loop()
- {
- loop = false;
- }
-
- void SrsThread::cycle()
- {
- int ret = ERROR_SUCCESS;
-
- // TODO: FIXME: it's better for user to specifies the cid,
- // because sometimes we need to merge cid, for example,
- // the publish thread should use the same cid of connection.
- _srs_context->generate_id();
- srs_info("thread %s cycle start", name);
- context_id = _srs_context->get_id();
-
- srs_assert(handler);
- handler->on_thread_start();
-
- while (loop) {
- if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
- srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", name, ret);
- goto failed;
- }
- srs_info("thread %s on before cycle success", name);
-
- if ((ret = handler->cycle()) != ERROR_SUCCESS) {
- if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
- srs_warn("thread %s cycle failed, ignored and retry, ret=%d", name, ret);
- }
- goto failed;
- }
- srs_info("thread %s cycle success", name);
-
- if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
- srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", name, ret);
- goto failed;
- }
- srs_info("thread %s on end cycle success", name);
-
- failed:
- if (!loop) {
- break;
- }
-
- // Should never use no timeout, just ignore it.
- // to improve performance, donot sleep when interval is zero.
- // @see: https://github.com/ossrs/srs/issues/237
- if (cims != 0 && cims != SRS_CONSTS_NO_TMMS) {
- st_usleep(cims * 1000);
- }
- }
-
- srs_info("thread %s cycle finished", name);
- // @remark in this callback, user may delete this, so never use this->xxx anymore.
- handler->on_thread_stop();
- }
-
- void* SrsThread::pfn(void* arg)
- {
- SrsThread* obj = (SrsThread*)arg;
- srs_assert(obj);
-
- obj->cycle();
-
- // delete cid for valgrind to detect memory leak.
- SrsThreadContext* ctx = dynamic_cast(_srs_context);
- if (ctx) {
- ctx->clear_cid();
- }
-
- st_thread_exit(NULL);
-
- return NULL;
- }
-}
-
diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp
index 9bf67502e..2d151f06e 100644
--- a/trunk/src/app/srs_app_st.hpp
+++ b/trunk/src/app/srs_app_st.hpp
@@ -87,7 +87,7 @@ private:
std::string name;
ISrsCoroutineHandler* handler;
private:
- st_thread_t trd;
+ srs_thread_t trd;
int context;
int err;
private:
@@ -135,133 +135,5 @@ private:
static void* pfn(void* arg);
};
-// the internal classes, user should never use it.
-// user should use the public classes at the bellow:
-// @see SrsEndlessThread, SrsOneCycleThread, SrsReusableThread
-namespace internal
-{
- /**
- * the handler for the thread, callback interface.
- * the thread model defines as:
- * handler->on_thread_start()
- * while loop:
- * handler->on_before_cycle()
- * handler->cycle()
- * handler->on_end_cycle()
- * if !loop then break for user stop thread.
- * sleep(CycleIntervalMilliseconds)
- * handler->on_thread_stop()
- * when stop, the thread will interrupt the st_thread,
- * which will cause the socket to return error and
- * terminate the cycle thread.
- *
- * @remark why should check can_loop() in cycle method?
- * when thread interrupt, the socket maybe not got EINT,
- * espectially on st_usleep(), so the cycle must check the loop,
- * when handler->cycle() has loop itself, for example:
- * while (true):
- * if (read_from_socket(skt) < 0) break;
- * if thread stop when read_from_socket, it's ok, the loop will break,
- * but when thread stop interrupt the s_usleep(0), then the loop is
- * death loop.
- * in a word, the handler->cycle() must:
- * while (pthread->can_loop()):
- * if (read_from_socket(skt) < 0) break;
- * check the loop, then it works.
- *
- * @remark why should use stop_loop() to terminate thread in itself?
- * in the thread itself, that is the cycle method,
- * if itself want to terminate the thread, should never use stop(),
- * but use stop_loop() to set the loop to false and terminate normally.
- *
- * @remark when should set the interval_us, and when not?
- * the cycle will invoke util cannot loop, eventhough the return code of cycle is error,
- * so the interval_us used to sleep for each cycle.
- */
- class ISrsThreadHandler
- {
- public:
- ISrsThreadHandler();
- virtual ~ISrsThreadHandler();
- public:
- virtual void on_thread_start();
- virtual int on_before_cycle();
- virtual int cycle() = 0;
- virtual int on_end_cycle();
- virtual void on_thread_stop();
- };
-
- /**
- * provides servies from st_thread_t,
- * for common thread usage.
- */
- class SrsThread
- {
- private:
- st_thread_t trd;
- int context_id;
- bool loop;
- bool joinable;
- const char* name;
- private:
- ISrsThreadHandler* handler;
- // The cycle interval in ms.
- int64_t cims;
- public:
- /**
- * initialize the thread.
- * @param n, human readable name for st debug.
- * @param h, the cycle handler for the thread.
- * @param ims, the sleep interval in ms when cycle finished.
- * @param j, if joinable, other thread must stop the thread.
- * @remark if joinable, thread never quit itself, or memory leak.
- * @see: https://github.com/ossrs/srs/issues/78
- * @remark about st debug, see st-1.9/README, _st_iterate_threads_flag
- */
- /**
- * TODO: FIXME: maybe all thread must be reap by others threads,
- * @see: https://github.com/ossrs/srs/issues/77
- */
- SrsThread(const char* n, ISrsThreadHandler* h, int64_t ims, bool j);
- virtual ~SrsThread();
- public:
- /**
- * get the context id. @see: ISrsThreadContext.get_id().
- * used for parent thread to get the id.
- * @remark when start thread, parent thread will block and wait for this id ready.
- */
- virtual int cid();
- /**
- * start the thread, invoke the cycle of handler util
- * user stop the thread.
- * @remark ignore any error of cycle of handler.
- * @remark user can start multiple times, ignore if already started.
- * @remark wait for the cid is set by thread pfn.
- */
- virtual int start();
- /**
- * stop the thread, wait for the thread to terminate.
- * @remark user can stop multiple times, ignore if already stopped.
- */
- virtual void stop();
- public:
- /**
- * whether the thread should loop,
- * used for handler->cycle() which has a loop method,
- * to check this method, break if false.
- */
- virtual bool can_loop();
- /**
- * for the loop thread to stop the loop.
- * other thread can directly use stop() to stop loop and wait for quit.
- * this stop loop method only set loop to false.
- */
- virtual void stop_loop();
- private:
- virtual void cycle();
- static void* pfn(void* arg);
- };
-}
-
#endif
diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp
index 91bf3a778..4839c3586 100755
--- a/trunk/src/app/srs_app_thread.cpp
+++ b/trunk/src/app/srs_app_thread.cpp
@@ -31,14 +31,14 @@ using namespace std;
SrsCoroutineManager::SrsCoroutineManager()
{
- cond = st_cond_new();
+ cond = srs_cond_new();
trd = new SrsCoroutine("manager", this);
}
SrsCoroutineManager::~SrsCoroutineManager()
{
srs_freep(trd);
- st_cond_destroy(cond);
+ srs_cond_destroy(cond);
clear();
}
@@ -51,7 +51,7 @@ int SrsCoroutineManager::start()
int SrsCoroutineManager::cycle()
{
while (!trd->pull()) {
- st_cond_wait(cond);
+ srs_cond_wait(cond);
clear();
}
@@ -61,7 +61,7 @@ int SrsCoroutineManager::cycle()
void SrsCoroutineManager::remove(ISrsConnection* c)
{
conns.push_back(c);
- st_cond_signal(cond);
+ srs_cond_signal(cond);
}
void SrsCoroutineManager::clear()
diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp
index d1c009118..5458b7e0e 100644
--- a/trunk/src/app/srs_app_thread.hpp
+++ b/trunk/src/app/srs_app_thread.hpp
@@ -42,7 +42,7 @@ class SrsCoroutineManager : virtual public ISrsCoroutineHandler, virtual public
private:
SrsCoroutine* trd;
std::vector conns;
- st_cond_t cond;
+ srs_cond_t cond;
public:
SrsCoroutineManager();
virtual ~SrsCoroutineManager();
diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp
index 5e71f2cc2..b4da46fda 100644
--- a/trunk/src/app/srs_app_utility.cpp
+++ b/trunk/src/app/srs_app_utility.cpp
@@ -179,7 +179,7 @@ int srs_kill_forced(int& pid)
// 0 is not quit yet.
if (qpid == 0) {
- st_usleep(10 * 1000);
+ srs_usleep(10 * 1000);
continue;
}
@@ -204,7 +204,7 @@ int srs_kill_forced(int& pid)
// @remark when we use SIGKILL to kill process, it must be killed,
// so we always wait it to quit by infinite loop.
while (waitpid(pid, &status, 0) < 0) {
- st_usleep(10 * 1000);
+ srs_usleep(10 * 1000);
continue;
}
diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp
index 11759fe0f..db5dc8419 100644
--- a/trunk/src/main/srs_main_ingest_hls.cpp
+++ b/trunk/src/main/srs_main_ingest_hls.cpp
@@ -216,7 +216,7 @@ int SrsIngestHlsInput::connect()
int64_t now = srs_update_system_time_ms();
if (now < next_connect_time) {
srs_trace("input hls wait for %dms", next_connect_time - now);
- st_usleep((next_connect_time - now) * 1000);
+ srs_usleep((next_connect_time - now) * 1000);
}
// set all ts to dirty.
diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp
index d054d50e1..dbcd3eac4 100644
--- a/trunk/src/main/srs_main_server.cpp
+++ b/trunk/src/main/srs_main_server.cpp
@@ -37,6 +37,7 @@ using namespace std;
#include
#endif
+#include
using namespace std;
#include
diff --git a/trunk/src/service/srs_service_log.cpp b/trunk/src/service/srs_service_log.cpp
index c71958342..ec7bb5d1a 100644
--- a/trunk/src/service/srs_service_log.cpp
+++ b/trunk/src/service/srs_service_log.cpp
@@ -25,6 +25,7 @@
#include
#include
+#include
using namespace std;
#include
@@ -45,18 +46,18 @@ int SrsThreadContext::generate_id()
static int id = 100;
int gid = id++;
- cache[st_thread_self()] = gid;
+ cache[srs_thread_self()] = gid;
return gid;
}
int SrsThreadContext::get_id()
{
- return cache[st_thread_self()];
+ return cache[srs_thread_self()];
}
int SrsThreadContext::set_id(int v)
{
- st_thread_t self = st_thread_self();
+ srs_thread_t self = srs_thread_self();
int ov = 0;
if (cache.find(self) != cache.end()) {
@@ -70,8 +71,8 @@ int SrsThreadContext::set_id(int v)
void SrsThreadContext::clear_cid()
{
- st_thread_t self = st_thread_self();
- std::map::iterator it = cache.find(self);
+ srs_thread_t self = srs_thread_self();
+ std::map::iterator it = cache.find(self);
if (it != cache.end()) {
cache.erase(it);
}
diff --git a/trunk/src/service/srs_service_log.hpp b/trunk/src/service/srs_service_log.hpp
index 97376402d..f13e94f5c 100644
--- a/trunk/src/service/srs_service_log.hpp
+++ b/trunk/src/service/srs_service_log.hpp
@@ -38,7 +38,7 @@
class SrsThreadContext : public ISrsThreadContext
{
private:
- std::map cache;
+ std::map cache;
public:
SrsThreadContext();
virtual ~SrsThreadContext();
diff --git a/trunk/src/service/srs_service_rtmp_conn.cpp b/trunk/src/service/srs_service_rtmp_conn.cpp
index f0fd0716c..d806742d7 100644
--- a/trunk/src/service/srs_service_rtmp_conn.cpp
+++ b/trunk/src/service/srs_service_rtmp_conn.cpp
@@ -23,6 +23,7 @@
#include
+#include
using namespace std;
#include
diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp
index d921cf54f..858fe2312 100644
--- a/trunk/src/service/srs_service_st.cpp
+++ b/trunk/src/service/srs_service_st.cpp
@@ -23,6 +23,7 @@
#include
+#include
#include
#include
using namespace std;
@@ -30,6 +31,7 @@ using namespace std;
#include
#include
#include
+#include
#ifdef __linux__
#include
@@ -80,11 +82,11 @@ int srs_st_init()
return ret;
}
-void srs_close_stfd(st_netfd_t& stfd)
+void srs_close_stfd(srs_netfd_t& stfd)
{
if (stfd) {
// we must ensure the close is ok.
- int err = st_netfd_close(stfd);
+ int err = st_netfd_close((st_netfd_t)stfd);
srs_assert(err != -1);
stfd = NULL;
}
@@ -103,6 +105,150 @@ void srs_socket_reuse_addr(int fd)
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int));
}
+srs_thread_t srs_thread_self()
+{
+ return (srs_thread_t)st_thread_self();
+}
+
+int srs_socket_connect(string server, int port, int64_t tm, srs_netfd_t* pstfd)
+{
+ int ret = ERROR_SUCCESS;
+
+ st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
+ if (tm != SRS_CONSTS_NO_TMMS) {
+ timeout = (st_utime_t)(tm * 1000);
+ }
+
+ *pstfd = NULL;
+ srs_netfd_t stfd = NULL;
+ sockaddr_in addr;
+
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ if(sock == -1){
+ ret = ERROR_SOCKET_CREATE;
+ srs_error("create socket error. ret=%d", ret);
+ return ret;
+ }
+
+ srs_fd_close_exec(sock);
+
+ srs_assert(!stfd);
+ stfd = st_netfd_open_socket(sock);
+ if(stfd == NULL){
+ ret = ERROR_ST_OPEN_SOCKET;
+ srs_error("st_netfd_open_socket failed. ret=%d", ret);
+ return ret;
+ }
+
+ // connect to server.
+ std::string ip = srs_dns_resolve(server);
+ if (ip.empty()) {
+ ret = ERROR_SYSTEM_IP_INVALID;
+ srs_error("dns resolve server error, ip empty. ret=%d", ret);
+ goto failed;
+ }
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = inet_addr(ip.c_str());
+
+ if (st_connect((st_netfd_t)stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){
+ ret = ERROR_ST_CONNECT;
+ srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
+ goto failed;
+ }
+ srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
+
+ *pstfd = stfd;
+ return ret;
+
+failed:
+ if (stfd) {
+ srs_close_stfd(stfd);
+ }
+ return ret;
+}
+
+srs_cond_t srs_cond_new()
+{
+ return (srs_cond_t)st_cond_new();
+}
+
+int srs_cond_destroy(srs_cond_t cond)
+{
+ return st_cond_destroy((st_cond_t)cond);
+}
+
+int srs_cond_wait(srs_cond_t cond)
+{
+ return st_cond_wait((st_cond_t)cond);
+}
+
+int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout)
+{
+ return st_cond_timedwait((st_cond_t)cond, (st_utime_t)timeout);
+}
+
+int srs_cond_signal(srs_cond_t cond)
+{
+ return st_cond_signal((st_cond_t)cond);
+}
+
+srs_mutex_t srs_mutex_new()
+{
+ return (srs_mutex_t)st_mutex_new();
+}
+
+int srs_mutex_destroy(srs_mutex_t mutex)
+{
+ return st_mutex_destroy((st_mutex_t)mutex);
+}
+
+int srs_mutex_lock(srs_mutex_t mutex)
+{
+ return st_mutex_lock((st_mutex_t)mutex);
+}
+
+int srs_mutex_unlock(srs_mutex_t mutex)
+{
+ return st_mutex_unlock((st_mutex_t)mutex);
+}
+
+int srs_netfd_fileno(srs_netfd_t stfd)
+{
+ return st_netfd_fileno((st_netfd_t)stfd);
+}
+
+int srs_usleep(srs_utime_t usecs)
+{
+ return st_usleep((st_utime_t)usecs);
+}
+
+srs_netfd_t srs_netfd_open_socket(int osfd)
+{
+ return (srs_netfd_t)st_netfd_open_socket(osfd);
+}
+
+srs_netfd_t srs_netfd_open(int osfd)
+{
+ return (srs_netfd_t)st_netfd_open(osfd);
+}
+
+int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout)
+{
+ return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout);
+}
+
+srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout)
+{
+ return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout);
+}
+
+ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout)
+{
+ return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
+}
+
SrsStSocket::SrsStSocket()
{
stfd = NULL;
@@ -114,7 +260,7 @@ SrsStSocket::~SrsStSocket()
{
}
-int SrsStSocket::initialize(st_netfd_t fd)
+int SrsStSocket::initialize(srs_netfd_t fd)
{
stfd = fd;
return ERROR_SUCCESS;
@@ -161,9 +307,9 @@ int SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
- nb_read = st_read(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
+ nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
- nb_read = st_read(stfd, buf, size, rtm * 1000);
+ nb_read = st_read((st_netfd_t)stfd, buf, size, rtm * 1000);
}
if (nread) {
@@ -197,9 +343,9 @@ int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
- nb_read = st_read_fully(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
+ nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
- nb_read = st_read_fully(stfd, buf, size, rtm * 1000);
+ nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000);
}
if (nread) {
@@ -233,9 +379,9 @@ int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
- nb_write = st_write(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
+ nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
- nb_write = st_write(stfd, buf, size, stm * 1000);
+ nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000);
}
if (nwrite) {
@@ -264,9 +410,9 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
- nb_write = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
+ nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
} else {
- nb_write = st_writev(stfd, iov, iov_size, stm * 1000);
+ nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000);
}
if (nwrite) {
diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp
index 1faf95373..5d6cc50e9 100644
--- a/trunk/src/service/srs_service_st.hpp
+++ b/trunk/src/service/srs_service_st.hpp
@@ -27,16 +27,24 @@
#include
#include
-#include
#include
+// Wrap for coroutine.
+typedef void* srs_netfd_t;
+typedef void* srs_thread_t;
+typedef void* srs_cond_t;
+typedef void* srs_mutex_t;
+typedef uint64_t srs_utime_t;
+
+#define SRS_UTIME_NO_TIMEOUT ((srs_utime_t) -1LL)
+
// initialize st, requires epoll.
extern int srs_st_init();
// close the netfd, and close the underlayer fd.
// @remark when close, user must ensure io completed.
-extern void srs_close_stfd(st_netfd_t& stfd);
+extern void srs_close_stfd(srs_netfd_t& stfd);
// Set the FD_CLOEXEC of FD.
extern void srs_fd_close_exec(int fd);
@@ -44,6 +52,38 @@ extern void srs_fd_close_exec(int fd);
// Set the SO_REUSEADDR of socket.
extern void srs_socket_reuse_addr(int fd);
+// Get current coroutine/thread.
+extern srs_thread_t srs_thread_self();
+
+// client open socket and connect to server.
+// @param tm The timeout in ms.
+extern int srs_socket_connect(std::string server, int port, int64_t tm, srs_netfd_t* pstfd);
+
+// Wrap for coroutine.
+extern srs_cond_t srs_cond_new();
+extern int srs_cond_destroy(srs_cond_t cond);
+extern int srs_cond_wait(srs_cond_t cond);
+extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout);
+extern int srs_cond_signal(srs_cond_t cond);
+
+extern srs_mutex_t srs_mutex_new();
+extern int srs_mutex_destroy(srs_mutex_t mutex);
+extern int srs_mutex_lock(srs_mutex_t mutex);
+extern int srs_mutex_unlock(srs_mutex_t mutex);
+
+extern int srs_netfd_fileno(srs_netfd_t stfd);
+
+extern int srs_usleep(srs_utime_t usecs);
+
+extern srs_netfd_t srs_netfd_open_socket(int osfd);
+extern srs_netfd_t srs_netfd_open(int osfd);
+
+extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
+
+extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);
+
+extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);
+
/**
* the socket provides TCP socket over st,
* that is, the sync socket mechanism.
@@ -59,13 +99,13 @@ private:
int64_t rbytes;
int64_t sbytes;
// The underlayer st fd.
- st_netfd_t stfd;
+ srs_netfd_t stfd;
public:
SrsStSocket();
virtual ~SrsStSocket();
public:
// Initialize the socket with stfd, user must manage it.
- virtual int initialize(st_netfd_t fd);
+ virtual int initialize(srs_netfd_t fd);
public:
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
@@ -100,7 +140,7 @@ public:
class SrsTcpClient : public ISrsProtocolReaderWriter
{
private:
- st_netfd_t stfd;
+ srs_netfd_t stfd;
SrsStSocket* io;
private:
std::string host;
diff --git a/trunk/src/service/srs_service_utility.cpp b/trunk/src/service/srs_service_utility.cpp
index 2d36d2387..5b255c967 100644
--- a/trunk/src/service/srs_service_utility.cpp
+++ b/trunk/src/service/srs_service_utility.cpp
@@ -36,65 +36,6 @@ using namespace std;
#include
#include
-int srs_socket_connect(string server, int port, int64_t tm, st_netfd_t* pstfd)
-{
- int ret = ERROR_SUCCESS;
-
- st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
- if (tm != SRS_CONSTS_NO_TMMS) {
- timeout = (st_utime_t)(tm * 1000);
- }
-
- *pstfd = NULL;
- st_netfd_t stfd = NULL;
- sockaddr_in addr;
-
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- if(sock == -1){
- ret = ERROR_SOCKET_CREATE;
- srs_error("create socket error. ret=%d", ret);
- return ret;
- }
-
- srs_fd_close_exec(sock);
-
- srs_assert(!stfd);
- stfd = st_netfd_open_socket(sock);
- if(stfd == NULL){
- ret = ERROR_ST_OPEN_SOCKET;
- srs_error("st_netfd_open_socket failed. ret=%d", ret);
- return ret;
- }
-
- // connect to server.
- std::string ip = srs_dns_resolve(server);
- if (ip.empty()) {
- ret = ERROR_SYSTEM_IP_INVALID;
- srs_error("dns resolve server error, ip empty. ret=%d", ret);
- goto failed;
- }
-
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = inet_addr(ip.c_str());
-
- if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){
- ret = ERROR_ST_CONNECT;
- srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
- goto failed;
- }
- srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
-
- *pstfd = stfd;
- return ret;
-
-failed:
- if (stfd) {
- srs_close_stfd(stfd);
- }
- return ret;
-}
-
bool srs_string_is_http(string url)
{
return srs_string_starts_with(url, "http://", "https://");
diff --git a/trunk/src/service/srs_service_utility.hpp b/trunk/src/service/srs_service_utility.hpp
index 6e85a141d..d9b613b9c 100644
--- a/trunk/src/service/srs_service_utility.hpp
+++ b/trunk/src/service/srs_service_utility.hpp
@@ -32,10 +32,6 @@
#include
-// client open socket and connect to server.
-// @param tm The timeout in ms.
-extern int srs_socket_connect(std::string server, int port, int64_t tm, st_netfd_t* pstfd);
-
// whether the url is starts with http:// or https://
extern bool srs_string_is_http(std::string url);
extern bool srs_string_is_rtmp(std::string url);