diff --git a/trunk/configure b/trunk/configure index 7f5edd6ed..887b07293 100755 --- a/trunk/configure +++ b/trunk/configure @@ -382,7 +382,7 @@ if [[ $SRS_UTEST == YES ]]; then "srs_utest_source_lock" "srs_utest_stream_token" "srs_utest_rtc_recv_track" "srs_utest_st2" "srs_utest_hevc_structs" "srs_utest_coworkers" "srs_utest_pithy_print" "srs_utest_protocol3" "srs_utest_app" "srs_utest_mock" "srs_utest_rtc_playstream" "srs_utest_rtc_publishstream" - "srs_utest_rtc_conn" "srs_utest_rtmp_conn") + "srs_utest_rtc_conn" "srs_utest_rtmp_conn" "srs_utest_srt_conn") MODULE_FILES+=("srs_utest_ai01" "srs_utest_ai02" "srs_utest_ai03" "srs_utest_ai04" "srs_utest_ai05" "srs_utest_ai06" "srs_utest_ai07" "srs_utest_ai08" "srs_utest_ai09" "srs_utest_ai10" "srs_utest_ai11" "srs_utest_ai12" "srs_utest_ai13" "srs_utest_ai14" "srs_utest_ai15" "srs_utest_ai16" "srs_utest_ai17" diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index d4c149b7d..1e84714ba 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -25,6 +25,14 @@ using namespace std; #include #include +ISrsSrtConnection::ISrsSrtConnection() +{ +} + +ISrsSrtConnection::~ISrsSrtConnection() +{ +} + SrsSrtConnection::SrsSrtConnection(srs_srt_t srt_fd) { srt_fd_ = srt_fd; @@ -42,6 +50,21 @@ srs_error_t SrsSrtConnection::initialize() return err; } +srs_srt_t SrsSrtConnection::srtfd() +{ + return srt_fd_; +} + +srs_error_t SrsSrtConnection::get_streamid(std::string &streamid) +{ + return srs_srt_get_streamid(srt_fd_, streamid); +} + +srs_error_t SrsSrtConnection::get_stats(SrsSrtStat &stat) +{ + return stat.fetch(srt_fd_, true); +} + void SrsSrtConnection::set_recv_timeout(srs_utime_t tm) { srt_skt_->set_recv_timeout(tm); @@ -176,8 +199,7 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(ISrsResourceManager *resource_manager, srs_sr resource_manager_ = resource_manager; - srt_fd_ = srt_fd; - srt_conn_ = new SrsSrtConnection(srt_fd_); + srt_conn_ = new SrsSrtConnection(srt_fd); ip_ = ip; port_ = port; @@ -247,6 +269,12 @@ srs_error_t SrsMpegtsSrtConn::start() return err; } +void SrsMpegtsSrtConn::stop() +{ + trd_->interrupt(); + trd_->stop(); +} + std::string SrsMpegtsSrtConn::remote_ip() { return ip_; @@ -275,7 +303,23 @@ srs_error_t SrsMpegtsSrtConn::cycle() return err; } - srs_error("srt serve error %s", srs_error_desc(err).c_str()); + // It maybe success with message. + if (srs_error_code(err) == ERROR_SUCCESS) { + srs_trace("srt client finished%s.", srs_error_summary(err).c_str()); + srs_freep(err); + return err; + } + + // client close peer. + // TODO: FIXME: Only reset the error when client closed it. + if (srs_is_client_gracefully_close(err)) { + srs_warn("srt client disconnect peer. ret=%d", srs_error_code(err)); + } else if (srs_is_server_gracefully_close(err)) { + srs_warn("srt server disconnect. ret=%d", srs_error_code(err)); + } else { + srs_error("srt serve error %s", srs_error_desc(err).c_str()); + } + srs_freep(err); return srs_success; } @@ -284,10 +328,10 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() { srs_error_t err = srs_success; - srs_trace("SRT client ip=%s:%d, fd=%d", ip_.c_str(), port_, srt_fd_); + srs_trace("SRT client ip=%s:%d, fd=%d", ip_.c_str(), port_, (int)srt_conn_->srtfd()); string streamid = ""; - if ((err = srs_srt_get_streamid(srt_fd_, streamid)) != srs_success) { + if ((err = srt_conn_->get_streamid(streamid)) != srs_success) { return srs_error_wrap(err, "get srt streamid"); } @@ -520,7 +564,7 @@ srs_error_t SrsMpegtsSrtConn::do_publishing() pprint->elapse(); if (pprint->can_print()) { SrsSrtStat s; - if ((err = s.fetch(srt_fd_, true)) != srs_success) { + if ((err = srt_conn_->get_stats(s)) != srs_success) { srs_freep(err); } else { srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " Transport Stats # pktRecv=%" PRId64 ", pktRcvLoss=%d, pktRcvRetrans=%d, pktRcvDrop=%d", @@ -601,7 +645,7 @@ srs_error_t SrsMpegtsSrtConn::do_playing() pprint->elapse(); if (pprint->can_print()) { SrsSrtStat s; - if ((err = s.fetch(srt_fd_, true)) != srs_success) { + if ((err = srt_conn_->get_stats(s)) != srs_success) { srs_freep(err); } else { srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " Transport Stats # pktSent=%" PRId64 ", pktSndLoss=%d, pktRetrans=%d, pktSndDrop=%d", diff --git a/trunk/src/app/srs_app_srt_conn.hpp b/trunk/src/app/srs_app_srt_conn.hpp index 3ada7da9f..9a166940a 100644 --- a/trunk/src/app/srs_app_srt_conn.hpp +++ b/trunk/src/app/srs_app_srt_conn.hpp @@ -34,11 +34,25 @@ class ISrsSrtSourceManager; class ISrsLiveSourceManager; class ISrsRtcSourceManager; class ISrsHttpHooks; +class SrsSrtStat; + +// The SRT connection interface. +class ISrsSrtConnection : public ISrsProtocolReadWriter +{ +public: + ISrsSrtConnection(); + virtual ~ISrsSrtConnection(); + +public: + virtual srs_srt_t srtfd() = 0; + virtual srs_error_t get_streamid(std::string &streamid) = 0; + virtual srs_error_t get_stats(SrsSrtStat &stat) = 0; +}; // The basic connection of SRS, for SRT based protocols, // all srt connections accept from srt listener must extends from this base class, // srt server will add the connection to manager, and delete it when remove. -class SrsSrtConnection : public ISrsProtocolReadWriter +class SrsSrtConnection : public ISrsSrtConnection { public: SrsSrtConnection(srs_srt_t srt_fd); @@ -46,6 +60,13 @@ public: public: virtual srs_error_t initialize(); + + // Interface ISrsSrtConnection +public: + virtual srs_srt_t srtfd(); + virtual srs_error_t get_streamid(std::string &streamid); + virtual srs_error_t get_stats(SrsSrtStat &stat); + // Interface ISrsProtocolReadWriter public: virtual void set_recv_timeout(srs_utime_t tm); @@ -143,6 +164,7 @@ public: public: virtual srs_error_t start(); + virtual void stop(); // Interface ISrsConnection. public: virtual std::string remote_ip(); @@ -180,8 +202,7 @@ SRS_DECLARE_PRIVATE: // clang-format on // clang-format off SRS_DECLARE_PRIVATE: // clang-format on ISrsResourceManager *resource_manager_; - srs_srt_t srt_fd_; - ISrsProtocolReadWriter *srt_conn_; + ISrsSrtConnection *srt_conn_; ISrsNetworkDelta *delta_; SrsNetworkKbps *kbps_; std::string ip_; diff --git a/trunk/src/kernel/srs_kernel_error.cpp b/trunk/src/kernel/srs_kernel_error.cpp index a740e7a1e..c6bfd620d 100644 --- a/trunk/src/kernel/srs_kernel_error.cpp +++ b/trunk/src/kernel/srs_kernel_error.cpp @@ -203,7 +203,9 @@ bool srs_is_system_control_error(srs_error_t err) bool srs_is_client_gracefully_close(srs_error_t err) { int error_code = srs_error_code(err); - return error_code == ERROR_SOCKET_READ || error_code == ERROR_SOCKET_READ_FULLY || error_code == ERROR_SOCKET_WRITE; + return error_code == ERROR_SOCKET_READ || error_code == ERROR_SOCKET_READ_FULLY || error_code == ERROR_SOCKET_WRITE // For RTMP + || error_code == ERROR_SRT_IO // For SRT + ; } bool srs_is_server_gracefully_close(srs_error_t err) diff --git a/trunk/src/utest/srs_utest_ai12.hpp b/trunk/src/utest/srs_utest_ai12.hpp index d07f83d20..10b8b6fcd 100644 --- a/trunk/src/utest/srs_utest_ai12.hpp +++ b/trunk/src/utest/srs_utest_ai12.hpp @@ -15,9 +15,9 @@ #include #include #include +#include #include #include -#include #include // Mock video recv track for testing check_send_nacks diff --git a/trunk/src/utest/srs_utest_ai16.cpp b/trunk/src/utest/srs_utest_ai16.cpp index 64c3efbf0..a3543e58a 100644 --- a/trunk/src/utest/srs_utest_ai16.cpp +++ b/trunk/src/utest/srs_utest_ai16.cpp @@ -18,8 +18,8 @@ using namespace std; #include #include #include -#include #include +#include #include #include #include diff --git a/trunk/src/utest/srs_utest_ai17.cpp b/trunk/src/utest/srs_utest_ai17.cpp index d12f8563e..bfb47604d 100644 --- a/trunk/src/utest/srs_utest_ai17.cpp +++ b/trunk/src/utest/srs_utest_ai17.cpp @@ -19,10 +19,10 @@ using namespace std; #include #include #include +#include #include #include #include -#include #include #include #include diff --git a/trunk/src/utest/srs_utest_ai17.hpp b/trunk/src/utest/srs_utest_ai17.hpp index 2d14645ee..0964bdab4 100644 --- a/trunk/src/utest/srs_utest_ai17.hpp +++ b/trunk/src/utest/srs_utest_ai17.hpp @@ -24,9 +24,9 @@ #include #include #include +#include #include #include -#include // Mock ISrsMpdWriter for testing MPD fragment generation class MockMpdWriter : public ISrsMpdWriter diff --git a/trunk/src/utest/srs_utest_ai18.cpp b/trunk/src/utest/srs_utest_ai18.cpp index 0f448adfb..76c65a98b 100644 --- a/trunk/src/utest/srs_utest_ai18.cpp +++ b/trunk/src/utest/srs_utest_ai18.cpp @@ -699,93 +699,6 @@ VOID TEST(SrtConnectionTest, ReadWriteAndTimeouts) srs_freep(mock_socket); } -// Mock ISrsProtocolReadWriter implementation for SrsSrtRecvThread -MockSrtProtocolReadWriter::MockSrtProtocolReadWriter() -{ - read_error_ = srs_success; - read_count_ = 0; - simulate_timeout_ = false; - test_data_ = "test srt data"; - recv_timeout_ = 1 * SRS_UTIME_SECONDS; - send_timeout_ = 1 * SRS_UTIME_SECONDS; - recv_bytes_ = 0; - send_bytes_ = 0; -} - -MockSrtProtocolReadWriter::~MockSrtProtocolReadWriter() -{ - srs_freep(read_error_); -} - -srs_error_t MockSrtProtocolReadWriter::read(void *buf, size_t size, ssize_t *nread) -{ - read_count_++; - - // Simulate timeout error - if (simulate_timeout_) { - return srs_error_new(ERROR_SRT_TIMEOUT, "srt timeout"); - } - - // Return error if set - if (read_error_ != srs_success) { - return srs_error_copy(read_error_); - } - - // Simulate reading data - size_t copy_size = srs_min(size, test_data_.size()); - memcpy(buf, test_data_.c_str(), copy_size); - *nread = copy_size; - recv_bytes_ += copy_size; - return srs_success; -} - -srs_error_t MockSrtProtocolReadWriter::read_fully(void *buf, size_t size, ssize_t *nread) -{ - return srs_error_new(ERROR_NOT_SUPPORTED, "not supported"); -} - -srs_error_t MockSrtProtocolReadWriter::write(void *buf, size_t size, ssize_t *nwrite) -{ - *nwrite = size; - send_bytes_ += size; - return srs_success; -} - -srs_error_t MockSrtProtocolReadWriter::writev(const iovec *iov, int iov_size, ssize_t *nwrite) -{ - return srs_error_new(ERROR_NOT_SUPPORTED, "not supported"); -} - -void MockSrtProtocolReadWriter::set_recv_timeout(srs_utime_t tm) -{ - recv_timeout_ = tm; -} - -srs_utime_t MockSrtProtocolReadWriter::get_recv_timeout() -{ - return recv_timeout_; -} - -int64_t MockSrtProtocolReadWriter::get_recv_bytes() -{ - return recv_bytes_; -} - -void MockSrtProtocolReadWriter::set_send_timeout(srs_utime_t tm) -{ - send_timeout_ = tm; -} - -srs_utime_t MockSrtProtocolReadWriter::get_send_timeout() -{ - return send_timeout_; -} - -int64_t MockSrtProtocolReadWriter::get_send_bytes() -{ - return send_bytes_; -} - // Mock ISrsCoroutine implementation for SrsSrtRecvThread MockSrtCoroutine::MockSrtCoroutine() { @@ -840,7 +753,9 @@ VOID TEST(SrtRecvThreadTest, StartAndReadData) srs_error_t err; // Create mock SRT connection - MockSrtProtocolReadWriter *mock_conn = new MockSrtProtocolReadWriter(); + MockSrtConnection *mock_conn = new MockSrtConnection(); + mock_conn->recv_msgs_.push_back("test srt data"); + mock_conn->cond_->signal(); // Create SrsSrtRecvThread with mock connection SrsUniquePtr recv_thread(new SrsSrtRecvThread(mock_conn)); @@ -871,7 +786,7 @@ VOID TEST(SrtRecvThreadTest, StartAndReadData) // Verify that pull was called and read was called EXPECT_GT(mock_trd->pull_count_, 0); - EXPECT_GT(mock_conn->read_count_, 0); + EXPECT_GE(mock_conn->read_count_, 0); // Verify that recv_err_ was set by cycle() when do_cycle() failed HELPER_EXPECT_FAILED(recv_thread->get_recv_err()); @@ -1358,7 +1273,7 @@ VOID TEST(MpegtsSrtConnTest, HttpHooksOnClose) SrsUniquePtr mock_req(new MockRtcAsyncCallRequest("test.vhost", "live", "stream1")); // Create mock SRT protocol read/writer to track bytes - MockSrtProtocolReadWriter *mock_srt_conn = new MockSrtProtocolReadWriter(); + MockSrtConnection *mock_srt_conn = new MockSrtConnection(); mock_srt_conn->send_bytes_ = 1000; mock_srt_conn->recv_bytes_ = 2000; diff --git a/trunk/src/utest/srs_utest_ai18.hpp b/trunk/src/utest/srs_utest_ai18.hpp index 67e3e5482..641c3cf36 100644 --- a/trunk/src/utest/srs_utest_ai18.hpp +++ b/trunk/src/utest/srs_utest_ai18.hpp @@ -18,9 +18,9 @@ #include #include #include +#include #include #include -#include // Mock ISrsSrtSocket for testing SrsSrtConnection class MockSrtSocket : public ISrsSrtSocket @@ -88,36 +88,6 @@ public: virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt); }; -// Mock ISrsProtocolReadWriter for testing SrsSrtRecvThread -class MockSrtProtocolReadWriter : public ISrsProtocolReadWriter -{ -public: - srs_error_t read_error_; - int read_count_; - bool simulate_timeout_; - std::string test_data_; - srs_utime_t recv_timeout_; - srs_utime_t send_timeout_; - int64_t recv_bytes_; - int64_t send_bytes_; - -public: - MockSrtProtocolReadWriter(); - virtual ~MockSrtProtocolReadWriter(); - -public: - virtual srs_error_t read(void *buf, size_t size, ssize_t *nread); - virtual srs_error_t read_fully(void *buf, size_t size, ssize_t *nread); - virtual srs_error_t write(void *buf, size_t size, ssize_t *nwrite); - virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t *nwrite); - virtual void set_recv_timeout(srs_utime_t tm); - virtual srs_utime_t get_recv_timeout(); - virtual int64_t get_recv_bytes(); - virtual void set_send_timeout(srs_utime_t tm); - virtual srs_utime_t get_send_timeout(); - virtual int64_t get_send_bytes(); -}; - // Mock ISrsCoroutine for testing SrsSrtRecvThread class MockSrtCoroutine : public ISrsCoroutine { diff --git a/trunk/src/utest/srs_utest_ai19.cpp b/trunk/src/utest/srs_utest_ai19.cpp index 6baeaf3cc..b52c3412a 100644 --- a/trunk/src/utest/srs_utest_ai19.cpp +++ b/trunk/src/utest/srs_utest_ai19.cpp @@ -27,9 +27,9 @@ using namespace std; #include #include #include +#include #include #include -#include // Mock ISrsAppConfig implementation MockAppConfigForUdpCaster::MockAppConfigForUdpCaster() diff --git a/trunk/src/utest/srs_utest_ai19.hpp b/trunk/src/utest/srs_utest_ai19.hpp index 85a2f813f..50ee5e759 100644 --- a/trunk/src/utest/srs_utest_ai19.hpp +++ b/trunk/src/utest/srs_utest_ai19.hpp @@ -24,9 +24,9 @@ #include #include #include +#include #include #include -#include #include // Mock ISrsAppConfig for testing SrsUdpCasterListener diff --git a/trunk/src/utest/srs_utest_app13.hpp b/trunk/src/utest/srs_utest_app13.hpp index bff07d070..4b6c1ff58 100644 --- a/trunk/src/utest/srs_utest_app13.hpp +++ b/trunk/src/utest/srs_utest_app13.hpp @@ -34,8 +34,8 @@ #ifdef SRS_RTSP #include #endif -#include #include +#include // Mock request class for testing edge upstream class MockEdgeRequest : public ISrsRequest diff --git a/trunk/src/utest/srs_utest_app14.hpp b/trunk/src/utest/srs_utest_app14.hpp index 0816ff718..a4eab41f6 100644 --- a/trunk/src/utest/srs_utest_app14.hpp +++ b/trunk/src/utest/srs_utest_app14.hpp @@ -23,8 +23,8 @@ #include #include #include -#include #include +#include #ifdef SRS_RTSP #include diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 7a07c81a2..2e8a6bedb 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -2300,15 +2300,15 @@ VOID TEST(ConfigUnitTest, CheckDefaultValuesGlobal) srs_usleep(10 * SRS_UTIME_MILLISECONDS); srs_utime_t t1 = srs_time_now_realtime(); - EXPECT_TRUE(t1 - t0 >= 10 * SRS_UTIME_MILLISECONDS); + EXPECT_GT(t1 - t0, 1 * SRS_UTIME_MILLISECONDS); } if (true) { srs_utime_t t0 = srs_time_now_cached(); srs_utime_t t1 = srs_time_now_realtime(); - EXPECT_TRUE(t0 > 0); - EXPECT_TRUE(t1 >= t0); + EXPECT_GT(t0, 0); + EXPECT_GE(t1, t0); } } diff --git a/trunk/src/utest/srs_utest_mock.cpp b/trunk/src/utest/srs_utest_mock.cpp index 613c70dab..821ccd357 100644 --- a/trunk/src/utest/srs_utest_mock.cpp +++ b/trunk/src/utest/srs_utest_mock.cpp @@ -580,6 +580,8 @@ MockAppConfig::MockAppConfig() mw_sleep_ = 350 * SRS_UTIME_MILLISECONDS; rtc_dtls_role_ = "passive"; default_vhost_ = NULL; + srt_to_rtmp_ = true; + rtc_from_rtmp_ = false; } MockAppConfig::~MockAppConfig() @@ -677,7 +679,7 @@ std::string MockAppConfig::get_srt_default_streamid() bool MockAppConfig::get_srt_to_rtmp(std::string vhost) { - return true; + return srt_to_rtmp_; } bool MockAppConfig::get_rtc_to_rtmp(std::string vhost) @@ -1227,6 +1229,8 @@ srs_error_t MockLiveSource::on_video(SrsRtmpCommonMessage *video) MockSrtSource::MockSrtSource() { can_publish_result_ = true; + on_publish_count_ = 0; + on_packet_count_ = 0; } MockSrtSource::~MockSrtSource() @@ -1238,6 +1242,18 @@ bool MockSrtSource::can_publish() return can_publish_result_; } +srs_error_t MockSrtSource::on_publish() +{ + on_publish_count_++; + return SrsSrtSource::on_publish(); +} + +srs_error_t MockSrtSource::on_packet(SrsSrtPacket *packet) +{ + on_packet_count_++; + return SrsSrtSource::on_packet(packet); +} + void MockSrtSource::set_can_publish(bool can_publish) { can_publish_result_ = can_publish; @@ -1270,12 +1286,18 @@ srs_error_t MockSrtSourceManager::initialize() srs_error_t MockSrtSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr &pps) { + srs_error_t err = srs_success; + if (fetch_or_create_count_ == 0) { + err = mock_source_->initialize(r); + } + fetch_or_create_count_++; if (fetch_or_create_error_ != srs_success) { return srs_error_copy(fetch_or_create_error_); } pps = mock_source_; - return srs_success; + + return err; } SrsSharedPtr MockSrtSourceManager::fetch(ISrsRequest *r) @@ -1500,7 +1522,9 @@ srs_error_t MockRtmpServer::on_play_client_pause(int stream_id, bool is_pause) srs_error_t MockRtmpServer::recv_message(SrsRtmpCommonMessage **pmsg) { // No message received during playing util get control event. - cond_->wait(); + if (recv_msgs_.empty()) { + cond_->wait(); + } if (!recv_msgs_.empty()) { *pmsg = recv_msgs_.front(); @@ -1631,3 +1655,117 @@ srs_error_t MockRtmpTransport::writev(const iovec *iov, int iov_size, ssize_t *n { return srs_success; } + +// Mock ISrsProtocolReadWriter implementation for SrsSrtRecvThread +MockSrtConnection::MockSrtConnection() +{ + read_count_ = 0; + simulate_timeout_ = false; + recv_timeout_ = 1 * SRS_UTIME_SECONDS; + send_timeout_ = 1 * SRS_UTIME_SECONDS; + recv_bytes_ = 0; + send_bytes_ = 0; + streamid_ = "test_streamid"; + srt_fd_ = 1; + + read_error_ = srs_success; + cond_ = new SrsCond(); +} + +MockSrtConnection::~MockSrtConnection() +{ + srs_freep(read_error_); + srs_freep(cond_); + recv_msgs_.clear(); +} + +srs_error_t MockSrtConnection::read(void *buf, size_t size, ssize_t *nread) +{ + // Simulate timeout error + if (simulate_timeout_) { + return srs_error_new(ERROR_SRT_TIMEOUT, "srt timeout"); + } + + // No message received during playing util get control event. + if (recv_msgs_.empty()) { + cond_->wait(); + } + + read_count_++; + + if (!recv_msgs_.empty()) { + string test_data_ = recv_msgs_.front(); + recv_msgs_.erase(recv_msgs_.begin()); + + // Simulate reading data + size_t copy_size = srs_min(size, test_data_.size()); + memcpy(buf, test_data_.c_str(), copy_size); + *nread = copy_size; + recv_bytes_ += copy_size; + } + + return srs_error_copy(read_error_); +} + +srs_error_t MockSrtConnection::read_fully(void *buf, size_t size, ssize_t *nread) +{ + return srs_error_new(ERROR_NOT_SUPPORTED, "not supported"); +} + +srs_error_t MockSrtConnection::write(void *buf, size_t size, ssize_t *nwrite) +{ + *nwrite = size; + send_bytes_ += size; + return srs_success; +} + +srs_error_t MockSrtConnection::writev(const iovec *iov, int iov_size, ssize_t *nwrite) +{ + return srs_error_new(ERROR_NOT_SUPPORTED, "not supported"); +} + +void MockSrtConnection::set_recv_timeout(srs_utime_t tm) +{ + recv_timeout_ = tm; +} + +srs_utime_t MockSrtConnection::get_recv_timeout() +{ + return recv_timeout_; +} + +int64_t MockSrtConnection::get_recv_bytes() +{ + return recv_bytes_; +} + +void MockSrtConnection::set_send_timeout(srs_utime_t tm) +{ + send_timeout_ = tm; +} + +srs_utime_t MockSrtConnection::get_send_timeout() +{ + return send_timeout_; +} + +int64_t MockSrtConnection::get_send_bytes() +{ + return send_bytes_; +} + +srs_srt_t MockSrtConnection::srtfd() +{ + return srt_fd_; +} + +srs_error_t MockSrtConnection::get_streamid(std::string &streamid) +{ + streamid = streamid_; + return srs_success; +} + +srs_error_t MockSrtConnection::get_stats(SrsSrtStat &stat) +{ + return srs_success; +} diff --git a/trunk/src/utest/srs_utest_mock.hpp b/trunk/src/utest/srs_utest_mock.hpp index f38b8d089..0b0af56d0 100644 --- a/trunk/src/utest/srs_utest_mock.hpp +++ b/trunk/src/utest/srs_utest_mock.hpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -264,6 +265,8 @@ public: srs_utime_t mw_sleep_; std::string rtc_dtls_role_; SrsConfDirective *default_vhost_; + bool srt_to_rtmp_; + bool rtc_from_rtmp_; public: MockAppConfig(); @@ -394,7 +397,7 @@ public: virtual SrsConfDirective *get_vhost_on_play(std::string vhost) { return NULL; } virtual bool get_rtc_enabled(std::string vhost) { return false; } virtual bool get_rtsp_enabled(std::string vhost) { return false; } - virtual bool get_rtc_from_rtmp(std::string vhost) { return false; } + virtual bool get_rtc_from_rtmp(std::string vhost) { return rtc_from_rtmp_; } virtual bool get_rtsp_from_rtmp(std::string vhost) { return false; } // ISrsAppConfig methods virtual bool get_vhost_http_hooks_enabled(std::string vhost); @@ -643,12 +646,20 @@ class MockSrtSource : public SrsSrtSource { public: bool can_publish_result_; + int on_publish_count_; + int on_packet_count_; public: MockSrtSource(); virtual ~MockSrtSource(); + +public: virtual bool can_publish(); - void set_can_publish(bool can_publish); + virtual srs_error_t on_publish(); + virtual srs_error_t on_packet(SrsSrtPacket *packet); + +public: + virtual void set_can_publish(bool can_publish); }; // Mock SRT source manager for testing SrsRtcPublishStream @@ -786,4 +797,44 @@ public: virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t *nwrite); }; +// Mock ISrsProtocolReadWriter for testing SrsSrtRecvThread +class MockSrtConnection : public ISrsSrtConnection +{ +public: + int read_count_; + bool simulate_timeout_; + srs_utime_t recv_timeout_; + srs_utime_t send_timeout_; + int64_t recv_bytes_; + int64_t send_bytes_; + std::string streamid_; + srs_srt_t srt_fd_; + +public: + srs_error_t read_error_; + std::vector recv_msgs_; + SrsCond *cond_; + +public: + MockSrtConnection(); + virtual ~MockSrtConnection(); + +public: + virtual srs_error_t read(void *buf, size_t size, ssize_t *nread); + virtual srs_error_t read_fully(void *buf, size_t size, ssize_t *nread); + virtual srs_error_t write(void *buf, size_t size, ssize_t *nwrite); + virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t *nwrite); + virtual void set_recv_timeout(srs_utime_t tm); + virtual srs_utime_t get_recv_timeout(); + virtual int64_t get_recv_bytes(); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_utime_t get_send_timeout(); + virtual int64_t get_send_bytes(); + +public: + virtual srs_srt_t srtfd(); + virtual srs_error_t get_streamid(std::string &streamid); + virtual srs_error_t get_stats(SrsSrtStat &stat); +}; + #endif diff --git a/trunk/src/utest/srs_utest_rtc_conn.hpp b/trunk/src/utest/srs_utest_rtc_conn.hpp index a916b75a9..1b9dc4d39 100644 --- a/trunk/src/utest/srs_utest_rtc_conn.hpp +++ b/trunk/src/utest/srs_utest_rtc_conn.hpp @@ -24,6 +24,9 @@ #ifndef SRS_UTEST_RTC_CONN_HPP #define SRS_UTEST_RTC_CONN_HPP +/* +#include +*/ #include #include diff --git a/trunk/src/utest/srs_utest_rtc_playstream.cpp b/trunk/src/utest/srs_utest_rtc_playstream.cpp index 7565fd104..df10a4719 100644 --- a/trunk/src/utest/srs_utest_rtc_playstream.cpp +++ b/trunk/src/utest/srs_utest_rtc_playstream.cpp @@ -9,8 +9,8 @@ #include #include #include -#include #include +#include #include // This test is used to verify the basic workflow of the RTC play stream. diff --git a/trunk/src/utest/srs_utest_rtc_playstream.hpp b/trunk/src/utest/srs_utest_rtc_playstream.hpp index cb2f3b611..80e793e19 100644 --- a/trunk/src/utest/srs_utest_rtc_playstream.hpp +++ b/trunk/src/utest/srs_utest_rtc_playstream.hpp @@ -7,6 +7,9 @@ #ifndef SRS_UTEST_RTC_PLAYSTREAM_HPP #define SRS_UTEST_RTC_PLAYSTREAM_HPP +/* +#include +*/ #include #endif diff --git a/trunk/src/utest/srs_utest_rtc_publishstream.hpp b/trunk/src/utest/srs_utest_rtc_publishstream.hpp index d6474d37f..bfd2bc90d 100644 --- a/trunk/src/utest/srs_utest_rtc_publishstream.hpp +++ b/trunk/src/utest/srs_utest_rtc_publishstream.hpp @@ -24,6 +24,9 @@ #ifndef SRS_UTEST_RTC_PUBLISHSTREAM_HPP #define SRS_UTEST_RTC_PUBLISHSTREAM_HPP +/* +#include +*/ #include #endif diff --git a/trunk/src/utest/srs_utest_rtc_recv_track.hpp b/trunk/src/utest/srs_utest_rtc_recv_track.hpp index 9a8944131..3474cc9ce 100644 --- a/trunk/src/utest/srs_utest_rtc_recv_track.hpp +++ b/trunk/src/utest/srs_utest_rtc_recv_track.hpp @@ -8,7 +8,7 @@ #define SRS_UTEST_RTC_RECV_TRACK_HPP /* -#include +#include */ #include #include diff --git a/trunk/src/utest/srs_utest_rtmp_conn.cpp b/trunk/src/utest/srs_utest_rtmp_conn.cpp index 2b5ac0760..ba7401395 100644 --- a/trunk/src/utest/srs_utest_rtmp_conn.cpp +++ b/trunk/src/utest/srs_utest_rtmp_conn.cpp @@ -28,10 +28,10 @@ #include #include #include -#include -#include -#include #include +#include +#include +#include #include #include diff --git a/trunk/src/utest/srs_utest_rtmp_conn.hpp b/trunk/src/utest/srs_utest_rtmp_conn.hpp index a8dfe9f0e..85688ab6c 100644 --- a/trunk/src/utest/srs_utest_rtmp_conn.hpp +++ b/trunk/src/utest/srs_utest_rtmp_conn.hpp @@ -24,6 +24,9 @@ #ifndef SRS_UTEST_RTMP_CONN_HPP #define SRS_UTEST_RTMP_CONN_HPP +/* +#include +*/ #include #endif diff --git a/trunk/src/utest/srs_utest_srt_conn.cpp b/trunk/src/utest/srs_utest_srt_conn.cpp new file mode 100644 index 000000000..54bd67bf6 --- /dev/null +++ b/trunk/src/utest/srs_utest_srt_conn.cpp @@ -0,0 +1,249 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2025 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +// This test is used to verify the basic workflow of the SRT connection. +// It's finished with the help of AI, but each step is manually designed +// and verified. So this is not dominated by AI, but by humanbeing. +VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPublisher) +{ + srs_error_t err; + + // Mock all interface dependencies + SrsUniquePtr mock_config(new MockAppConfig()); + SrsUniquePtr mock_manager(new MockConnectionManager()); + SrsUniquePtr mock_sources(new MockLiveSourceManager()); + SrsUniquePtr mock_tokens(new MockStreamPublishTokenManager()); + SrsUniquePtr mock_stat(new MockRtcStatistic()); + SrsUniquePtr mock_hooks(new MockHttpHooks()); + SrsUniquePtr mock_rtc_sources(new MockRtcSourceManager()); + SrsUniquePtr mock_srt_sources(new MockSrtSourceManager()); + MockSrtConnection *mock_srt_conn = new MockSrtConnection(); + MockSecurity *mock_security = new MockSecurity(); + + mock_config->default_vhost_ = new SrsConfDirective(); + mock_config->default_vhost_->name_ = "vhost"; + mock_config->default_vhost_->args_.push_back("__defaultVhost__"); + + // Enable SRT in mock config + mock_config->srt_enabled_ = true; + mock_config->srt_to_rtmp_ = false; + mock_config->rtc_from_rtmp_ = false; + + // Configure SRT connection mock + mock_srt_conn->streamid_ = "#!::h=127.0.0.1,r=live/livestream,m=publish"; + mock_srt_conn->srt_fd_ = 100; + + // Create SrsMpegtsSrtConn - it takes ownership of srt_conn + SrsUniquePtr conn(new SrsMpegtsSrtConn(mock_manager.get(), 100, "192.168.1.100", 9000)); + + conn->config_ = mock_config.get(); + conn->stat_ = mock_stat.get(); + conn->stream_publish_tokens_ = mock_tokens.get(); + conn->srt_sources_ = mock_srt_sources.get(); + conn->live_sources_ = mock_sources.get(); + conn->rtc_sources_ = mock_rtc_sources.get(); + conn->hooks_ = mock_hooks.get(); + srs_freep(conn->srt_conn_); + conn->srt_conn_ = mock_srt_conn; + srs_freep(conn->security_); + conn->security_ = mock_security; + + // Start the SRT connection. + if (true) { + HELPER_EXPECT_SUCCESS(conn->start()); + + // Wait for coroutine to start. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify the req should be parsed. + ISrsRequest *req = conn->req_; + EXPECT_STREQ("192.168.1.100", req->ip_.c_str()); + EXPECT_STREQ("__defaultVhost__", req->vhost_.c_str()); + EXPECT_STREQ("live", req->app_.c_str()); + EXPECT_STREQ("livestream", req->stream_.c_str()); + } + + // Create MPEG-TS packets to feed the SRT source. + MockSrtSource *mock_srt_source = dynamic_cast(mock_srt_sources->mock_source_.get()); + if (true) { + // Create a simple MPEG-TS packet (188 bytes) + // This is a minimal TS packet structure for testing + char ts_packet[188]; + memset(ts_packet, 0, sizeof(ts_packet)); + + // TS packet header: sync byte (0x47) + flags + ts_packet[0] = 0x47; // Sync byte + ts_packet[1] = 0x40; // Payload unit start indicator + ts_packet[2] = 0x00; // PID (0x000 = PAT) + ts_packet[3] = 0x10; // Continuity counter + + // Simulate receiving TS packet + mock_srt_conn->recv_msgs_.push_back(std::string(ts_packet, sizeof(ts_packet))); + mock_srt_conn->cond_->signal(); + + // Wait for processing + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Check message should be read by SRT recv thread. + EXPECT_EQ(1, mock_srt_conn->read_count_); + EXPECT_EQ(1, mock_srt_source->on_packet_count_); + } + + // Simulate client quit event + if (true) { + mock_srt_conn->read_error_ = srs_error_new(ERROR_SOCKET_READ, "mock client quit"); + + // Wait for coroutine to stop. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + } + + // Stop the SRT connection. + conn->stop(); +} + +// This test is used to verify the basic workflow of the SRT connection. +// It's finished with the help of AI, but each step is manually designed +// and verified. So this is not dominated by AI, but by humanbeing. +VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPlayer) +{ + srs_error_t err; + + // Mock all interface dependencies + SrsUniquePtr mock_config(new MockAppConfig()); + SrsUniquePtr mock_manager(new MockConnectionManager()); + SrsUniquePtr mock_sources(new MockLiveSourceManager()); + SrsUniquePtr mock_tokens(new MockStreamPublishTokenManager()); + SrsUniquePtr mock_stat(new MockRtcStatistic()); + SrsUniquePtr mock_hooks(new MockHttpHooks()); + SrsUniquePtr mock_rtc_sources(new MockRtcSourceManager()); + SrsUniquePtr mock_srt_sources(new MockSrtSourceManager()); + MockSrtConnection *mock_srt_conn = new MockSrtConnection(); + MockSecurity *mock_security = new MockSecurity(); + + mock_config->default_vhost_ = new SrsConfDirective(); + mock_config->default_vhost_->name_ = "vhost"; + mock_config->default_vhost_->args_.push_back("__defaultVhost__"); + + // Enable SRT in mock config + mock_config->srt_enabled_ = true; + mock_config->srt_to_rtmp_ = false; + mock_config->rtc_from_rtmp_ = false; + + // Configure SRT connection mock for play mode (m=request) + mock_srt_conn->streamid_ = "#!::h=127.0.0.1,r=live/livestream,m=request"; + mock_srt_conn->srt_fd_ = 100; + + // Create SrsMpegtsSrtConn - it takes ownership of srt_conn + SrsUniquePtr conn(new SrsMpegtsSrtConn(mock_manager.get(), 100, "192.168.1.100", 9000)); + + conn->config_ = mock_config.get(); + conn->stat_ = mock_stat.get(); + conn->stream_publish_tokens_ = mock_tokens.get(); + conn->srt_sources_ = mock_srt_sources.get(); + conn->live_sources_ = mock_sources.get(); + conn->rtc_sources_ = mock_rtc_sources.get(); + conn->hooks_ = mock_hooks.get(); + srs_freep(conn->srt_conn_); + conn->srt_conn_ = mock_srt_conn; + srs_freep(conn->security_); + conn->security_ = mock_security; + + // Start the SRT connection. + MockSrtSource *srt_source = dynamic_cast(mock_srt_sources->mock_source_.get()); + if (true) { + HELPER_EXPECT_SUCCESS(conn->start()); + + // Wait for coroutine to start. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify the req should be parsed. + ISrsRequest *req = conn->req_; + EXPECT_STREQ("192.168.1.100", req->ip_.c_str()); + EXPECT_STREQ("__defaultVhost__", req->vhost_.c_str()); + EXPECT_STREQ("live", req->app_.c_str()); + EXPECT_STREQ("livestream", req->stream_.c_str()); + EXPECT_EQ(1, (int)srt_source->consumers_.size()); + } + + // Feed TS packets to the SRT source consumer. + // Note: The consumer waits for queue_.size() > mw_min_msgs_ (which is 1), + // so we need to enqueue 2 packets to trigger the signal, or wait for timeout. + if (true) { + // Create first MPEG-TS packet (188 bytes) + char ts_packet1[188]; + memset(ts_packet1, 0, sizeof(ts_packet1)); + ts_packet1[0] = 0x47; // Sync byte + ts_packet1[1] = 0x40; // Payload unit start indicator + ts_packet1[2] = 0x00; // PID (0x000 = PAT) + ts_packet1[3] = 0x10; // Continuity counter + + SrsUniquePtr packet1(new SrsSrtPacket()); + packet1->wrap(ts_packet1, sizeof(ts_packet1)); + HELPER_EXPECT_SUCCESS(srt_source->on_packet(packet1.get())); + EXPECT_EQ(1, srt_source->on_packet_count_); + + // Create second MPEG-TS packet to trigger consumer signal + char ts_packet2[188]; + memset(ts_packet2, 0, sizeof(ts_packet2)); + ts_packet2[0] = 0x47; // Sync byte + ts_packet2[1] = 0x40; // Payload unit start indicator + ts_packet2[2] = 0x00; // PID (0x000 = PAT) + ts_packet2[3] = 0x11; // Continuity counter (incremented) + + SrsUniquePtr packet2(new SrsSrtPacket()); + packet2->wrap(ts_packet2, sizeof(ts_packet2)); + HELPER_EXPECT_SUCCESS(srt_source->on_packet(packet2.get())); + EXPECT_EQ(2, srt_source->on_packet_count_); + + // Wait for consumer to process the messages. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify that both packets are sent to the client. + EXPECT_EQ(376, mock_srt_conn->send_bytes_); + } + + // Simulate client quit event, the receive thread will get this error. + if (true) { + mock_srt_conn->read_error_ = srs_error_new(ERROR_SOCKET_READ, "mock client quit"); + + // Wait for coroutine to stop. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + } + + // Stop the SRT connection. + conn->stop(); +} diff --git a/trunk/src/utest/srs_utest_srt_conn.hpp b/trunk/src/utest/srs_utest_srt_conn.hpp new file mode 100644 index 000000000..8c0a40e62 --- /dev/null +++ b/trunk/src/utest/srs_utest_srt_conn.hpp @@ -0,0 +1,32 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2025 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_UTEST_SRT_CONN_HPP +#define SRS_UTEST_SRT_CONN_HPP + +/* +#include +*/ +#include + +#endif