From 6009395c10b10c151f2fe56bbe6c2011633cff51 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 7 Jun 2022 21:04:04 +0800 Subject: [PATCH] SRT: Hide srt implements from API. --- trunk/src/app/srs_app_srt_conn.cpp | 4 +- trunk/src/app/srs_app_srt_conn.hpp | 8 +- trunk/src/app/srs_app_srt_listener.cpp | 7 +- trunk/src/app/srs_app_srt_listener.hpp | 6 +- trunk/src/app/srs_app_srt_server.cpp | 12 +-- trunk/src/app/srs_app_srt_server.hpp | 6 +- trunk/src/protocol/srs_protocol_srt.cpp | 115 ++++++++++++++---------- trunk/src/protocol/srs_protocol_srt.hpp | 82 +++++++++-------- trunk/src/utest/srs_utest_srt.cpp | 44 ++++----- 9 files changed, 157 insertions(+), 127 deletions(-) diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index a524605f4..6071918b8 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -20,7 +20,7 @@ using namespace std; #include #include -SrsSrtConnection::SrsSrtConnection(SRTSOCKET srt_fd) +SrsSrtConnection::SrsSrtConnection(srs_srt_t srt_fd) { srt_fd_ = srt_fd; srt_skt_ = new SrsSrtSocket(_srt_eventloop->poller(), srt_fd_); @@ -148,7 +148,7 @@ srs_error_t SrsSrtRecvThread::get_recv_err() return srs_error_copy(recv_err_); } -SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, SRTSOCKET srt_fd, std::string ip, int port) +SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port) { // Create a identify for this client. _srs_context->set_id(_srs_context->generate_id()); diff --git a/trunk/src/app/srs_app_srt_conn.hpp b/trunk/src/app/srs_app_srt_conn.hpp index 84a1cf79d..c1d54edf2 100644 --- a/trunk/src/app/srs_app_srt_conn.hpp +++ b/trunk/src/app/srs_app_srt_conn.hpp @@ -28,7 +28,7 @@ class SrsSrtServer; class SrsSrtConnection : public ISrsProtocolReadWriter { public: - SrsSrtConnection(SRTSOCKET srt_fd); + SrsSrtConnection(srs_srt_t srt_fd); virtual ~SrsSrtConnection(); public: virtual srs_error_t initialize(); @@ -46,7 +46,7 @@ public: virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite); private: // The underlayer srt fd handler. - SRTSOCKET srt_fd_; + srs_srt_t srt_fd_; // The underlayer srt socket. SrsSrtSocket* srt_skt_; }; @@ -73,7 +73,7 @@ private: class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler { public: - SrsMpegtsSrtConn(SrsSrtServer* srt_server, SRTSOCKET srt_fd, std::string ip, int port); + SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port); virtual ~SrsMpegtsSrtConn(); // Interface ISrsResource. public: @@ -111,7 +111,7 @@ private: void http_hooks_on_stop(); private: SrsSrtServer* srt_server_; - SRTSOCKET srt_fd_; + srs_srt_t srt_fd_; SrsSrtConnection* srt_conn_; SrsWallClock* clock_; SrsKbps* kbps_; diff --git a/trunk/src/app/srs_app_srt_listener.cpp b/trunk/src/app/srs_app_srt_listener.cpp index 5931ca93d..4f223957c 100644 --- a/trunk/src/app/srs_app_srt_listener.cpp +++ b/trunk/src/app/srs_app_srt_listener.cpp @@ -26,7 +26,7 @@ SrsSrtListener::SrsSrtListener(ISrsSrtHandler* h, std::string i, int p) ip_ = i; port_ = p; - lfd_ = SRT_INVALID_SOCK; + lfd_ = srs_srt_socket_invalid(); srt_skt_ = NULL; trd_ = new SrsDummyCoroutine(); @@ -36,7 +36,8 @@ SrsSrtListener::~SrsSrtListener() { srs_freep(trd_); srs_freep(srt_skt_); - srt_close(lfd_); + // TODO: FIXME: Handle error. + srs_srt_close(lfd_); } int SrsSrtListener::fd() @@ -84,7 +85,7 @@ srs_error_t SrsSrtListener::cycle() return srs_error_wrap(err, "srt listener"); } - SRTSOCKET client_srt_fd = SRT_INVALID_SOCK; + srs_srt_t client_srt_fd = srs_srt_socket_invalid(); if ((err = srt_skt_->accept(&client_srt_fd)) != srs_success) { return srs_error_wrap(err, "srt accept"); } diff --git a/trunk/src/app/srs_app_srt_listener.hpp b/trunk/src/app/srs_app_srt_listener.hpp index fe9f9466b..86a3a7ae0 100644 --- a/trunk/src/app/srs_app_srt_listener.hpp +++ b/trunk/src/app/srs_app_srt_listener.hpp @@ -21,14 +21,14 @@ public: virtual ~ISrsSrtHandler(); public: // When got srt client. - virtual srs_error_t on_srt_client(SRTSOCKET srt_fd) = 0; + virtual srs_error_t on_srt_client(srs_srt_t srt_fd) = 0; }; // Bind and listen SRT(udp) port, use handler to process the client. class SrsSrtListener : public ISrsCoroutineHandler { private: - SRTSOCKET lfd_; + srs_srt_t lfd_; SrsSrtSocket* srt_skt_; SrsCoroutine* trd_; private: @@ -39,7 +39,7 @@ public: SrsSrtListener(ISrsSrtHandler* h, std::string i, int p); virtual ~SrsSrtListener(); public: - virtual SRTSOCKET fd(); + virtual srs_srt_t fd(); public: // Create srt socket, separate this step because of srt have some option must set before listen. virtual srs_error_t create_socket(); diff --git a/trunk/src/app/srs_app_srt_server.cpp b/trunk/src/app/srs_app_srt_server.cpp index 38baab9ab..b8443d8e6 100644 --- a/trunk/src/app/srs_app_srt_server.cpp +++ b/trunk/src/app/srs_app_srt_server.cpp @@ -141,7 +141,7 @@ srs_error_t SrsSrtMessageAcceptor::set_srt_opt() return err; } -srs_error_t SrsSrtMessageAcceptor::on_srt_client(SRTSOCKET srt_fd) +srs_error_t SrsSrtMessageAcceptor::on_srt_client(srs_srt_t srt_fd) { // Notify srt server to accept srt client, and create new SrsSrtConn on it. srs_error_t err = srt_server_->accept_srt_client(type_, srt_fd); @@ -226,7 +226,7 @@ void SrsSrtServer::close_listeners(SrsSrtListenerType type) } } -srs_error_t SrsSrtServer::accept_srt_client(SrsSrtListenerType type, SRTSOCKET srt_fd) +srs_error_t SrsSrtServer::accept_srt_client(SrsSrtListenerType type, srs_srt_t srt_fd) { srs_error_t err = srs_success; @@ -234,7 +234,8 @@ srs_error_t SrsSrtServer::accept_srt_client(SrsSrtListenerType type, SRTSOCKET s if ((err = fd_to_resource(type, srt_fd, &conn)) != srs_success) { //close fd on conn error, otherwise will lead to fd leak -gs - srt_close(srt_fd); + // TODO: FIXME: Handle error. + srs_srt_close(srt_fd); return srs_error_wrap(err, "srt fd to resource"); } srs_assert(conn); @@ -249,7 +250,7 @@ srs_error_t SrsSrtServer::accept_srt_client(SrsSrtListenerType type, SRTSOCKET s return err; } -srs_error_t SrsSrtServer::fd_to_resource(SrsSrtListenerType type, SRTSOCKET srt_fd, ISrsStartableConneciton** pr) +srs_error_t SrsSrtServer::fd_to_resource(SrsSrtListenerType type, srs_srt_t srt_fd, ISrsStartableConneciton** pr) { srs_error_t err = srs_success; @@ -271,7 +272,8 @@ srs_error_t SrsSrtServer::fd_to_resource(SrsSrtListenerType type, SRTSOCKET srt_ *pr = new SrsMpegtsSrtConn(this, srt_fd, ip, port); } else { srs_warn("close for no service handler. srtfd=%d, ip=%s:%d", srt_fd, ip.c_str(), port); - srt_close(srt_fd); + // TODO: FIXME: Handle error. + srs_srt_close(srt_fd); return err; } diff --git a/trunk/src/app/srs_app_srt_server.hpp b/trunk/src/app/srs_app_srt_server.hpp index 386806cf8..11d56a5f7 100644 --- a/trunk/src/app/srs_app_srt_server.hpp +++ b/trunk/src/app/srs_app_srt_server.hpp @@ -50,7 +50,7 @@ public: virtual srs_error_t set_srt_opt(); // Interface ISrsSrtHandler public: - virtual srs_error_t on_srt_client(SRTSOCKET srt_fd); + virtual srs_error_t on_srt_client(srs_srt_t srt_fd); }; // SRS SRT server, initialize and listen, start connection service thread, destroy client. @@ -78,9 +78,9 @@ public: // @param type, the client type, used to create concrete connection, // for instance SRT connection to serve client. // @param srt_fd, the client fd in srt boxed, the underlayer fd. - virtual srs_error_t accept_srt_client(SrsSrtListenerType type, SRTSOCKET srt_fd); + virtual srs_error_t accept_srt_client(SrsSrtListenerType type, srs_srt_t srt_fd); private: - virtual srs_error_t fd_to_resource(SrsSrtListenerType type, SRTSOCKET srt_fd, ISrsStartableConneciton** pr); + virtual srs_error_t fd_to_resource(SrsSrtListenerType type, srs_srt_t srt_fd, ISrsStartableConneciton** pr); // Interface ISrsResourceManager public: // A callback for connection to remove itself. diff --git a/trunk/src/protocol/srs_protocol_srt.cpp b/trunk/src/protocol/srs_protocol_srt.cpp index 0e61aadfc..53b77c2c9 100644 --- a/trunk/src/protocol/srs_protocol_srt.cpp +++ b/trunk/src/protocol/srs_protocol_srt.cpp @@ -14,6 +14,8 @@ using namespace std; #include #include +#include + #define SET_SRT_OPT_STR(srtfd, optname, buf, size) \ if (srt_setsockflag(srtfd, optname, buf, size) == SRT_ERROR) { \ std::stringstream ss; \ @@ -41,7 +43,7 @@ using namespace std; } \ } while (0) -static srs_error_t do_srs_srt_listen(SRTSOCKET srt_fd, addrinfo* r) +static srs_error_t do_srs_srt_listen(srs_srt_t srt_fd, addrinfo* r) { srs_error_t err = srs_success; @@ -60,7 +62,7 @@ static srs_error_t do_srs_srt_listen(SRTSOCKET srt_fd, addrinfo* r) return err; } -static srs_error_t do_srs_srt_get_streamid(SRTSOCKET srt_fd, string& streamid) +static srs_error_t do_srs_srt_get_streamid(srs_srt_t srt_fd, string& streamid) { // SRT max streamid length is 512. char sid[512]; @@ -70,11 +72,16 @@ static srs_error_t do_srs_srt_get_streamid(SRTSOCKET srt_fd, string& streamid) return srs_success; } -srs_error_t srs_srt_socket(SRTSOCKET* pfd) +srs_srt_t srs_srt_socket_invalid() +{ + return SRT_INVALID_SOCK; +} + +srs_error_t srs_srt_socket(srs_srt_t* pfd) { srs_error_t err = srs_success; - SRTSOCKET srt_fd = 0; + srs_srt_t srt_fd = 0; if ((srt_fd = srt_create_socket()) < 0) { return srs_error_new(ERROR_SOCKET_CREATE, "create srt socket"); } @@ -84,11 +91,18 @@ srs_error_t srs_srt_socket(SRTSOCKET* pfd) return err; } -srs_error_t srs_srt_socket_with_default_option(SRTSOCKET* pfd) +srs_error_t srs_srt_close(srs_srt_t fd) +{ + // TODO: FIXME: Handle error. + srt_close(fd); + return srs_success; +} + +srs_error_t srs_srt_socket_with_default_option(srs_srt_t* pfd) { srs_error_t err = srs_success; - SRTSOCKET srt_fd = 0; + srs_srt_t srt_fd = 0; if ((srt_fd = srt_create_socket()) < 0) { return srs_error_new(ERROR_SOCKET_CREATE, "create srt socket"); } @@ -114,7 +128,7 @@ srs_error_t srs_srt_socket_with_default_option(SRTSOCKET* pfd) return err; } -srs_error_t srs_srt_listen(SRTSOCKET srt_fd, std::string ip, int port) +srs_error_t srs_srt_listen(srs_srt_t srt_fd, std::string ip, int port) { srs_error_t err = srs_success; @@ -142,7 +156,7 @@ srs_error_t srs_srt_listen(SRTSOCKET srt_fd, std::string ip, int port) return err; } -srs_error_t srs_srt_nonblock(SRTSOCKET srt_fd) +srs_error_t srs_srt_nonblock(srs_srt_t srt_fd) { int sync = 0; SET_SRT_OPT(srt_fd, SRTO_SNDSYN, sync); @@ -151,157 +165,157 @@ srs_error_t srs_srt_nonblock(SRTSOCKET srt_fd) return srs_success; } -srs_error_t srs_srt_set_maxbw(SRTSOCKET srt_fd, int maxbw) +srs_error_t srs_srt_set_maxbw(srs_srt_t srt_fd, int maxbw) { SET_SRT_OPT(srt_fd, SRTO_MAXBW, maxbw); return srs_success; } -srs_error_t srs_srt_set_mss(SRTSOCKET srt_fd, int mss) +srs_error_t srs_srt_set_mss(srs_srt_t srt_fd, int mss) { SET_SRT_OPT(srt_fd, SRTO_MSS, mss); return srs_success; } -srs_error_t srs_srt_set_payload_size(SRTSOCKET srt_fd, int payload_size) +srs_error_t srs_srt_set_payload_size(srs_srt_t srt_fd, int payload_size) { SET_SRT_OPT(srt_fd, SRTO_PAYLOADSIZE, payload_size); return srs_success; } -srs_error_t srs_srt_set_connect_timeout(SRTSOCKET srt_fd, int timeout) +srs_error_t srs_srt_set_connect_timeout(srs_srt_t srt_fd, int timeout) { SET_SRT_OPT(srt_fd, SRTO_CONNTIMEO, timeout); return srs_success; } -srs_error_t srs_srt_set_peer_idle_timeout(SRTSOCKET srt_fd, int timeout) +srs_error_t srs_srt_set_peer_idle_timeout(srs_srt_t srt_fd, int timeout) { SET_SRT_OPT(srt_fd, SRTO_PEERIDLETIMEO, timeout); return srs_success; } -srs_error_t srs_srt_set_tsbpdmode(SRTSOCKET srt_fd, bool tsbpdmode) +srs_error_t srs_srt_set_tsbpdmode(srs_srt_t srt_fd, bool tsbpdmode) { SET_SRT_OPT(srt_fd, SRTO_TSBPDMODE, tsbpdmode); return srs_success; } -srs_error_t srs_srt_set_sndbuf(SRTSOCKET srt_fd, int sndbuf) +srs_error_t srs_srt_set_sndbuf(srs_srt_t srt_fd, int sndbuf) { SET_SRT_OPT(srt_fd, SRTO_SNDBUF, sndbuf); return srs_success; } -srs_error_t srs_srt_set_rcvbuf(SRTSOCKET srt_fd, int rcvbuf) +srs_error_t srs_srt_set_rcvbuf(srs_srt_t srt_fd, int rcvbuf) { SET_SRT_OPT(srt_fd, SRTO_RCVBUF, rcvbuf); return srs_success; } -srs_error_t srs_srt_set_tlpktdrop(SRTSOCKET srt_fd, bool tlpktdrop) +srs_error_t srs_srt_set_tlpktdrop(srs_srt_t srt_fd, bool tlpktdrop) { SET_SRT_OPT(srt_fd, SRTO_TLPKTDROP, tlpktdrop); return srs_success; } -srs_error_t srs_srt_set_latency(SRTSOCKET srt_fd, int latency) +srs_error_t srs_srt_set_latency(srs_srt_t srt_fd, int latency) { SET_SRT_OPT(srt_fd, SRTO_LATENCY, latency); return srs_success; } -srs_error_t srs_srt_set_rcv_latency(SRTSOCKET srt_fd, int rcv_latency) +srs_error_t srs_srt_set_rcv_latency(srs_srt_t srt_fd, int rcv_latency) { SET_SRT_OPT(srt_fd, SRTO_RCVLATENCY, rcv_latency); return srs_success; } -srs_error_t srs_srt_set_peer_latency(SRTSOCKET srt_fd, int peer_latency) +srs_error_t srs_srt_set_peer_latency(srs_srt_t srt_fd, int peer_latency) { SET_SRT_OPT(srt_fd, SRTO_PEERLATENCY, peer_latency); return srs_success; } -srs_error_t srs_srt_set_streamid(SRTSOCKET srt_fd, const std::string& streamid) +srs_error_t srs_srt_set_streamid(srs_srt_t srt_fd, const std::string& streamid) { SET_SRT_OPT_STR(srt_fd, SRTO_STREAMID, streamid.data(), streamid.size()); return srs_success; } -srs_error_t srs_srt_get_maxbw(SRTSOCKET srt_fd, int& maxbw) +srs_error_t srs_srt_get_maxbw(srs_srt_t srt_fd, int& maxbw) { GET_SRT_OPT(srt_fd, SRTO_MAXBW, maxbw); return srs_success; } -srs_error_t srs_srt_get_mss(SRTSOCKET srt_fd, int& mss) +srs_error_t srs_srt_get_mss(srs_srt_t srt_fd, int& mss) { GET_SRT_OPT(srt_fd, SRTO_MSS, mss); return srs_success; } -srs_error_t srs_srt_get_payload_size(SRTSOCKET srt_fd, int& payload_size) +srs_error_t srs_srt_get_payload_size(srs_srt_t srt_fd, int& payload_size) { GET_SRT_OPT(srt_fd, SRTO_PAYLOADSIZE, payload_size); return srs_success; } -srs_error_t srs_srt_get_connect_timeout(SRTSOCKET srt_fd, int& timeout) +srs_error_t srs_srt_get_connect_timeout(srs_srt_t srt_fd, int& timeout) { GET_SRT_OPT(srt_fd, SRTO_CONNTIMEO, timeout); return srs_success; } -srs_error_t srs_srt_get_peer_idle_timeout(SRTSOCKET srt_fd, int& timeout) +srs_error_t srs_srt_get_peer_idle_timeout(srs_srt_t srt_fd, int& timeout) { GET_SRT_OPT(srt_fd, SRTO_PEERIDLETIMEO, timeout); return srs_success; } -srs_error_t srs_srt_get_tsbpdmode(SRTSOCKET srt_fd, bool& tsbpdmode) +srs_error_t srs_srt_get_tsbpdmode(srs_srt_t srt_fd, bool& tsbpdmode) { GET_SRT_OPT(srt_fd, SRTO_TSBPDMODE, tsbpdmode); return srs_success; } -srs_error_t srs_srt_get_sndbuf(SRTSOCKET srt_fd, int& sndbuf) +srs_error_t srs_srt_get_sndbuf(srs_srt_t srt_fd, int& sndbuf) { GET_SRT_OPT(srt_fd, SRTO_SNDBUF, sndbuf); return srs_success; } -srs_error_t srs_srt_get_rcvbuf(SRTSOCKET srt_fd, int& rcvbuf) +srs_error_t srs_srt_get_rcvbuf(srs_srt_t srt_fd, int& rcvbuf) { GET_SRT_OPT(srt_fd, SRTO_RCVBUF, rcvbuf); return srs_success; } -srs_error_t srs_srt_get_tlpktdrop(SRTSOCKET srt_fd, bool& tlpktdrop) +srs_error_t srs_srt_get_tlpktdrop(srs_srt_t srt_fd, bool& tlpktdrop) { GET_SRT_OPT(srt_fd, SRTO_TLPKTDROP, tlpktdrop); return srs_success; } -srs_error_t srs_srt_get_latency(SRTSOCKET srt_fd, int& latency) +srs_error_t srs_srt_get_latency(srs_srt_t srt_fd, int& latency) { GET_SRT_OPT(srt_fd, SRTO_LATENCY, latency); return srs_success; } -srs_error_t srs_srt_get_rcv_latency(SRTSOCKET srt_fd, int& rcv_latency) +srs_error_t srs_srt_get_rcv_latency(srs_srt_t srt_fd, int& rcv_latency) { GET_SRT_OPT(srt_fd, SRTO_RCVLATENCY, rcv_latency); return srs_success; } -srs_error_t srs_srt_get_peer_latency(SRTSOCKET srt_fd, int& peer_latency) +srs_error_t srs_srt_get_peer_latency(srs_srt_t srt_fd, int& peer_latency) { GET_SRT_OPT(srt_fd, SRTO_PEERLATENCY, peer_latency); return srs_success; } -srs_error_t srs_srt_get_streamid(SRTSOCKET srt_fd, std::string& streamid) +srs_error_t srs_srt_get_streamid(srs_srt_t srt_fd, std::string& streamid) { srs_error_t err = srs_success; @@ -312,7 +326,7 @@ srs_error_t srs_srt_get_streamid(SRTSOCKET srt_fd, std::string& streamid) return err; } -srs_error_t srs_srt_get_local_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port) +srs_error_t srs_srt_get_local_ip_port(srs_srt_t srt_fd, std::string& ip, int& port) { srs_error_t err = srs_success; @@ -344,7 +358,7 @@ srs_error_t srs_srt_get_local_ip_port(SRTSOCKET srt_fd, std::string& ip, int& po return err; } -srs_error_t srs_srt_get_remote_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port) +srs_error_t srs_srt_get_remote_ip_port(srs_srt_t srt_fd, std::string& ip, int& port) { srs_error_t err = srs_success; @@ -427,7 +441,7 @@ int SrsSrtStat::pktSndDrop() return ((SRT_TRACEBSTATS*)stat_)->pktSndDrop; } -srs_error_t SrsSrtStat::fetch(SRTSOCKET srt_fd, bool clear) +srs_error_t SrsSrtStat::fetch(srs_srt_t srt_fd, bool clear) { srs_error_t err = srs_success; @@ -450,9 +464,11 @@ public: srs_error_t mod_socket(SrsSrtSocket* srt_skt); srs_error_t del_socket(SrsSrtSocket* srt_skt); srs_error_t wait(int timeout_ms, int* pn_fds); +public: + virtual int size(); private: - // Find SrsSrtSocket* context by SRTSOCKET. - std::map fd_sockets_; + // Find SrsSrtSocket* context by srs_srt_t. + std::map fd_sockets_; int srt_epoller_fd_; std::vector events_; }; @@ -487,7 +503,7 @@ srs_error_t SrsSrtPoller::add_socket(SrsSrtSocket* srt_skt) srs_error_t err = srs_success; int events = srt_skt->events(); - SRTSOCKET srtfd = srt_skt->fd(); + srs_srt_t srtfd = srt_skt->fd(); int ret = srt_epoll_add_usock(srt_epoller_fd_, srtfd, &events); @@ -506,7 +522,7 @@ srs_error_t SrsSrtPoller::del_socket(SrsSrtSocket* srt_skt) { srs_error_t err = srs_success; - SRTSOCKET srtfd = srt_skt->fd(); + srs_srt_t srtfd = srt_skt->fd(); int ret = srt_epoll_remove_usock(srt_epoller_fd_, srtfd); srs_info("srt poller %d remove srt socket %d", srt_epoller_fd_, srtfd); @@ -533,7 +549,7 @@ srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds) for (int i = 0; i < ret; ++i) { SRT_EPOLL_EVENT event = events_[i]; - map::iterator iter = fd_sockets_.find(event.fd); + map::iterator iter = fd_sockets_.find(event.fd); if (iter == fd_sockets_.end()) { srs_assert(false); } @@ -557,12 +573,17 @@ srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds) return err; } +int SrsSrtPoller::size() +{ + return (int)fd_sockets_.size(); +} + srs_error_t SrsSrtPoller::mod_socket(SrsSrtSocket* srt_skt) { srs_error_t err = srs_success; int events = srt_skt->events(); - SRTSOCKET srtfd = srt_skt->fd(); + srs_srt_t srtfd = srt_skt->fd(); int ret = srt_epoll_update_usock(srt_epoller_fd_, srtfd, &events); srs_info("srt poller %d update srt socket %d, events=%d", srt_epoller_fd_, srtfd, events); @@ -587,7 +608,7 @@ ISrsSrtPoller* srs_srt_poller_new() return new SrsSrtPoller(); } -SrsSrtSocket::SrsSrtSocket(ISrsSrtPoller* srt_poller, SRTSOCKET srt_fd) +SrsSrtSocket::SrsSrtSocket(ISrsSrtPoller* srt_poller, srs_srt_t srt_fd) { srt_poller_ = srt_poller; srt_fd_ = srt_fd; @@ -657,7 +678,7 @@ srs_error_t SrsSrtSocket::connect(const string& ip, int port) return err; } -srs_error_t SrsSrtSocket::accept(SRTSOCKET* client_srt_fd) +srs_error_t SrsSrtSocket::accept(srs_srt_t* client_srt_fd) { srs_error_t err = srs_success; @@ -665,10 +686,10 @@ srs_error_t SrsSrtSocket::accept(SRTSOCKET* client_srt_fd) sockaddr_in inaddr; int addrlen = sizeof(inaddr); // @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_accept - SRTSOCKET srt_fd = srt_accept(srt_fd_, (sockaddr*)&inaddr, &addrlen); + srs_srt_t srt_fd = srt_accept(srt_fd_, (sockaddr*)&inaddr, &addrlen); // Accept ok, return with the SRT client fd. - if (srt_fd != SRT_INVALID_SOCK) { + if (srt_fd != srs_srt_socket_invalid()) { *client_srt_fd = srt_fd; return err; } diff --git a/trunk/src/protocol/srs_protocol_srt.hpp b/trunk/src/protocol/srs_protocol_srt.hpp index df2cb309b..a2f4189fe 100644 --- a/trunk/src/protocol/srs_protocol_srt.hpp +++ b/trunk/src/protocol/srs_protocol_srt.hpp @@ -13,55 +13,57 @@ #include #include -#include - class SrsSrtSocket; +typedef int srs_srt_t; +extern srs_srt_t srs_srt_socket_invalid(); + // Create srt socket only, with libsrt's default option. -extern srs_error_t srs_srt_socket(SRTSOCKET* pfd); +extern srs_error_t srs_srt_socket(srs_srt_t* pfd); +extern srs_error_t srs_srt_close(srs_srt_t fd); // Create srt socket with srs recommend default option(tsbpdmode=false,tlpktdrop=false,latency=0,sndsyn=0,rcvsyn=0) -extern srs_error_t srs_srt_socket_with_default_option(SRTSOCKET* pfd); +extern srs_error_t srs_srt_socket_with_default_option(srs_srt_t* pfd); // For server, listen at SRT endpoint. -extern srs_error_t srs_srt_listen(SRTSOCKET srt_fd, std::string ip, int port); +extern srs_error_t srs_srt_listen(srs_srt_t srt_fd, std::string ip, int port); // Set read/write no block. -extern srs_error_t srs_srt_nonblock(SRTSOCKET srt_fd); +extern srs_error_t srs_srt_nonblock(srs_srt_t srt_fd); // Set SRT options. -extern srs_error_t srs_srt_set_maxbw(SRTSOCKET srt_fd, int maxbw); -extern srs_error_t srs_srt_set_mss(SRTSOCKET srt_fd, int mss); -extern srs_error_t srs_srt_set_payload_size(SRTSOCKET srt_fd, int payload_size); -extern srs_error_t srs_srt_set_connect_timeout(SRTSOCKET srt_fd, int timeout); -extern srs_error_t srs_srt_set_peer_idle_timeout(SRTSOCKET srt_fd, int timeout); -extern srs_error_t srs_srt_set_tsbpdmode(SRTSOCKET srt_fd, bool tsbpdmode); -extern srs_error_t srs_srt_set_sndbuf(SRTSOCKET srt_fd, int sndbuf); -extern srs_error_t srs_srt_set_rcvbuf(SRTSOCKET srt_fd, int rcvbuf); -extern srs_error_t srs_srt_set_tlpktdrop(SRTSOCKET srt_fd, bool tlpktdrop); -extern srs_error_t srs_srt_set_latency(SRTSOCKET srt_fd, int latency); -extern srs_error_t srs_srt_set_rcv_latency(SRTSOCKET srt_fd, int rcv_latency); -extern srs_error_t srs_srt_set_peer_latency(SRTSOCKET srt_fd, int peer_latency); -extern srs_error_t srs_srt_set_streamid(SRTSOCKET srt_fd, const std::string& streamid); +extern srs_error_t srs_srt_set_maxbw(srs_srt_t srt_fd, int maxbw); +extern srs_error_t srs_srt_set_mss(srs_srt_t srt_fd, int mss); +extern srs_error_t srs_srt_set_payload_size(srs_srt_t srt_fd, int payload_size); +extern srs_error_t srs_srt_set_connect_timeout(srs_srt_t srt_fd, int timeout); +extern srs_error_t srs_srt_set_peer_idle_timeout(srs_srt_t srt_fd, int timeout); +extern srs_error_t srs_srt_set_tsbpdmode(srs_srt_t srt_fd, bool tsbpdmode); +extern srs_error_t srs_srt_set_sndbuf(srs_srt_t srt_fd, int sndbuf); +extern srs_error_t srs_srt_set_rcvbuf(srs_srt_t srt_fd, int rcvbuf); +extern srs_error_t srs_srt_set_tlpktdrop(srs_srt_t srt_fd, bool tlpktdrop); +extern srs_error_t srs_srt_set_latency(srs_srt_t srt_fd, int latency); +extern srs_error_t srs_srt_set_rcv_latency(srs_srt_t srt_fd, int rcv_latency); +extern srs_error_t srs_srt_set_peer_latency(srs_srt_t srt_fd, int peer_latency); +extern srs_error_t srs_srt_set_streamid(srs_srt_t srt_fd, const std::string& streamid); // Get SRT options. -extern srs_error_t srs_srt_get_maxbw(SRTSOCKET srt_fd, int& maxbw); -extern srs_error_t srs_srt_get_mss(SRTSOCKET srt_fd, int& mss); -extern srs_error_t srs_srt_get_payload_size(SRTSOCKET srt_fd, int& payload_size); -extern srs_error_t srs_srt_get_connect_timeout(SRTSOCKET srt_fd, int& timeout); -extern srs_error_t srs_srt_get_peer_idle_timeout(SRTSOCKET srt_fd, int& timeout); -extern srs_error_t srs_srt_get_tsbpdmode(SRTSOCKET srt_fd, bool& tsbpdmode); -extern srs_error_t srs_srt_get_sndbuf(SRTSOCKET srt_fd, int& sndbuf); -extern srs_error_t srs_srt_get_rcvbuf(SRTSOCKET srt_fd, int& rcvbuf); -extern srs_error_t srs_srt_get_tlpktdrop(SRTSOCKET srt_fd, bool& tlpktdrop); -extern srs_error_t srs_srt_get_latency(SRTSOCKET srt_fd, int& latency); -extern srs_error_t srs_srt_get_rcv_latency(SRTSOCKET srt_fd, int& rcv_latency); -extern srs_error_t srs_srt_get_peer_latency(SRTSOCKET srt_fd, int& peer_latency); -extern srs_error_t srs_srt_get_streamid(SRTSOCKET srt_fd, std::string& streamid); +extern srs_error_t srs_srt_get_maxbw(srs_srt_t srt_fd, int& maxbw); +extern srs_error_t srs_srt_get_mss(srs_srt_t srt_fd, int& mss); +extern srs_error_t srs_srt_get_payload_size(srs_srt_t srt_fd, int& payload_size); +extern srs_error_t srs_srt_get_connect_timeout(srs_srt_t srt_fd, int& timeout); +extern srs_error_t srs_srt_get_peer_idle_timeout(srs_srt_t srt_fd, int& timeout); +extern srs_error_t srs_srt_get_tsbpdmode(srs_srt_t srt_fd, bool& tsbpdmode); +extern srs_error_t srs_srt_get_sndbuf(srs_srt_t srt_fd, int& sndbuf); +extern srs_error_t srs_srt_get_rcvbuf(srs_srt_t srt_fd, int& rcvbuf); +extern srs_error_t srs_srt_get_tlpktdrop(srs_srt_t srt_fd, bool& tlpktdrop); +extern srs_error_t srs_srt_get_latency(srs_srt_t srt_fd, int& latency); +extern srs_error_t srs_srt_get_rcv_latency(srs_srt_t srt_fd, int& rcv_latency); +extern srs_error_t srs_srt_get_peer_latency(srs_srt_t srt_fd, int& peer_latency); +extern srs_error_t srs_srt_get_streamid(srs_srt_t srt_fd, std::string& streamid); // Get SRT socket info. -extern srs_error_t srs_srt_get_local_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port); -extern srs_error_t srs_srt_get_remote_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port); +extern srs_error_t srs_srt_get_local_ip_port(srs_srt_t srt_fd, std::string& ip, int& port); +extern srs_error_t srs_srt_get_remote_ip_port(srs_srt_t srt_fd, std::string& ip, int& port); // Get SRT stats. class SrsSrtStat @@ -82,7 +84,7 @@ public: int pktRetrans(); int pktSndDrop(); public: - srs_error_t fetch(SRTSOCKET srt_fd, bool clear); + srs_error_t fetch(srs_srt_t srt_fd, bool clear); }; // Srt poller, subscribe/unsubscribed events and wait them fired. @@ -99,6 +101,8 @@ public: // Wait for the fds in its epoll to be fired in specified timeout_ms, where the pn_fds is the number of active fds. // Note that for ST, please always use timeout_ms(0) and switch coroutine by yourself. virtual srs_error_t wait(int timeout_ms, int* pn_fds) = 0; +public: + virtual int size() = 0; }; ISrsSrtPoller* srs_srt_poller_new(); @@ -106,15 +110,15 @@ ISrsSrtPoller* srs_srt_poller_new(); class SrsSrtSocket { public: - SrsSrtSocket(ISrsSrtPoller* srt_poller, SRTSOCKET srt_fd); + SrsSrtSocket(ISrsSrtPoller* srt_poller, srs_srt_t srt_fd); virtual ~SrsSrtSocket(); public: // IO API srs_error_t connect(const std::string& ip, int port); - srs_error_t accept(SRTSOCKET* client_srt_fd); + srs_error_t accept(srs_srt_t* client_srt_fd); srs_error_t recvmsg(void* buf, size_t size, ssize_t* nread); srs_error_t sendmsg(void* buf, size_t size, ssize_t* nwrite); public: - SRTSOCKET fd() const { return srt_fd_; } + srs_srt_t fd() const { return srt_fd_; } int events() const { return events_; } public: void set_recv_timeout(srs_utime_t tm) { recv_timeout_ = tm; } @@ -148,7 +152,7 @@ private: srs_error_t check_error(); private: - SRTSOCKET srt_fd_; + srs_srt_t srt_fd_; // Mark if some error occured in srt socket. bool has_error_; // When read operator like recvmsg/accept would block, wait this condition timeout or notified, diff --git a/trunk/src/utest/srs_utest_srt.cpp b/trunk/src/utest/srs_utest_srt.cpp index bd3dd49a1..f01cfd763 100644 --- a/trunk/src/utest/srs_utest_srt.cpp +++ b/trunk/src/utest/srs_utest_srt.cpp @@ -16,6 +16,8 @@ #include using namespace std; +#include + extern SrsSrtEventLoop* _srt_eventloop; // Test srt st service @@ -26,7 +28,7 @@ VOID TEST(ServiceSrtPoller, SrtPollOperateSocket) ISrsSrtPoller* srt_poller = srs_srt_poller_new(); HELPER_EXPECT_SUCCESS(srt_poller->initialize()); - SRTSOCKET srt_fd = SRT_INVALID_SOCK; + srs_srt_t srt_fd = srs_srt_socket_invalid(); HELPER_EXPECT_SUCCESS(srs_srt_socket(&srt_fd)); EXPECT_TRUE(srt_fd > 0); @@ -53,10 +55,10 @@ VOID TEST(ServiceSrtPoller, SrtPollOperateSocket) EXPECT_FALSE(srt_socket->events() & SRT_EPOLL_OUT); EXPECT_TRUE(srt_socket->events() & SRT_EPOLL_ERR); - EXPECT_EQ(srt_poller->fd_sockets_.size(), 1); + EXPECT_EQ(srt_poller->size(), 1); // Delete socket, will remove in srt poller. srs_freep(srt_socket); - EXPECT_EQ(srt_poller->fd_sockets_.size(), 0); + EXPECT_EQ(srt_poller->size(), 0); srs_freep(srt_poller); } @@ -65,7 +67,7 @@ VOID TEST(ServiceSrtPoller, SrtSetGetSocketOpt) { srs_error_t err = srs_success; - SRTSOCKET srt_fd = SRT_INVALID_SOCK; + srs_srt_t srt_fd = srs_srt_socket_invalid(); HELPER_EXPECT_SUCCESS(srs_srt_socket(&srt_fd)); HELPER_EXPECT_SUCCESS(srs_srt_nonblock(srt_fd)); @@ -129,10 +131,10 @@ class MockSrtServer { public: SrsSrtSocket* srt_socket_; - SRTSOCKET srt_server_fd_; + srs_srt_t srt_server_fd_; MockSrtServer() { - srt_server_fd_ = SRT_INVALID_SOCK; + srt_server_fd_ = srs_srt_socket_invalid(); srt_socket_ = NULL; } @@ -160,7 +162,7 @@ public: srs_freep(srt_socket_); } - virtual srs_error_t accept(SRTSOCKET* client_fd) { + virtual srs_error_t accept(srs_srt_t* client_fd) { srs_error_t err = srs_success; if ((err = srt_socket_->accept(client_fd)) != srs_success) { @@ -182,32 +184,32 @@ VOID TEST(ServiceStSRTTest, ListenConnectAccept) HELPER_EXPECT_SUCCESS(srt_server.create_socket()); HELPER_EXPECT_SUCCESS(srt_server.listen(server_ip, server_port)); - SRTSOCKET srt_client_fd = SRT_INVALID_SOCK; + srs_srt_t srt_client_fd = srs_srt_socket_invalid(); HELPER_EXPECT_SUCCESS(srs_srt_socket(&srt_client_fd)); SrsSrtSocket* srt_client_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_client_fd); // No client connected, accept will timeout. - SRTSOCKET srt_fd = SRT_INVALID_SOCK; + srs_srt_t srt_fd = srs_srt_socket_invalid(); // Make utest fast timeout. srt_server.srt_socket_->set_recv_timeout(50 * SRS_UTIME_MILLISECONDS); err = srt_server.accept(&srt_fd); EXPECT_EQ(srs_error_code(err), ERROR_SRT_TIMEOUT); - EXPECT_EQ(srt_fd, SRT_INVALID_SOCK); + EXPECT_EQ(srt_fd, srs_srt_socket_invalid()); // Client connect to server HELPER_EXPECT_SUCCESS(srt_client_socket->connect(server_ip, server_port)); // Server will accept one client. HELPER_EXPECT_SUCCESS(srt_server.accept(&srt_fd)); - EXPECT_NE(srt_fd, SRT_INVALID_SOCK); + EXPECT_NE(srt_fd, srs_srt_socket_invalid()); } VOID TEST(ServiceStSRTTest, ConnectTimeout) { srs_error_t err = srs_success; - SRTSOCKET srt_client_fd = SRT_INVALID_SOCK; + srs_srt_t srt_client_fd = srs_srt_socket_invalid(); HELPER_EXPECT_SUCCESS(srs_srt_socket_with_default_option(&srt_client_fd)); SrsSrtSocket* srt_client_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_client_fd); @@ -228,16 +230,16 @@ VOID TEST(ServiceStSRTTest, ConnectWithStreamid) HELPER_EXPECT_SUCCESS(srt_server.listen(server_ip, server_port)); std::string streamid = "SRS_SRT_Streamid"; - SRTSOCKET srt_client_fd = SRT_INVALID_SOCK; + srs_srt_t srt_client_fd = srs_srt_socket_invalid(); HELPER_EXPECT_SUCCESS(srs_srt_socket_with_default_option(&srt_client_fd)); HELPER_EXPECT_SUCCESS(srs_srt_set_streamid(srt_client_fd, streamid)); SrsSrtSocket* srt_client_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_client_fd); HELPER_EXPECT_SUCCESS(srt_client_socket->connect("127.0.0.1", 9000)); - SRTSOCKET srt_server_accepted_fd = SRT_INVALID_SOCK; + srs_srt_t srt_server_accepted_fd = srs_srt_socket_invalid(); HELPER_EXPECT_SUCCESS(srt_server.accept(&srt_server_accepted_fd)); - EXPECT_NE(srt_server_accepted_fd, SRT_INVALID_SOCK); + EXPECT_NE(srt_server_accepted_fd, srs_srt_socket_invalid()); std::string s; HELPER_EXPECT_SUCCESS(srs_srt_get_streamid(srt_server_accepted_fd, s)); EXPECT_EQ(s, streamid); @@ -254,7 +256,7 @@ VOID TEST(ServiceStSRTTest, ReadWrite) HELPER_EXPECT_SUCCESS(srt_server.create_socket()); HELPER_EXPECT_SUCCESS(srt_server.listen(server_ip, server_port)); - SRTSOCKET srt_client_fd = SRT_INVALID_SOCK; + srs_srt_t srt_client_fd = srs_srt_socket_invalid(); HELPER_EXPECT_SUCCESS(srs_srt_socket_with_default_option(&srt_client_fd)); SrsSrtSocket* srt_client_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_client_fd); @@ -262,9 +264,9 @@ VOID TEST(ServiceStSRTTest, ReadWrite) HELPER_EXPECT_SUCCESS(srt_client_socket->connect(server_ip, server_port)); // Server will accept one client. - SRTSOCKET srt_server_accepted_fd = SRT_INVALID_SOCK; + srs_srt_t srt_server_accepted_fd = srs_srt_socket_invalid(); HELPER_EXPECT_SUCCESS(srt_server.accept(&srt_server_accepted_fd)); - EXPECT_NE(srt_server_accepted_fd, SRT_INVALID_SOCK); + EXPECT_NE(srt_server_accepted_fd, srs_srt_socket_invalid()); SrsSrtSocket* srt_server_accepted_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_server_accepted_fd); if (true) { @@ -307,15 +309,15 @@ VOID TEST(ServiceStSRTTest, ReadWrite) class MockSrtHandler : public ISrsSrtHandler { private: - SRTSOCKET srt_fd; + srs_srt_t srt_fd; public: MockSrtHandler() { - srt_fd = SRT_INVALID_SOCK; + srt_fd = srs_srt_socket_invalid(); } virtual ~MockSrtHandler() { } public: - virtual srs_error_t on_srt_client(SRTSOCKET fd) { + virtual srs_error_t on_srt_client(srs_srt_t fd) { srt_fd = fd; return srs_success; }