From 8fd92d1598f6a251e4fa611d7ba0b353019a0482 Mon Sep 17 00:00:00 2001 From: OSSRS-AI Date: Tue, 21 Oct 2025 21:46:54 -0400 Subject: [PATCH] AI: Add utest to cover forwarding module. #4531 --- trunk/conf/full.conf | 9 +- trunk/configure | 2 +- trunk/src/app/srs_app_forward.cpp | 13 +- trunk/src/app/srs_app_forward.hpp | 15 +- trunk/src/app/srs_app_hls.cpp | 2 +- trunk/src/app/srs_app_hls.hpp | 7 +- trunk/src/app/srs_app_rtmp_source.hpp | 4 + trunk/src/protocol/srs_protocol_rtmp_conn.hpp | 1 + trunk/src/utest/srs_utest_ai11.cpp | 20 - trunk/src/utest/srs_utest_ai12.cpp | 6 - trunk/src/utest/srs_utest_ai13.cpp | 30 -- trunk/src/utest/srs_utest_ai13.hpp | 15 - trunk/src/utest/srs_utest_ai14.cpp | 129 +----- trunk/src/utest/srs_utest_ai14.hpp | 43 +- trunk/src/utest/srs_utest_ai18.cpp | 117 ----- trunk/src/utest/srs_utest_ai19.cpp | 198 +------- trunk/src/utest/srs_utest_ai19.hpp | 67 +-- trunk/src/utest/srs_utest_ai22.cpp | 247 +--------- trunk/src/utest/srs_utest_ai22.hpp | 63 +-- trunk/src/utest/srs_utest_ai23.cpp | 83 +--- trunk/src/utest/srs_utest_ai23.hpp | 30 -- trunk/src/utest/srs_utest_forward.cpp | 252 +++++++++++ trunk/src/utest/srs_utest_forward.hpp | 51 +++ trunk/src/utest/srs_utest_http_conn.cpp | 6 +- trunk/src/utest/srs_utest_mock.cpp | 428 +++++++++++++----- trunk/src/utest/srs_utest_mock.hpp | 108 ++++- trunk/src/utest/srs_utest_rtc_conn.cpp | 4 +- trunk/src/utest/srs_utest_rtc_playstream.cpp | 2 +- .../src/utest/srs_utest_rtc_publishstream.cpp | 2 +- trunk/src/utest/srs_utest_rtmp_conn.cpp | 4 +- trunk/src/utest/srs_utest_service.cpp | 7 + trunk/src/utest/srs_utest_srt_conn.cpp | 4 +- 32 files changed, 792 insertions(+), 1177 deletions(-) create mode 100644 trunk/src/utest/srs_utest_forward.cpp create mode 100644 trunk/src/utest/srs_utest_forward.hpp diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 503c19199..dd0900188 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -1711,12 +1711,9 @@ vhost same.vhost.forward.srs.com { # ] # } # } - # PS: you can transform params to backend service, such as: - # { "param": "?forward=rtmp://127.0.0.1:19351/test/livestream" } - # then backend return forward's url in response. - # if backend return empty urls, destanition is still disabled. - # only support one api hook, format: - # backend http://xxx/api0 + # If backend response empty urls, the forwarding is also disabled. + # Only one backend is supported, and the format should be: + # backend http://hostport/api backend http://127.0.0.1:8085/api/v1/forward; } } diff --git a/trunk/configure b/trunk/configure index 780a795db..cac0f9b41 100755 --- a/trunk/configure +++ b/trunk/configure @@ -386,7 +386,7 @@ if [[ $SRS_UTEST == YES ]]; then MODULE_FILES+=("srs_utest_ai01" "srs_utest_ai02" "srs_utest_ai03" "srs_utest_ai04" "srs_utest_ai05" "srs_utest_ai06" "srs_utest_ai07" "srs_utest_ai08" "srs_utest_ai09" "srs_utest_ai10" "srs_utest_ai11" "srs_utest_ai12" "srs_utest_ai13" "srs_utest_ai14" "srs_utest_ai15" "srs_utest_ai16" "srs_utest_ai17" - "srs_utest_ai18" "srs_utest_ai19" "srs_utest_ai20") + "srs_utest_ai18" "srs_utest_ai19" "srs_utest_ai20" "srs_utest_forward") if [[ $SRS_GB28181 == YES ]]; then MODULE_FILES+=("srs_utest_gb28181" "srs_utest_ai23") fi diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index f7962001e..59501c3b1 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -14,6 +14,7 @@ using namespace std; #include +#include #include #include #include @@ -38,7 +39,7 @@ ISrsForwarder::~ISrsForwarder() { } -SrsForwarder::SrsForwarder(SrsOriginHub *h) +SrsForwarder::SrsForwarder(ISrsOriginHub *h) { hub_ = h; @@ -49,6 +50,9 @@ SrsForwarder::SrsForwarder(SrsOriginHub *h) trd_ = new SrsDummyCoroutine(); queue_ = new SrsMessageQueue(); jitter_ = new SrsRtmpJitter(); + + app_factory_ = _srs_app_factory; + config_ = _srs_config; } SrsForwarder::~SrsForwarder() @@ -62,6 +66,9 @@ SrsForwarder::~SrsForwarder() srs_freep(sh_audio_); srs_freep(req_); + + app_factory_ = NULL; + config_ = NULL; } srs_error_t SrsForwarder::initialize(ISrsRequest *r, string ep) @@ -229,7 +236,7 @@ srs_error_t SrsForwarder::do_cycle() srs_freep(sdk_); srs_utime_t cto = SRS_FORWARDER_CIMS; srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT; - sdk_ = new SrsSimpleRtmpClient(url, cto, sto); + sdk_ = app_factory_->create_rtmp_client(url, cto, sto); if ((err = sdk_->connect()) != srs_success) { return srs_error_wrap(err, "sdk connect url=%s, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); @@ -238,7 +245,7 @@ srs_error_t SrsForwarder::do_cycle() // For RTMP client, we pass the vhost in tcUrl when connecting, // so we publish without vhost in stream. string stream; - if ((err = sdk_->publish(_srs_config->get_chunk_size(req_->vhost_), false, &stream)) != srs_success) { + if ((err = sdk_->publish(config_->get_chunk_size(req_->vhost_), false, &stream)) != srs_success) { return srs_error_wrap(err, "sdk publish"); } diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index c2b464bfa..73c1d624c 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -22,8 +22,12 @@ class SrsRtmpClient; class ISrsRequest; class SrsLiveSource; class SrsOriginHub; +class ISrsOriginHub; class SrsKbps; class SrsSimpleRtmpClient; +class ISrsBasicRtmpClient; +class ISrsAppFactory; +class ISrsAppConfig; // The forward interface. class ISrsForwarder @@ -45,6 +49,11 @@ public: // Forward the stream to other servers. class SrsForwarder : public ISrsCoroutineHandler, public ISrsForwarder { +// clang-format off +SRS_DECLARE_PRIVATE: // clang-format on + ISrsAppFactory *app_factory_; + ISrsAppConfig *config_; + // clang-format off SRS_DECLARE_PRIVATE: // clang-format on // The ep to forward, server[:port]. @@ -62,8 +71,8 @@ SRS_DECLARE_PRIVATE: // clang-format on // clang-format off SRS_DECLARE_PRIVATE: // clang-format on - SrsOriginHub *hub_; - SrsSimpleRtmpClient *sdk_; + ISrsOriginHub *hub_; + ISrsBasicRtmpClient *sdk_; SrsRtmpJitter *jitter_; SrsMessageQueue *queue_; // Cache the sequence header for retry when slave is failed. @@ -71,7 +80,7 @@ SRS_DECLARE_PRIVATE: // clang-format on SrsMediaPacket *sh_video_; public: - SrsForwarder(SrsOriginHub *h); + SrsForwarder(ISrsOriginHub *h); virtual ~SrsForwarder(); public: diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 3a21c294f..c8f0e19f6 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -2616,7 +2616,7 @@ srs_utime_t SrsHls::cleanup_delay() // IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. // This prevents the race condition where multiple coroutines could create duplicate sources // for the same stream when context switches occurred during initialization. -srs_error_t SrsHls::initialize(SrsOriginHub *h, ISrsRequest *r) +srs_error_t SrsHls::initialize(ISrsOriginHub *h, ISrsRequest *r) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index bc27fae54..2623980f0 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -27,6 +27,7 @@ class ISrsRequest; class SrsPithyPrint; class SrsLiveSource; class SrsOriginHub; +class ISrsOriginHub; class ISrsFileWriter; class ISrsAppConfig; class ISrsHttpHooks; @@ -657,7 +658,7 @@ public: virtual ~ISrsHls(); public: - virtual srs_error_t initialize(SrsOriginHub *h, ISrsRequest *r) = 0; + virtual srs_error_t initialize(ISrsOriginHub *h, ISrsRequest *r) = 0; virtual srs_error_t on_audio(SrsMediaPacket *shared_audio, SrsFormat *format) = 0; virtual srs_error_t on_video(SrsMediaPacket *shared_video, SrsFormat *format) = 0; virtual srs_error_t on_publish() = 0; @@ -698,7 +699,7 @@ SRS_DECLARE_PRIVATE: // clang-format on // clang-format off SRS_DECLARE_PRIVATE: // clang-format on - SrsOriginHub *hub_; + ISrsOriginHub *hub_; SrsRtmpJitter *jitter_; SrsPithyPrint *pprint_; @@ -721,7 +722,7 @@ public: public: // Initialize the hls by handler and source. - virtual srs_error_t initialize(SrsOriginHub *h, ISrsRequest *r); + virtual srs_error_t initialize(ISrsOriginHub *h, ISrsRequest *r); // Publish stream event, continue to write the m3u8, // for the muxer object not destroyed. // @param fetch_sequence_header whether fetch sequence from source. diff --git a/trunk/src/app/srs_app_rtmp_source.hpp b/trunk/src/app/srs_app_rtmp_source.hpp index 1cbdbc7a8..8adc17ac0 100644 --- a/trunk/src/app/srs_app_rtmp_source.hpp +++ b/trunk/src/app/srs_app_rtmp_source.hpp @@ -413,6 +413,10 @@ public: virtual void on_unpublish() = 0; // When DVR requests sequence header. virtual srs_error_t on_dvr_request_sh() = 0; + // When HLS requests sequence header. + virtual srs_error_t on_hls_request_sh() = 0; + // When forwarder requests sequence header. + virtual srs_error_t on_forwarder_start(SrsForwarder *forwarder) = 0; }; // The hub for origin is a collection of utilities for origin only, diff --git a/trunk/src/protocol/srs_protocol_rtmp_conn.hpp b/trunk/src/protocol/srs_protocol_rtmp_conn.hpp index c56e63fc2..ca12f7d17 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_conn.hpp +++ b/trunk/src/protocol/srs_protocol_rtmp_conn.hpp @@ -41,6 +41,7 @@ public: virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string *pstream = NULL) = 0; // Sample kbps for statistics. virtual void kbps_sample(const char *label, srs_utime_t age) = 0; + virtual void kbps_sample(const char *label, srs_utime_t age, int msgs) = 0; // Get stream ID. virtual int sid() = 0; diff --git a/trunk/src/utest/srs_utest_ai11.cpp b/trunk/src/utest/srs_utest_ai11.cpp index 8380d5973..5fb340550 100644 --- a/trunk/src/utest/srs_utest_ai11.cpp +++ b/trunk/src/utest/srs_utest_ai11.cpp @@ -1907,9 +1907,6 @@ VOID TEST(RtcPublishStreamTest, SendRtcpRrSuccess) publish_stream->video_tracks_.push_back(video_track); publish_stream->audio_tracks_.push_back(audio_track); - // Reset receiver count before test - mock_receiver.reset(); - // Test send_rtcp_rr - should succeed when all tracks succeed HELPER_EXPECT_SUCCESS(publish_stream->send_rtcp_rr()); @@ -1942,9 +1939,6 @@ VOID TEST(RtcPublishStreamTest, SendRtcpRrVideoTrackError) // Add track to publish stream (only video track to simplify) publish_stream->video_tracks_.push_back(video_track); - // Reset receiver count before test - mock_receiver.reset(); - // Set receiver to return error after reset srs_error_t mock_error = srs_error_new(ERROR_RTC_RTCP, "mock rtcp rr error"); mock_receiver.set_send_rtcp_rr_error(mock_error); @@ -2020,9 +2014,6 @@ VOID TEST(RtcPublishStreamTest, SendRtcpRrMultipleTracks) publish_stream->audio_tracks_.push_back(audio_track1); publish_stream->audio_tracks_.push_back(audio_track2); - // Reset receiver count before test - mock_receiver.reset(); - // Test send_rtcp_rr - should succeed when all tracks succeed HELPER_EXPECT_SUCCESS(publish_stream->send_rtcp_rr()); @@ -3725,17 +3716,6 @@ VOID TEST(RtcAsyncCallOnUnpublishTest, CallWithContextSwitching) // because it uses _srs_context directly instead of a member variable like SrsRtcAsyncCallOnStop } -void MockSrtSourceManager::reset() -{ - srs_freep(initialize_error_); - srs_freep(fetch_or_create_error_); - initialize_error_ = srs_success; - fetch_or_create_error_ = srs_success; - initialize_count_ = 0; - fetch_or_create_count_ = 0; - can_publish_ = true; -} - VOID TEST(RtcPublishStreamTest, Initialize) { srs_error_t err; diff --git a/trunk/src/utest/srs_utest_ai12.cpp b/trunk/src/utest/srs_utest_ai12.cpp index f784526d9..8b5cd25dd 100644 --- a/trunk/src/utest/srs_utest_ai12.cpp +++ b/trunk/src/utest/srs_utest_ai12.cpp @@ -420,9 +420,6 @@ VOID TEST(SrsRtcPublishStreamTest, SendPeriodicTwccTypicalScenario) // send_periodic_twcc should fail due to receiver error HELPER_EXPECT_FAILED(publish_stream->send_periodic_twcc()); - - // Reset receiver error for cleanup - mock_receiver.reset(); } VOID TEST(SrsRtcPublishStreamTest, OnRtcpTypicalScenario) @@ -608,9 +605,6 @@ VOID TEST(SrsRtcPublishStreamTest, RequestKeyframeTypicalScenario) // Verify that PLI packet send was attempted again EXPECT_EQ(2, mock_receiver.send_rtcp_fb_pli_count_); - - // Reset receiver for cleanup - mock_receiver.reset(); } VOID TEST(SrsRtcPublishStreamTest, UpdateRttTypicalScenario) diff --git a/trunk/src/utest/srs_utest_ai13.cpp b/trunk/src/utest/srs_utest_ai13.cpp index 5c5e0c29a..44ef34881 100644 --- a/trunk/src/utest/srs_utest_ai13.cpp +++ b/trunk/src/utest/srs_utest_ai13.cpp @@ -1818,36 +1818,6 @@ void MockHlsController::set_on_unpublish_error(srs_error_t err) on_unpublish_error_ = srs_error_copy(err); } -// Mock origin hub implementation -MockOriginHub::MockOriginHub() -{ - on_hls_request_sh_count_ = 0; - on_hls_request_sh_error_ = srs_success; -} - -MockOriginHub::~MockOriginHub() -{ - srs_freep(on_hls_request_sh_error_); -} - -srs_error_t MockOriginHub::on_hls_request_sh() -{ - on_hls_request_sh_count_++; - return srs_error_copy(on_hls_request_sh_error_); -} - -void MockOriginHub::reset() -{ - on_hls_request_sh_count_ = 0; - srs_freep(on_hls_request_sh_error_); -} - -void MockOriginHub::set_on_hls_request_sh_error(srs_error_t err) -{ - srs_freep(on_hls_request_sh_error_); - on_hls_request_sh_error_ = srs_error_copy(err); -} - // Unit test for SrsHls::reload typical scenario VOID TEST(AppHlsTest, HlsReloadTypicalScenario) { diff --git a/trunk/src/utest/srs_utest_ai13.hpp b/trunk/src/utest/srs_utest_ai13.hpp index 5f5cb6ba8..b8d5e7947 100644 --- a/trunk/src/utest/srs_utest_ai13.hpp +++ b/trunk/src/utest/srs_utest_ai13.hpp @@ -179,19 +179,4 @@ public: void set_on_unpublish_error(srs_error_t err); }; -// Mock origin hub for testing SrsHls::reload -class MockOriginHub : public SrsOriginHub -{ -public: - int on_hls_request_sh_count_; - srs_error_t on_hls_request_sh_error_; - -public: - MockOriginHub(); - virtual ~MockOriginHub(); - virtual srs_error_t on_hls_request_sh(); - void reset(); - void set_on_hls_request_sh_error(srs_error_t err); -}; - #endif diff --git a/trunk/src/utest/srs_utest_ai14.cpp b/trunk/src/utest/srs_utest_ai14.cpp index 6146726bb..7712128a2 100644 --- a/trunk/src/utest/srs_utest_ai14.cpp +++ b/trunk/src/utest/srs_utest_ai14.cpp @@ -18,10 +18,13 @@ using namespace std; #include #include #include +#include #include #include +#include #include #include +#include #include #include #include @@ -1350,7 +1353,7 @@ MockHlsForOriginHub::~MockHlsForOriginHub() srs_freep(initialize_error_); } -srs_error_t MockHlsForOriginHub::initialize(SrsOriginHub *h, ISrsRequest *r) +srs_error_t MockHlsForOriginHub::initialize(ISrsOriginHub *h, ISrsRequest *r) { initialize_count_++; return srs_error_copy(initialize_error_); @@ -2196,55 +2199,6 @@ VOID TEST(AppOriginHubTest, OnVideoTypicalScenario) srs_freep(mock_source); } -MockAppConfigForForwarder::MockAppConfigForForwarder() -{ - forwards_directive_ = NULL; - backend_directive_ = NULL; -} - -MockAppConfigForForwarder::~MockAppConfigForForwarder() -{ - srs_freep(forwards_directive_); - srs_freep(backend_directive_); -} - -bool MockAppConfigForForwarder::get_forward_enabled(std::string vhost) -{ - return forwards_directive_ != NULL || backend_directive_ != NULL; -} - -SrsConfDirective *MockAppConfigForForwarder::get_forwards(std::string vhost) -{ - return forwards_directive_; -} - -SrsConfDirective *MockAppConfigForForwarder::get_forward_backend(std::string vhost) -{ - return backend_directive_; -} - -void MockAppConfigForForwarder::set_forward_destinations(const std::vector &destinations) -{ - srs_freep(forwards_directive_); - - if (!destinations.empty()) { - forwards_directive_ = new SrsConfDirective(); - forwards_directive_->name_ = "destination"; - forwards_directive_->args_ = destinations; - } -} - -void MockAppConfigForForwarder::set_forward_backend(const std::string &backend_url) -{ - srs_freep(backend_directive_); - - if (!backend_url.empty()) { - backend_directive_ = new SrsConfDirective(); - backend_directive_->name_ = "backend"; - backend_directive_->args_.push_back(backend_url); - } -} - MockHttpHooksForBackend::MockHttpHooksForBackend() { on_forward_backend_count_ = 0; @@ -2272,7 +2226,7 @@ VOID TEST(AppOriginHubTest, CreateForwardersTypicalScenario) srs_error_t err; // Create mock config that will outlive the hub - MockAppConfigForForwarder *mock_config = new MockAppConfigForForwarder(); + MockAppConfig *mock_config = new MockAppConfig(); MockStatisticForOriginHub *mock_stat = new MockStatisticForOriginHub(); MockLiveSourceForOriginHub *mock_source = new MockLiveSourceForOriginHub(); @@ -2808,7 +2762,7 @@ VOID TEST(AppOriginHubTest, CreateBackendForwardersTypicalScenario) srs_error_t err; // Create mock config that will outlive the hub - MockAppConfigForForwarder *mock_config = new MockAppConfigForForwarder(); + MockAppConfig *mock_config = new MockAppConfig(); MockStatisticForOriginHub *mock_stat = new MockStatisticForOriginHub(); MockLiveSourceForOriginHub *mock_source = new MockLiveSourceForOriginHub(); MockHttpHooksForBackend *mock_hooks = new MockHttpHooksForBackend(); @@ -2997,74 +2951,9 @@ VOID TEST(SrsLiveSourceTest, OnAggregateSelectionTypical) delete[] payload; } -MockOriginHubForLiveSource::MockOriginHubForLiveSource() -{ - initialize_count_ = 0; - initialize_error_ = srs_success; -} - -MockOriginHubForLiveSource::~MockOriginHubForLiveSource() -{ - srs_freep(initialize_error_); -} - -srs_error_t MockOriginHubForLiveSource::initialize(SrsSharedPtr s, ISrsRequest *r) -{ - initialize_count_++; - return srs_error_copy(initialize_error_); -} - -void MockOriginHubForLiveSource::dispose() -{ -} - -srs_error_t MockOriginHubForLiveSource::cycle() -{ - return srs_success; -} - -bool MockOriginHubForLiveSource::active() -{ - return false; -} - -srs_utime_t MockOriginHubForLiveSource::cleanup_delay() -{ - return 0; -} - -srs_error_t MockOriginHubForLiveSource::on_meta_data(SrsMediaPacket *shared_metadata, SrsOnMetaDataPacket *packet) -{ - return srs_success; -} - -srs_error_t MockOriginHubForLiveSource::on_audio(SrsMediaPacket *shared_audio) -{ - return srs_success; -} - -srs_error_t MockOriginHubForLiveSource::on_video(SrsMediaPacket *shared_video, bool is_sequence_header) -{ - return srs_success; -} - -srs_error_t MockOriginHubForLiveSource::on_publish() -{ - return srs_success; -} - -void MockOriginHubForLiveSource::on_unpublish() -{ -} - -srs_error_t MockOriginHubForLiveSource::on_dvr_request_sh() -{ - return srs_success; -} - MockAppFactoryForLiveSource::MockAppFactoryForLiveSource() { - mock_hub_ = new MockOriginHubForLiveSource(); + mock_hub_ = new MockOriginHub(); create_origin_hub_count_ = 0; } @@ -3148,7 +3037,7 @@ VOID TEST(SrsLiveSourceTest, ConsumerDumpsTypicalScenario) SrsSharedPtr wrapper(source); // Set hub to active state to test the typical publishing scenario - mock_factory->mock_hub_ = new MockOriginHubForLiveSource(); + mock_factory->mock_hub_ = new MockOriginHub(); source->hub_ = mock_factory->mock_hub_; mock_factory->mock_hub_ = NULL; @@ -3196,7 +3085,7 @@ VOID TEST(SrsLiveSourceTest, OnMetaDataTypicalScenario) SrsSharedPtr wrapper(source); // Set hub to active state to test the typical publishing scenario - mock_factory->mock_hub_ = new MockOriginHubForLiveSource(); + mock_factory->mock_hub_ = new MockOriginHub(); source->hub_ = mock_factory->mock_hub_; mock_factory->mock_hub_ = NULL; diff --git a/trunk/src/utest/srs_utest_ai14.hpp b/trunk/src/utest/srs_utest_ai14.hpp index 4018cb7f8..73fd5c685 100644 --- a/trunk/src/utest/srs_utest_ai14.hpp +++ b/trunk/src/utest/srs_utest_ai14.hpp @@ -92,7 +92,7 @@ public: public: MockHlsForOriginHub(); virtual ~MockHlsForOriginHub(); - virtual srs_error_t initialize(SrsOriginHub *h, ISrsRequest *r); + virtual srs_error_t initialize(ISrsOriginHub *h, ISrsRequest *r); virtual srs_error_t on_audio(SrsMediaPacket *shared_audio, SrsFormat *format); virtual srs_error_t on_video(SrsMediaPacket *shared_video, SrsFormat *format); virtual srs_error_t on_publish(); @@ -256,22 +256,6 @@ public: #endif // Mock config for testing SrsOriginHub::create_forwarders -class MockAppConfigForForwarder : public MockAppConfig -{ -public: - SrsConfDirective *forwards_directive_; - SrsConfDirective *backend_directive_; - -public: - MockAppConfigForForwarder(); - virtual ~MockAppConfigForForwarder(); - virtual bool get_forward_enabled(std::string vhost); - virtual SrsConfDirective *get_forwards(std::string vhost); - virtual SrsConfDirective *get_forward_backend(std::string vhost); - void set_forward_destinations(const std::vector &destinations); - void set_forward_backend(const std::string &backend_url); -}; - // Mock HTTP hooks for testing SrsOriginHub::create_backend_forwarders class MockHttpHooksForBackend : public MockHttpHooks { @@ -319,34 +303,11 @@ public: virtual SrsLiveSource *create_live_source(); }; -// Mock ISrsOriginHub for testing SrsLiveSource::initialize -class MockOriginHubForLiveSource : public ISrsOriginHub -{ -public: - int initialize_count_; - srs_error_t initialize_error_; - -public: - MockOriginHubForLiveSource(); - virtual ~MockOriginHubForLiveSource(); - virtual srs_error_t initialize(SrsSharedPtr s, ISrsRequest *r); - virtual void dispose(); - virtual srs_error_t cycle(); - virtual bool active(); - virtual srs_utime_t cleanup_delay(); - virtual srs_error_t on_meta_data(SrsMediaPacket *shared_metadata, SrsOnMetaDataPacket *packet); - virtual srs_error_t on_audio(SrsMediaPacket *shared_audio); - virtual srs_error_t on_video(SrsMediaPacket *shared_video, bool is_sequence_header); - virtual srs_error_t on_publish(); - virtual void on_unpublish(); - virtual srs_error_t on_dvr_request_sh(); -}; - // Mock ISrsAppFactory for testing SrsLiveSource::initialize class MockAppFactoryForLiveSource : public SrsAppFactory { public: - MockOriginHubForLiveSource *mock_hub_; + MockOriginHub *mock_hub_; int create_origin_hub_count_; public: diff --git a/trunk/src/utest/srs_utest_ai18.cpp b/trunk/src/utest/srs_utest_ai18.cpp index 09c55a853..85dc263fd 100644 --- a/trunk/src/utest/srs_utest_ai18.cpp +++ b/trunk/src/utest/srs_utest_ai18.cpp @@ -2457,123 +2457,6 @@ void MockAppFactoryForSessionManager::reset() create_rtc_connection_count_ = 0; } -// Unit test for SrsRtcSessionManager::create_rtc_session -// This test verifies the major use scenario: token acquisition and error handling -// Note: Full session creation requires complex initialization, so we test the key logic paths -VOID TEST(RtcSessionManagerTest, CreateRtcSession_TokenAcquisitionAndErrorHandling) -{ - srs_error_t err; - - // Create mock dependencies - MockResourceManagerForBindSession mock_conn_manager; - MockStreamPublishTokenManager mock_token_manager; - MockRtcSourceManager mock_rtc_sources; - - // Create SrsRtcSessionManager - SrsUniquePtr session_manager(new SrsRtcSessionManager()); - - // Inject mock dependencies - session_manager->conn_manager_ = &mock_conn_manager; - session_manager->stream_publish_tokens_ = &mock_token_manager; - session_manager->rtc_sources_ = &mock_rtc_sources; - - // Test 1: Verify error handling when token acquisition fails - { - // Set token acquisition to fail - mock_token_manager.set_acquire_token_error(srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "stream busy")); - - // Create RTC user config for publishing - SrsUniquePtr ruc(new SrsRtcUserConfig()); - ruc->publish_ = true; - ruc->req_ = new MockRtcAsyncCallRequest("test.vhost", "live", "stream1"); - - // Create local SDP - SrsSdp local_sdp; - - // Test: Create RTC session should fail due to token acquisition error - ISrsRtcConnection *session = NULL; - HELPER_EXPECT_FAILED(session_manager->create_rtc_session(ruc.get(), local_sdp, &session)); - - // Verify: Token acquisition was attempted - EXPECT_EQ(mock_token_manager.acquire_token_count_, 1); - - // Verify: RTC source was NOT fetched/created (because token acquisition failed first) - EXPECT_EQ(mock_rtc_sources.fetch_or_create_count_, 0); - - // Clean up - srs_freep(session); - srs_freep(ruc->req_); - } - - // Test 2: Verify error handling when source fetch/create fails - { - mock_token_manager.reset(); - mock_rtc_sources.reset(); - - // Set source fetch/create to fail - mock_rtc_sources.set_fetch_or_create_error(srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create source failed")); - - // Create RTC user config for publishing - SrsUniquePtr ruc2(new SrsRtcUserConfig()); - ruc2->publish_ = true; - ruc2->req_ = new MockRtcAsyncCallRequest("test.vhost", "live", "stream2"); - - // Create local SDP - SrsSdp local_sdp2; - - // Test: Create RTC session should fail due to source creation error - ISrsRtcConnection *session2 = NULL; - HELPER_EXPECT_FAILED(session_manager->create_rtc_session(ruc2.get(), local_sdp2, &session2)); - - // Verify: Token acquisition was attempted - EXPECT_EQ(mock_token_manager.acquire_token_count_, 1); - - // Verify: RTC source fetch/create was attempted - EXPECT_EQ(mock_rtc_sources.fetch_or_create_count_, 1); - - // Clean up - srs_freep(session2); - srs_freep(ruc2->req_); - } - - // Test 3: Verify error handling when source cannot publish (stream busy) - { - mock_token_manager.reset(); - mock_rtc_sources.reset(); - - // Set source to not allow publishing (simulate stream busy) - // can_publish() returns !is_created_, so set is_created_ to true - mock_rtc_sources.mock_source_->is_created_ = true; - - // Create RTC user config for publishing - SrsUniquePtr ruc3(new SrsRtcUserConfig()); - ruc3->publish_ = true; - ruc3->req_ = new MockRtcAsyncCallRequest("test.vhost", "live", "stream3"); - - // Create local SDP - SrsSdp local_sdp3; - - // Test: Create RTC session should fail because source is busy - ISrsRtcConnection *session3 = NULL; - HELPER_EXPECT_FAILED(session_manager->create_rtc_session(ruc3.get(), local_sdp3, &session3)); - - // Verify: Token acquisition was attempted - EXPECT_EQ(mock_token_manager.acquire_token_count_, 1); - - // Verify: RTC source fetch/create was attempted - EXPECT_EQ(mock_rtc_sources.fetch_or_create_count_, 1); - - // Clean up - srs_freep(session3); - srs_freep(ruc3->req_); - } - - // Clean up - set to NULL to avoid double-free - session_manager->conn_manager_ = NULL; - session_manager->stream_publish_tokens_ = NULL; - session_manager->rtc_sources_ = NULL; -} - // Mock ISrsRtcConnection implementation for srs_update_rtc_sessions test MockRtcConnectionForUpdateSessions::MockRtcConnectionForUpdateSessions() { diff --git a/trunk/src/utest/srs_utest_ai19.cpp b/trunk/src/utest/srs_utest_ai19.cpp index e7ea70360..362a06356 100644 --- a/trunk/src/utest/srs_utest_ai19.cpp +++ b/trunk/src/utest/srs_utest_ai19.cpp @@ -513,92 +513,6 @@ void MockMpegtsRawH264Stream::reset() is_pps_result_ = false; } -// Mock ISrsBasicRtmpClient implementation -MockMpegtsRtmpClient::MockMpegtsRtmpClient() -{ - connect_called_ = false; - publish_called_ = false; - close_called_ = false; - connect_error_ = srs_success; - publish_error_ = srs_success; - stream_id_ = 1; -} - -MockMpegtsRtmpClient::~MockMpegtsRtmpClient() -{ - srs_freep(connect_error_); - srs_freep(publish_error_); -} - -srs_error_t MockMpegtsRtmpClient::connect() -{ - connect_called_ = true; - return srs_error_copy(connect_error_); -} - -void MockMpegtsRtmpClient::close() -{ - close_called_ = true; -} - -srs_error_t MockMpegtsRtmpClient::publish(int chunk_size, bool with_vhost, std::string *pstream) -{ - publish_called_ = true; - return srs_error_copy(publish_error_); -} - -srs_error_t MockMpegtsRtmpClient::play(int chunk_size, bool with_vhost, std::string *pstream) -{ - return srs_success; -} - -void MockMpegtsRtmpClient::kbps_sample(const char *label, srs_utime_t age) -{ -} - -srs_error_t MockMpegtsRtmpClient::recv_message(SrsRtmpCommonMessage **pmsg) -{ - return srs_success; -} - -srs_error_t MockMpegtsRtmpClient::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) -{ - return srs_success; -} - -srs_error_t MockMpegtsRtmpClient::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs) -{ - for (int i = 0; i < nb_msgs; i++) { - srs_freep(msgs[i]); - } - return srs_success; -} - -srs_error_t MockMpegtsRtmpClient::send_and_free_message(SrsMediaPacket *msg) -{ - srs_freep(msg); - return srs_success; -} - -void MockMpegtsRtmpClient::set_recv_timeout(srs_utime_t timeout) -{ -} - -int MockMpegtsRtmpClient::sid() -{ - return stream_id_; -} - -void MockMpegtsRtmpClient::reset() -{ - connect_called_ = false; - publish_called_ = false; - close_called_ = false; - srs_freep(connect_error_); - srs_freep(publish_error_); - stream_id_ = 1; -} - // Test SrsMpegtsOverUdp::on_ts_video - major use scenario // This test covers the complete video processing flow: // 1. Receive TS video message with H.264 annexb data (SPS, PPS, IDR frame) @@ -615,7 +529,7 @@ VOID TEST(MpegtsOverUdpTest, OnTsVideoWithSpsPpsIdrFrame) // Create mock dependencies MockMpegtsRawH264Stream *mock_avc = new MockMpegtsRawH264Stream(); - MockMpegtsRtmpClient *mock_sdk = new MockMpegtsRtmpClient(); + MockRtmpClient *mock_sdk = new MockRtmpClient(); // Inject mock dependencies udp_handler->avc_ = mock_avc; @@ -702,7 +616,7 @@ VOID TEST(MpegtsOverUdpTest, WriteH264SpsPps) // Create mock dependencies MockMpegtsRawH264Stream *mock_avc = new MockMpegtsRawH264Stream(); - MockMpegtsRtmpClient *mock_sdk = new MockMpegtsRtmpClient(); + MockRtmpClient *mock_sdk = new MockRtmpClient(); SrsUniquePtr mock_queue(new SrsMpegtsQueue()); // Inject mock dependencies @@ -756,7 +670,7 @@ VOID TEST(MpegtsOverUdpTest, WriteH264IpbFrameWithIdrFrame) // Create mock dependencies MockMpegtsRawH264Stream *mock_avc = new MockMpegtsRawH264Stream(); - MockMpegtsRtmpClient *mock_sdk = new MockMpegtsRtmpClient(); + MockRtmpClient *mock_sdk = new MockRtmpClient(); SrsUniquePtr mock_queue(new SrsMpegtsQueue()); // Inject mock dependencies @@ -999,7 +913,7 @@ VOID TEST(MpegtsOverUdpTest, RtmpWritePacketWithVideoData) SrsUniquePtr udp_handler(new SrsMpegtsOverUdp()); // Create mock dependencies - MockMpegtsRtmpClient *mock_sdk = new MockMpegtsRtmpClient(); + MockRtmpClient *mock_sdk = new MockRtmpClient(); SrsUniquePtr mock_factory(new MockAppFactoryForMpegtsOverUdp()); SrsUniquePtr mock_queue(new SrsMpegtsQueue()); @@ -1723,107 +1637,6 @@ void MockFlvDecoderForDynamicConn::reset() tag_data_size_ = 0; } -// Mock ISrsBasicRtmpClient implementation for SrsDynamicHttpConn::do_proxy -MockRtmpClientForDynamicConn::MockRtmpClientForDynamicConn() -{ - connect_called_ = false; - publish_called_ = false; - close_called_ = false; - send_and_free_message_called_ = false; - connect_error_ = srs_success; - publish_error_ = srs_success; - send_and_free_message_error_ = srs_success; - stream_id_ = 1; - send_message_count_ = 0; -} - -MockRtmpClientForDynamicConn::~MockRtmpClientForDynamicConn() -{ - srs_freep(connect_error_); - srs_freep(publish_error_); - srs_freep(send_and_free_message_error_); -} - -srs_error_t MockRtmpClientForDynamicConn::connect() -{ - connect_called_ = true; - return srs_error_copy(connect_error_); -} - -void MockRtmpClientForDynamicConn::close() -{ - close_called_ = true; -} - -srs_error_t MockRtmpClientForDynamicConn::publish(int chunk_size, bool with_vhost, std::string *pstream) -{ - publish_called_ = true; - return srs_error_copy(publish_error_); -} - -srs_error_t MockRtmpClientForDynamicConn::play(int chunk_size, bool with_vhost, std::string *pstream) -{ - return srs_success; -} - -void MockRtmpClientForDynamicConn::kbps_sample(const char *label, srs_utime_t age) -{ -} - -srs_error_t MockRtmpClientForDynamicConn::recv_message(SrsRtmpCommonMessage **pmsg) -{ - return srs_success; -} - -srs_error_t MockRtmpClientForDynamicConn::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) -{ - return srs_success; -} - -srs_error_t MockRtmpClientForDynamicConn::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs) -{ - for (int i = 0; i < nb_msgs; i++) { - srs_freep(msgs[i]); - } - return srs_success; -} - -srs_error_t MockRtmpClientForDynamicConn::send_and_free_message(SrsMediaPacket *msg) -{ - send_and_free_message_called_ = true; - send_message_count_++; - - if (send_and_free_message_error_ != srs_success) { - srs_freep(msg); - return srs_error_copy(send_and_free_message_error_); - } - - srs_freep(msg); - return srs_success; -} - -void MockRtmpClientForDynamicConn::set_recv_timeout(srs_utime_t timeout) -{ -} - -int MockRtmpClientForDynamicConn::sid() -{ - return stream_id_; -} - -void MockRtmpClientForDynamicConn::reset() -{ - connect_called_ = false; - publish_called_ = false; - close_called_ = false; - send_and_free_message_called_ = false; - srs_freep(connect_error_); - srs_freep(publish_error_); - srs_freep(send_and_free_message_error_); - stream_id_ = 1; - send_message_count_ = 0; -} - // Mock ISrsAppFactory implementation for SrsDynamicHttpConn::do_proxy MockAppFactoryForDynamicConn::MockAppFactoryForDynamicConn() { @@ -1902,7 +1715,7 @@ VOID TEST(DynamicHttpConnTest, DoProxyWithVideoAndAudioTags) // Create mock dependencies MockHttpResponseReaderForDynamicConn mock_reader; MockFlvDecoderForDynamicConn mock_decoder; - MockRtmpClientForDynamicConn *mock_rtmp_client = new MockRtmpClientForDynamicConn(); + MockRtmpClient *mock_rtmp_client = new MockRtmpClient(); SrsUniquePtr mock_factory(new MockAppFactoryForDynamicConn()); MockPithyPrintForDynamicConn *mock_pprint = new MockPithyPrintForDynamicConn(); @@ -2772,7 +2585,6 @@ VOID TEST(QueueRecvThreadTest, BasicQueueOperations) EXPECT_TRUE(mock_rtmp->set_auto_response_called_); EXPECT_FALSE(mock_rtmp->auto_response_value_); // Should be set to false - mock_rtmp->reset(); queue_thread->on_stop(); EXPECT_TRUE(mock_rtmp->set_auto_response_called_); EXPECT_TRUE(mock_rtmp->auto_response_value_); // Should be set to true diff --git a/trunk/src/utest/srs_utest_ai19.hpp b/trunk/src/utest/srs_utest_ai19.hpp index a05884ff4..befee6f4b 100644 --- a/trunk/src/utest/srs_utest_ai19.hpp +++ b/trunk/src/utest/srs_utest_ai19.hpp @@ -138,41 +138,11 @@ public: void reset(); }; -// Mock ISrsBasicRtmpClient for testing SrsMpegtsOverUdp::on_ts_video -class MockMpegtsRtmpClient : public ISrsBasicRtmpClient -{ -public: - bool connect_called_; - bool publish_called_; - bool close_called_; - srs_error_t connect_error_; - srs_error_t publish_error_; - int stream_id_; - -public: - MockMpegtsRtmpClient(); - virtual ~MockMpegtsRtmpClient(); - -public: - virtual srs_error_t connect(); - virtual void close(); - virtual srs_error_t publish(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); - virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); - virtual void kbps_sample(const char *label, srs_utime_t age); - virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); - virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); - virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs); - virtual srs_error_t send_and_free_message(SrsMediaPacket *msg); - virtual void set_recv_timeout(srs_utime_t timeout); - virtual int sid(); - void reset(); -}; - // Mock ISrsAppFactory for testing SrsMpegtsOverUdp::rtmp_write_packet class MockAppFactoryForMpegtsOverUdp : public SrsAppFactory { public: - MockMpegtsRtmpClient *mock_rtmp_client_; + MockRtmpClient *mock_rtmp_client_; bool create_rtmp_client_called_; public: @@ -355,44 +325,11 @@ public: void reset(); }; -// Mock ISrsBasicRtmpClient for testing SrsDynamicHttpConn::do_proxy -class MockRtmpClientForDynamicConn : public ISrsBasicRtmpClient -{ -public: - bool connect_called_; - bool publish_called_; - bool close_called_; - bool send_and_free_message_called_; - srs_error_t connect_error_; - srs_error_t publish_error_; - srs_error_t send_and_free_message_error_; - int stream_id_; - int send_message_count_; - -public: - MockRtmpClientForDynamicConn(); - virtual ~MockRtmpClientForDynamicConn(); - -public: - virtual srs_error_t connect(); - virtual void close(); - virtual srs_error_t publish(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); - virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); - virtual void kbps_sample(const char *label, srs_utime_t age); - virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); - virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); - virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs); - virtual srs_error_t send_and_free_message(SrsMediaPacket *msg); - virtual void set_recv_timeout(srs_utime_t timeout); - virtual int sid(); - void reset(); -}; - // Mock ISrsAppFactory for testing SrsDynamicHttpConn::do_proxy class MockAppFactoryForDynamicConn : public SrsAppFactory { public: - MockRtmpClientForDynamicConn *mock_rtmp_client_; + MockRtmpClient *mock_rtmp_client_; bool create_rtmp_client_called_; public: diff --git a/trunk/src/utest/srs_utest_ai22.cpp b/trunk/src/utest/srs_utest_ai22.cpp index 5c68f3c47..e1bdc7d39 100644 --- a/trunk/src/utest/srs_utest_ai22.cpp +++ b/trunk/src/utest/srs_utest_ai22.cpp @@ -169,94 +169,6 @@ std::string MockEdgeConfig::get_dvr_plan(std::string vhost) return "session"; } -// MockEdgeRtmpClient implementation -MockEdgeRtmpClient::MockEdgeRtmpClient() -{ - connect_called_ = false; - play_called_ = false; - close_called_ = false; - recv_message_called_ = false; - decode_message_called_ = false; - set_recv_timeout_called_ = false; - kbps_sample_called_ = false; - connect_error_ = srs_success; - play_error_ = srs_success; - play_stream_ = ""; - recv_timeout_ = 0; - kbps_label_ = ""; - kbps_age_ = 0; -} - -MockEdgeRtmpClient::~MockEdgeRtmpClient() -{ -} - -srs_error_t MockEdgeRtmpClient::connect() -{ - connect_called_ = true; - return srs_error_copy(connect_error_); -} - -void MockEdgeRtmpClient::close() -{ - close_called_ = true; -} - -srs_error_t MockEdgeRtmpClient::publish(int chunk_size, bool with_vhost, std::string *pstream) -{ - return srs_success; -} - -srs_error_t MockEdgeRtmpClient::play(int chunk_size, bool with_vhost, std::string *pstream) -{ - play_called_ = true; - if (pstream) { - *pstream = "livestream"; // Return the stream name - play_stream_ = *pstream; - } - return srs_error_copy(play_error_); -} - -void MockEdgeRtmpClient::kbps_sample(const char *label, srs_utime_t age) -{ - kbps_sample_called_ = true; - kbps_label_ = label; - kbps_age_ = age; -} - -int MockEdgeRtmpClient::sid() -{ - return 1; -} - -srs_error_t MockEdgeRtmpClient::recv_message(SrsRtmpCommonMessage **pmsg) -{ - recv_message_called_ = true; - return srs_success; -} - -srs_error_t MockEdgeRtmpClient::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) -{ - decode_message_called_ = true; - return srs_success; -} - -srs_error_t MockEdgeRtmpClient::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs) -{ - return srs_success; -} - -srs_error_t MockEdgeRtmpClient::send_and_free_message(SrsMediaPacket *msg) -{ - return srs_success; -} - -void MockEdgeRtmpClient::set_recv_timeout(srs_utime_t timeout) -{ - set_recv_timeout_called_ = true; - recv_timeout_ = timeout; -} - // MockEdgeAppFactory implementation MockEdgeAppFactory::MockEdgeAppFactory() { @@ -302,7 +214,7 @@ VOID TEST(EdgeRtmpUpstreamTest, ConnectToOriginWithLoadBalancing) config->chunk_size_ = 60000; // Create mock RTMP client - use raw pointer since upstream will manage it - MockEdgeRtmpClient *mock_sdk = new MockEdgeRtmpClient(); + MockRtmpClient *mock_sdk = new MockRtmpClient(); mock_sdk->connect_error_ = srs_success; mock_sdk->play_error_ = srs_success; @@ -341,87 +253,6 @@ VOID TEST(EdgeRtmpUpstreamTest, ConnectToOriginWithLoadBalancing) // Note: mock_sdk will be deleted by upstream destructor via srs_freep(sdk_) } -// Test SrsEdgeRtmpUpstream message handling methods -// This test covers the major use scenario for edge server receiving and processing -// messages from origin server: -// 1. Set receive timeout for the connection -// 2. Receive RTMP messages from origin -// 3. Decode messages into RTMP commands -// 4. Sample bandwidth statistics -// 5. Query selected server information -// 6. Close the connection -VOID TEST(EdgeRtmpUpstreamTest, MessageHandlingAndClose) -{ - srs_error_t err; - - // Create mock request - SrsUniquePtr req(new MockEdgeRequest("test.vhost", "live", "stream")); - - // Create mock config - SrsUniquePtr config(new MockEdgeConfig()); - config->edge_origin_directive_ = new SrsConfDirective(); - config->edge_origin_directive_->name_ = "origin"; - config->edge_origin_directive_->args_.push_back("192.168.1.10:1935"); - - // Create mock RTMP client - MockEdgeRtmpClient *mock_sdk = new MockEdgeRtmpClient(); - - // Create mock app factory - SrsUniquePtr mock_factory(new MockEdgeAppFactory()); - mock_factory->mock_client_ = mock_sdk; - - // Create load balancer - SrsUniquePtr lb(new SrsLbRoundRobin()); - - // Create edge upstream - SrsUniquePtr upstream(new SrsEdgeRtmpUpstream("")); - - // Inject mock dependencies - upstream->config_ = config.get(); - upstream->app_factory_ = mock_factory.get(); - - // Connect to origin - err = upstream->connect(req.get(), lb.get()); - HELPER_EXPECT_SUCCESS(err); - - // Test 1: Set receive timeout - srs_utime_t timeout = 30 * SRS_UTIME_SECONDS; - upstream->set_recv_timeout(timeout); - EXPECT_TRUE(mock_sdk->set_recv_timeout_called_); - EXPECT_EQ(timeout, mock_sdk->recv_timeout_); - - // Test 2: Receive message from origin - SrsRtmpCommonMessage *msg = NULL; - err = upstream->recv_message(&msg); - HELPER_EXPECT_SUCCESS(err); - EXPECT_TRUE(mock_sdk->recv_message_called_); - - // Test 3: Decode message - SrsRtmpCommand *packet = NULL; - err = upstream->decode_message(msg, &packet); - HELPER_EXPECT_SUCCESS(err); - EXPECT_TRUE(mock_sdk->decode_message_called_); - - // Test 4: Sample bandwidth statistics - srs_utime_t age = 5 * SRS_UTIME_SECONDS; - upstream->kbps_sample("edge-pull", age); - EXPECT_TRUE(mock_sdk->kbps_sample_called_); - EXPECT_STREQ("edge-pull", mock_sdk->kbps_label_.c_str()); - EXPECT_EQ(age, mock_sdk->kbps_age_); - - // Test 5: Query selected server information - std::string selected_server; - int selected_port = 0; - upstream->selected(selected_server, selected_port); - EXPECT_STREQ("192.168.1.10", selected_server.c_str()); - EXPECT_EQ(1935, selected_port); - - // Test 6: Close connection - upstream->close(); - // After close(), sdk_ should be freed, so we can't check mock_sdk anymore - // The test passes if no crash occurs during close() -} - // MockEdgeHttpClient implementation MockEdgeHttpClient::MockEdgeHttpClient() { @@ -4254,72 +4085,6 @@ VOID TEST(DvrSegmentPlanTest, PublishUnpublishTypicalScenario) plan->config_ = NULL; } -// MockOriginHubForDvrSegmentPlan implementation -MockOriginHubForDvrSegmentPlan::MockOriginHubForDvrSegmentPlan() -{ - on_dvr_request_sh_count_ = 0; - on_dvr_request_sh_error_ = srs_success; -} - -MockOriginHubForDvrSegmentPlan::~MockOriginHubForDvrSegmentPlan() -{ - srs_freep(on_dvr_request_sh_error_); -} - -srs_error_t MockOriginHubForDvrSegmentPlan::initialize(SrsSharedPtr s, ISrsRequest *r) -{ - return srs_success; -} - -void MockOriginHubForDvrSegmentPlan::dispose() -{ -} - -srs_error_t MockOriginHubForDvrSegmentPlan::cycle() -{ - return srs_success; -} - -bool MockOriginHubForDvrSegmentPlan::active() -{ - return true; -} - -srs_utime_t MockOriginHubForDvrSegmentPlan::cleanup_delay() -{ - return 0; -} - -srs_error_t MockOriginHubForDvrSegmentPlan::on_meta_data(SrsMediaPacket *shared_metadata, SrsOnMetaDataPacket *packet) -{ - return srs_success; -} - -srs_error_t MockOriginHubForDvrSegmentPlan::on_audio(SrsMediaPacket *shared_audio) -{ - return srs_success; -} - -srs_error_t MockOriginHubForDvrSegmentPlan::on_video(SrsMediaPacket *shared_video, bool is_sequence_header) -{ - return srs_success; -} - -srs_error_t MockOriginHubForDvrSegmentPlan::on_publish() -{ - return srs_success; -} - -void MockOriginHubForDvrSegmentPlan::on_unpublish() -{ -} - -srs_error_t MockOriginHubForDvrSegmentPlan::on_dvr_request_sh() -{ - on_dvr_request_sh_count_++; - return srs_error_copy(on_dvr_request_sh_error_); -} - // Test SrsDvrSegmentPlan::on_video with segment reaping when duration exceeds limit // This test covers the major use scenario: // 1. Segment duration exceeds configured limit (cduration_) @@ -4344,7 +4109,7 @@ VOID TEST(DvrSegmentPlanTest, OnVideoReapSegmentWhenDurationExceeds) mock_segmenter->fragment_ = fragment; // Create mock origin hub - SrsUniquePtr mock_hub(new MockOriginHubForDvrSegmentPlan()); + SrsUniquePtr mock_hub(new MockOriginHub()); // Create SrsDvrSegmentPlan instance SrsUniquePtr plan(new SrsDvrSegmentPlan()); @@ -4415,7 +4180,7 @@ VOID TEST(DvrTest, InitializeTypicalScenario) SrsUniquePtr mock_factory(new MockDvrAppFactory()); // Create mock origin hub - SrsUniquePtr mock_hub(new MockOriginHubForDvrSegmentPlan()); + SrsUniquePtr mock_hub(new MockOriginHub()); // Create mock request SrsUniquePtr req(new MockEdgeRequest("test.vhost", "live", "stream1")); @@ -4468,7 +4233,7 @@ VOID TEST(DvrTest, OnPublishUnpublishTypicalScenario) SrsUniquePtr mock_factory(new MockDvrAppFactory()); // Create mock origin hub - SrsUniquePtr mock_hub(new MockOriginHubForDvrSegmentPlan()); + SrsUniquePtr mock_hub(new MockOriginHub()); // Create mock request SrsUniquePtr req(new MockEdgeRequest("test.vhost", "live", "stream1")); @@ -4530,7 +4295,7 @@ VOID TEST(DvrTest, OnMediaPacketsTypicalScenario) SrsUniquePtr mock_factory(new MockDvrAppFactory()); // Create mock origin hub - SrsUniquePtr mock_hub(new MockOriginHubForDvrSegmentPlan()); + SrsUniquePtr mock_hub(new MockOriginHub()); // Create mock request SrsUniquePtr req(new MockEdgeRequest("test.vhost", "live", "stream1")); @@ -4606,7 +4371,7 @@ VOID TEST(DvrSegmentPlanTest, OnAudioTypicalScenario) mock_segmenter->fragment_ = fragment; // Create mock origin hub - SrsUniquePtr mock_hub(new MockOriginHubForDvrSegmentPlan()); + SrsUniquePtr mock_hub(new MockOriginHub()); // Create SrsDvrSegmentPlan instance SrsUniquePtr plan(new SrsDvrSegmentPlan()); diff --git a/trunk/src/utest/srs_utest_ai22.hpp b/trunk/src/utest/srs_utest_ai22.hpp index dc02772e1..a1b580df0 100644 --- a/trunk/src/utest/srs_utest_ai22.hpp +++ b/trunk/src/utest/srs_utest_ai22.hpp @@ -80,47 +80,11 @@ public: virtual std::string get_dvr_plan(std::string vhost); }; -// Mock RTMP client for testing edge upstream -class MockEdgeRtmpClient : public ISrsBasicRtmpClient -{ -public: - bool connect_called_; - bool play_called_; - bool close_called_; - bool recv_message_called_; - bool decode_message_called_; - bool set_recv_timeout_called_; - bool kbps_sample_called_; - srs_error_t connect_error_; - srs_error_t play_error_; - std::string play_stream_; - srs_utime_t recv_timeout_; - std::string kbps_label_; - srs_utime_t kbps_age_; - -public: - MockEdgeRtmpClient(); - virtual ~MockEdgeRtmpClient(); - -public: - virtual srs_error_t connect(); - virtual void close(); - virtual srs_error_t publish(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); - virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); - virtual void kbps_sample(const char *label, srs_utime_t age); - virtual int sid(); - virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); - virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); - virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs); - virtual srs_error_t send_and_free_message(SrsMediaPacket *msg); - virtual void set_recv_timeout(srs_utime_t timeout); -}; - // Mock app factory for testing edge upstream class MockEdgeAppFactory : public SrsAppFactory { public: - MockEdgeRtmpClient *mock_client_; + MockRtmpClient *mock_client_; public: MockEdgeAppFactory(); @@ -648,29 +612,4 @@ public: virtual srs_error_t close(); }; -// Mock ISrsOriginHub for testing SrsDvrSegmentPlan -class MockOriginHubForDvrSegmentPlan : public ISrsOriginHub -{ -public: - int on_dvr_request_sh_count_; - srs_error_t on_dvr_request_sh_error_; - -public: - MockOriginHubForDvrSegmentPlan(); - virtual ~MockOriginHubForDvrSegmentPlan(); - -public: - virtual srs_error_t initialize(SrsSharedPtr s, ISrsRequest *r); - virtual void dispose(); - virtual srs_error_t cycle(); - virtual bool active(); - virtual srs_utime_t cleanup_delay(); - virtual srs_error_t on_meta_data(SrsMediaPacket *shared_metadata, SrsOnMetaDataPacket *packet); - virtual srs_error_t on_audio(SrsMediaPacket *shared_audio); - virtual srs_error_t on_video(SrsMediaPacket *shared_video, bool is_sequence_header); - virtual srs_error_t on_publish(); - virtual void on_unpublish(); - virtual srs_error_t on_dvr_request_sh(); -}; - #endif diff --git a/trunk/src/utest/srs_utest_ai23.cpp b/trunk/src/utest/srs_utest_ai23.cpp index edb3a46a0..ecf072ab3 100644 --- a/trunk/src/utest/srs_utest_ai23.cpp +++ b/trunk/src/utest/srs_utest_ai23.cpp @@ -1164,87 +1164,6 @@ VOID TEST(MpegpsQueueTest, DequeueMediaPackets) srs_freep(msg); } -// Mock ISrsBasicRtmpClient implementation -MockGbRtmpClient::MockGbRtmpClient() -{ - connect_called_ = false; - publish_called_ = false; - close_called_ = false; - connect_error_ = srs_success; - publish_error_ = srs_success; - stream_id_ = 1; -} - -MockGbRtmpClient::~MockGbRtmpClient() -{ - srs_freep(connect_error_); - srs_freep(publish_error_); -} - -srs_error_t MockGbRtmpClient::connect() -{ - connect_called_ = true; - return srs_error_copy(connect_error_); -} - -void MockGbRtmpClient::close() -{ - close_called_ = true; -} - -srs_error_t MockGbRtmpClient::publish(int chunk_size, bool with_vhost, std::string *pstream) -{ - publish_called_ = true; - return srs_error_copy(publish_error_); -} - -srs_error_t MockGbRtmpClient::play(int chunk_size, bool with_vhost, std::string *pstream) -{ - return srs_success; -} - -void MockGbRtmpClient::kbps_sample(const char *label, srs_utime_t age) -{ -} - -int MockGbRtmpClient::sid() -{ - return stream_id_; -} - -srs_error_t MockGbRtmpClient::recv_message(SrsRtmpCommonMessage **pmsg) -{ - return srs_success; -} - -srs_error_t MockGbRtmpClient::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) -{ - return srs_success; -} - -srs_error_t MockGbRtmpClient::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs) -{ - return srs_success; -} - -srs_error_t MockGbRtmpClient::send_and_free_message(SrsMediaPacket *msg) -{ - return srs_success; -} - -void MockGbRtmpClient::set_recv_timeout(srs_utime_t timeout) -{ -} - -void MockGbRtmpClient::reset() -{ - connect_called_ = false; - publish_called_ = false; - close_called_ = false; - srs_freep(connect_error_); - srs_freep(publish_error_); -} - // Mock ISrsRawAacStream implementation MockGbRawAacStream::MockGbRawAacStream() { @@ -1720,7 +1639,7 @@ VOID TEST(GbMuxerTest, OnTsMessageAudio) // We need to mock the app_factory to return our mock SDK // For now, let's just test without RTMP connection by setting sdk_ to a mock // that's already "connected" - MockGbRtmpClient *mock_sdk = new MockGbRtmpClient(); + MockRtmpClient *mock_sdk = new MockRtmpClient(); muxer->sdk_ = mock_sdk; // Simulate already connected // Create TS message with audio data (AAC ADTS format) diff --git a/trunk/src/utest/srs_utest_ai23.hpp b/trunk/src/utest/srs_utest_ai23.hpp index bb5329124..ab8cf2ca3 100644 --- a/trunk/src/utest/srs_utest_ai23.hpp +++ b/trunk/src/utest/srs_utest_ai23.hpp @@ -293,36 +293,6 @@ public: void reset(); }; -// Mock ISrsBasicRtmpClient for testing SrsGbMuxer -class MockGbRtmpClient : public ISrsBasicRtmpClient -{ -public: - bool connect_called_; - bool publish_called_; - bool close_called_; - srs_error_t connect_error_; - srs_error_t publish_error_; - int stream_id_; - -public: - MockGbRtmpClient(); - virtual ~MockGbRtmpClient(); - -public: - virtual srs_error_t connect(); - virtual void close(); - virtual srs_error_t publish(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); - virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); - virtual void kbps_sample(const char *label, srs_utime_t age); - virtual int sid(); - virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); - virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); - virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs); - virtual srs_error_t send_and_free_message(SrsMediaPacket *msg); - virtual void set_recv_timeout(srs_utime_t timeout); - void reset(); -}; - // Mock ISrsRawAacStream for testing SrsGbMuxer class MockGbRawAacStream : public ISrsRawAacStream { diff --git a/trunk/src/utest/srs_utest_forward.cpp b/trunk/src/utest/srs_utest_forward.cpp new file mode 100644 index 000000000..a9f5574a6 --- /dev/null +++ b/trunk/src/utest/srs_utest_forward.cpp @@ -0,0 +1,252 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2025 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include + +// Mock ISrsAppFactory implementation +MockAppFactoryForForwarder::MockAppFactoryForForwarder() +{ + mock_rtmp_client_ = NULL; +} + +MockAppFactoryForForwarder::~MockAppFactoryForForwarder() +{ + // Don't free mock_rtmp_client_ - it's managed by the test +} + +ISrsBasicRtmpClient *MockAppFactoryForForwarder::create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto) +{ + if (mock_rtmp_client_) { + mock_rtmp_client_->set_url(url); + } + return mock_rtmp_client_; +} + +// This test is used to verify the basic workflow of the forwarding. +// It's finished with the help of AI, but each step is manually designed +// and verified. So this is not dominated by AI, but by humanbeing. +VOID TEST(BasicWorkflowForwardTest, ManuallyVerifyForwardingHostport) +{ + srs_error_t err; + + // Create mock objects + SrsUniquePtr req(new MockRequest("test.vhost", "live", "stream1")); + MockRtmpClient *mock_sdk = new MockRtmpClient(); + SrsUniquePtr mock_factory(new MockAppFactoryForForwarder()); + mock_factory->mock_rtmp_client_ = mock_sdk; + SrsUniquePtr mock_config(new MockAppConfig()); + + // Create forwarder + SrsUniquePtr mock_hub(new MockOriginHub()); + SrsUniquePtr forwarder(new SrsForwarder(mock_hub.get())); + + forwarder->app_factory_ = mock_factory.get(); + forwarder->config_ = mock_config.get(); + + // Configure destination with traditional host:port format + std::string destination = "127.0.0.1:19350"; + + // Step 1: Initialize forwarder with destination + HELPER_EXPECT_SUCCESS(forwarder->initialize(req.get(), destination)); + EXPECT_STREQ("127.0.0.1:19350", forwarder->ep_forward_.c_str()); + + // Generate a video sequenece header message. + if (true) { + // Create a real H.264 video message with proper format. + // H.264 video format in RTMP/FLV: + // Byte 0: (FrameType << 4) | CodecID (CodecID=7 for H.264) + // FrameType=5 (disposable inter frame), CodecID=7 (H.264) = 0x57 + // Byte 1: AVCPacketType (0=sequence header, 1=NALU, 2=end of sequence) + // Byte 2-4: CompositionTime (3bytes little-endian int24) + // Remaining bytes: H.264 data + int payload_size = 10; + SrsUniquePtr msg(new SrsRtmpCommonMessage()); + msg->header_.initialize_video(payload_size, 0, 1); + msg->create_payload(payload_size); + + // Fill in H.264 video data + SrsBuffer stream(msg->payload(), payload_size); + // Frame type & Codec ID: Disposable inter frame (5) + H.264 (7) = 0x57 + stream.write_1bytes(0x57); + // AVC packet type: 0 = sequence header + stream.write_1bytes(0x00); + // Composition time: 0 (3bytes little-endian int24) + stream.write_3bytes(0x000000); + // H.264 raw data (5 bytes of dummy video data) - SPS and PPS + for (int i = 0; i < 5; i++) { + stream.write_1bytes(0x00); + } + + // Convert to SrsMediaPacket + SrsMediaPacket *pkt = new SrsMediaPacket(); + msg->to_msg(pkt); + forwarder->sh_video_ = pkt; + } + + // Generate the audio sequence header. + if (true) { + // Create a real AAC audio message with proper format. + // AAC audio format in RTMP/FLV: + // Byte 0: (SoundFormat << 4) | (SoundRate << 2) | (SoundSize << 1) | SoundType + // SoundFormat=10 (AAC), SoundRate=3 (44kHz), SoundSize=1 (16-bit), SoundType=1 (stereo) + // = 0xAF + // Byte 1: AACPacketType (0=sequence header, 1=raw data) + // Remaining bytes: AAC data + int payload_size = 10; + SrsUniquePtr msg(new SrsRtmpCommonMessage()); + msg->header_.initialize_audio(payload_size, 0, 1); + msg->create_payload(payload_size); + + // Fill in AAC audio data + SrsBuffer stream(msg->payload(), payload_size); + // Audio format byte: AAC(10), 44kHz(3), 16-bit(1), stereo(1) = 0xAF + stream.write_1bytes(0xAF); + // AAC packet type: 0 = sequence header + stream.write_1bytes(0x00); + // AAC sequence header data (8 bytes of dummy audio data) + for (int i = 0; i < 8; i++) { + stream.write_1bytes(0x00); + } + + // Convert to SrsMediaPacket + SrsMediaPacket *pkt = new SrsMediaPacket(); + msg->to_msg(pkt); + forwarder->sh_audio_ = pkt; + } + + // Step 2: Call on_publish to start forwarding + HELPER_EXPECT_SUCCESS(forwarder->on_publish()); + + // Wait for forwarder to start + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify the forwarder. + EXPECT_STREQ("rtmp://127.0.0.1:19350/live/stream1?vhost=test.vhost", mock_sdk->url_.c_str()); + EXPECT_EQ(2, mock_sdk->send_message_count_); + + // Generate an audio message. + if (true) { + // Create a real AAC audio message with proper format. + // AAC audio format in RTMP/FLV: + // Byte 0: (SoundFormat << 4) | (SoundRate << 2) | (SoundSize << 1) | SoundType + // SoundFormat=10 (AAC), SoundRate=3 (44kHz), SoundSize=1 (16-bit), SoundType=1 (stereo) + // = 0xAF + // Byte 1: AACPacketType (0=sequence header, 1=raw data) + // Remaining bytes: AAC data + int payload_size = 10; + SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage(); + msg->header_.initialize_audio(payload_size, 0, 1); + msg->create_payload(payload_size); + + // Fill in AAC audio data + SrsBuffer stream(msg->payload(), payload_size); + // Audio format byte: AAC(10), 44kHz(3), 16-bit(1), stereo(1) = 0xAF + stream.write_1bytes(0xAF); + // AAC packet type: 1 = AAC raw data + stream.write_1bytes(0x01); + // AAC raw data (8 bytes of dummy audio data) + for (int i = 0; i < 8; i++) { + stream.write_1bytes(0x00); + } + + // Convert to SrsMediaPacket + SrsUniquePtr pkt(new SrsMediaPacket()); + msg->to_msg(pkt.get()); + HELPER_EXPECT_SUCCESS(forwarder->on_video(pkt.get())); + + // Use this message to wakeup the forwarder coroutine. + mock_sdk->recv_msgs_.push_back(msg); + mock_sdk->cond_->signal(); + + // Wait for forwarder to process the message + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify that the message is sent to the server. + EXPECT_EQ(1, mock_sdk->send_and_free_messages_count_); + } + + // Notify forwarder to quit. + mock_sdk->recv_err_ = srs_error_new(ERROR_SOCKET_READ, "mock client quit"); + mock_sdk->cond_->signal(); + + // Stop the forwarder coroutine. + forwarder->on_unpublish(); + + // Wait for forwarder to stop + srs_usleep(1 * SRS_UTIME_MILLISECONDS); +} + +// This test is used to verify the forwarding with query parameters (tokens). +// It's finished with the help of AI, but each step is manually designed +// and verified. So this is not dominated by AI, but by humanbeing. +VOID TEST(BasicWorkflowForwardTest, ManuallyVerifyForwardingWithToken) +{ + srs_error_t err; + + // Create mock objects with query parameters in the request + SrsUniquePtr req(new MockRequest("test.vhost", "live", "stream1")); + // Set query parameters that should be forwarded (e.g., authentication tokens) + req->param_ = "?sdkappid=1007&userid=5fe6e61e&usersig=eJyToken123"; + + MockRtmpClient *mock_sdk = new MockRtmpClient(); + SrsUniquePtr mock_factory(new MockAppFactoryForForwarder()); + mock_factory->mock_rtmp_client_ = mock_sdk; + SrsUniquePtr mock_config(new MockAppConfig()); + + // Create forwarder + SrsUniquePtr mock_hub(new MockOriginHub()); + SrsUniquePtr forwarder(new SrsForwarder(mock_hub.get())); + + forwarder->app_factory_ = mock_factory.get(); + forwarder->config_ = mock_config.get(); + + // Configure destination with traditional host:port format + std::string destination = "127.0.0.1:19350"; + + // Step 1: Initialize forwarder with destination + HELPER_EXPECT_SUCCESS(forwarder->initialize(req.get(), destination)); + EXPECT_STREQ("127.0.0.1:19350", forwarder->ep_forward_.c_str()); + + // Step 2: Call on_publish to start forwarding + HELPER_EXPECT_SUCCESS(forwarder->on_publish()); + + // Wait for forwarder to start + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify the forwarder URL includes the query parameters from the original request + // The expected URL should be: rtmp://127.0.0.1:19350/live/stream1?sdkappid=1007&userid=5fe6e61e&usersig=eJyToken123&vhost=test.vhost + EXPECT_STREQ("rtmp://127.0.0.1:19350/live/stream1?sdkappid=1007&userid=5fe6e61e&usersig=eJyToken123&vhost=test.vhost", mock_sdk->url_.c_str()); + + // Notify forwarder to quit. + mock_sdk->recv_err_ = srs_error_new(ERROR_SOCKET_READ, "mock client quit"); + mock_sdk->cond_->signal(); + + // Stop the forwarder coroutine. + forwarder->on_unpublish(); + + // Wait for forwarder to stop + srs_usleep(1 * SRS_UTIME_MILLISECONDS); +} diff --git a/trunk/src/utest/srs_utest_forward.hpp b/trunk/src/utest/srs_utest_forward.hpp new file mode 100644 index 000000000..4020e4330 --- /dev/null +++ b/trunk/src/utest/srs_utest_forward.hpp @@ -0,0 +1,51 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2025 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_UTEST_FORWARD_HPP +#define SRS_UTEST_FORWARD_HPP + +/* +#include +*/ +#include + +#include +#include +#include +#include +#include +#include + +// Mock ISrsAppFactory for testing SrsForwarder +class MockAppFactoryForForwarder : public SrsAppFactory +{ +public: + MockRtmpClient *mock_rtmp_client_; + +public: + MockAppFactoryForForwarder(); + virtual ~MockAppFactoryForForwarder(); + virtual ISrsBasicRtmpClient *create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto); +}; + +#endif diff --git a/trunk/src/utest/srs_utest_http_conn.cpp b/trunk/src/utest/srs_utest_http_conn.cpp index 0771e7024..ad2aee6fd 100644 --- a/trunk/src/utest/srs_utest_http_conn.cpp +++ b/trunk/src/utest/srs_utest_http_conn.cpp @@ -40,7 +40,7 @@ // This test is used to verify the basic workflow of the HTTP connection. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpRequest) +VOID TEST(BasicWorkflowHttpConnTest, ManuallyVerifyForHttpRequest) { srs_error_t err; @@ -89,7 +89,7 @@ VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpRequest) // This test is used to verify the basic workflow of the HTTPx connection. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpxRequest) +VOID TEST(BasicWorkflowHttpConnTest, ManuallyVerifyForHttpxRequest) { srs_error_t err; @@ -152,7 +152,7 @@ VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpxRequest) // This test is used to verify the basic workflow of the HTTP FLV streaming. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpStream) +VOID TEST(BasicWorkflowHttpConnTest, ManuallyVerifyForHttpStream) { srs_error_t err; diff --git a/trunk/src/utest/srs_utest_mock.cpp b/trunk/src/utest/srs_utest_mock.cpp index 503eb5bf8..3d2185e85 100644 --- a/trunk/src/utest/srs_utest_mock.cpp +++ b/trunk/src/utest/srs_utest_mock.cpp @@ -351,16 +351,6 @@ void MockRtcSourceManager::set_fetch_or_create_error(srs_error_t err) fetch_or_create_error_ = srs_error_copy(err); } -void MockRtcSourceManager::reset() -{ - srs_freep(initialize_error_); - srs_freep(fetch_or_create_error_); - initialize_error_ = srs_success; - fetch_or_create_error_ = srs_success; - initialize_count_ = 0; - fetch_or_create_count_ = 0; -} - // MockAppStatistic implementation MockAppStatistic::MockAppStatistic() { @@ -485,18 +475,6 @@ void MockAppStatistic::set_on_client_error(srs_error_t err) on_client_error_ = srs_error_copy(err); } -void MockAppStatistic::reset() -{ - srs_freep(on_client_error_); - on_client_error_ = srs_success; - on_client_count_ = 0; - on_disconnect_count_ = 0; - last_client_id_ = ""; - last_client_req_ = NULL; - last_client_conn_ = NULL; - last_client_type_ = SrsRtmpConnUnknown; -} - // MockRtcAsyncTaskExecutor implementation MockRtcAsyncTaskExecutor::MockRtcAsyncTaskExecutor() { @@ -521,13 +499,6 @@ void MockRtcAsyncTaskExecutor::set_exec_error(srs_error_t err) exec_error_ = err; } -void MockRtcAsyncTaskExecutor::reset() -{ - exec_error_ = srs_success; - exec_count_ = 0; - last_task_ = NULL; -} - // MockRtcPacketSender implementation MockRtcPacketSender::MockRtcPacketSender() { @@ -552,13 +523,6 @@ void MockRtcPacketSender::set_send_packet_error(srs_error_t err) send_packet_error_ = err; } -void MockRtcPacketSender::reset() -{ - send_packet_error_ = srs_success; - send_packet_count_ = 0; - last_sent_packet_ = NULL; -} - // MockAppConfig implementation MockAppConfig::MockAppConfig() { @@ -582,6 +546,8 @@ MockAppConfig::MockAppConfig() default_vhost_ = NULL; srt_to_rtmp_ = true; rtc_from_rtmp_ = false; + forwards_directive_ = NULL; + backend_directive_ = NULL; } MockAppConfig::~MockAppConfig() @@ -590,6 +556,30 @@ MockAppConfig::~MockAppConfig() clear_on_unpublish_directive(); srs_freep(default_vhost_); + srs_freep(forwards_directive_); + srs_freep(backend_directive_); +} + +void MockAppConfig::set_forward_destinations(const std::vector &destinations) +{ + srs_freep(forwards_directive_); + + if (!destinations.empty()) { + forwards_directive_ = new SrsConfDirective(); + forwards_directive_->name_ = "destination"; + forwards_directive_->args_ = destinations; + } +} + +void MockAppConfig::set_forward_backend(const std::string &backend_url) +{ + srs_freep(backend_directive_); + + if (!backend_url.empty()) { + backend_directive_ = new SrsConfDirective(); + backend_directive_->name_ = "backend"; + backend_directive_->args_.push_back(backend_url); + } } srs_utime_t MockAppConfig::get_pithy_print() @@ -859,12 +849,12 @@ bool MockAppConfig::get_hls_recover(std::string vhost) bool MockAppConfig::get_forward_enabled(std::string vhost) { - return false; + return forwards_directive_ != NULL || backend_directive_ != NULL; } SrsConfDirective *MockAppConfig::get_forwards(std::string vhost) { - return NULL; + return forwards_directive_; } srs_utime_t MockAppConfig::get_queue_length(std::string vhost) @@ -874,7 +864,7 @@ srs_utime_t MockAppConfig::get_queue_length(std::string vhost) SrsConfDirective *MockAppConfig::get_forward_backend(std::string vhost) { - return NULL; + return backend_directive_; } bool MockAppConfig::get_atc(std::string vhost) @@ -1076,19 +1066,6 @@ void MockRtcPacketReceiver::set_send_rtcp_fb_pli_error(srs_error_t err) send_rtcp_fb_pli_error_ = err; } -void MockRtcPacketReceiver::reset() -{ - send_rtcp_rr_error_ = srs_success; - send_rtcp_xr_rrtr_error_ = srs_success; - send_rtcp_error_ = srs_success; - send_rtcp_fb_pli_error_ = srs_success; - send_rtcp_rr_count_ = 0; - send_rtcp_xr_rrtr_count_ = 0; - send_rtcp_count_ = 0; - send_rtcp_fb_pli_count_ = 0; - check_send_nacks_count_ = 0; -} - MockSecurity::MockSecurity() { check_error_ = srs_success; @@ -1171,14 +1148,6 @@ void MockLiveSourceManager::set_can_publish(bool can_publish) } } -void MockLiveSourceManager::reset() -{ - srs_freep(fetch_or_create_error_); - fetch_or_create_error_ = srs_success; - fetch_or_create_count_ = 0; - can_publish_ = true; -} - // Mock live source implementation MockLiveSource::MockLiveSource() { @@ -1567,23 +1536,6 @@ void MockRtmpServer::set_auto_response(bool v) auto_response_value_ = v; } -void MockRtmpServer::reset() -{ - srs_freep(decode_message_error_); - srs_freep(decode_message_packet_); - srs_freep(fmle_unpublish_error_); - decode_message_error_ = srs_success; - decode_message_packet_ = NULL; - decode_message_count_ = 0; - fmle_unpublish_error_ = srs_success; - fmle_unpublish_count_ = 0; - send_and_free_packet_count_ = 0; - on_play_client_pause_count_ = 0; - last_pause_state_ = false; - set_auto_response_called_ = false; - auto_response_value_ = true; -} - // Mock ISrsProtocolReadWriter implementation for testing SrsHttpConn::cycle() MockProtocolReadWriter::MockProtocolReadWriter() { @@ -1878,16 +1830,6 @@ srs_error_t MockSslConnection::writev(const iovec *iov, int iov_size, ssize_t *n return srs_error_new(ERROR_NOT_SUPPORTED, "mock ssl writev"); } -void MockSslConnection::reset() -{ - handshake_called_ = false; - srs_freep(handshake_error_); - recv_timeout_ = SRS_UTIME_NO_TIMEOUT; - send_timeout_ = SRS_UTIME_NO_TIMEOUT; - recv_bytes_ = 0; - send_bytes_ = 0; -} - // Mock ISrsProtocolReadWriter implementation for SrsSrtRecvThread MockSrtConnection::MockSrtConnection() { @@ -2022,7 +1964,17 @@ MockHttpParser::MockHttpParser() MockHttpParser::~MockHttpParser() { - reset(); + initialize_called_ = false; + parse_message_called_ = false; + srs_freep(initialize_error_); + srs_freep(parse_message_error_); + + for (vector::iterator it = messages_.begin(); it != messages_.end(); ++it) { + ISrsHttpMessage *msg = *it; + srs_freep(msg); + } + messages_.clear(); + srs_freep(cond_); } @@ -2066,20 +2018,6 @@ srs_error_t MockHttpParser::parse_message(ISrsReader *reader, ISrsHttpMessage ** return srs_success; } -void MockHttpParser::reset() -{ - initialize_called_ = false; - parse_message_called_ = false; - srs_freep(initialize_error_); - srs_freep(parse_message_error_); - - for (vector::iterator it = messages_.begin(); it != messages_.end(); ++it) { - ISrsHttpMessage *msg = *it; - srs_freep(msg); - } - messages_.clear(); -} - // Old mock implementations for backward compatibility MockHttpxConn::MockHttpxConn() { @@ -2144,18 +2082,6 @@ srs_error_t MockHttpxConn::on_conn_done(srs_error_t r0) return r0; } -void MockHttpxConn::reset() -{ - on_start_called_ = false; - on_http_message_called_ = false; - on_message_done_called_ = false; - on_conn_done_called_ = false; - srs_freep(on_start_error_); - srs_freep(on_http_message_error_); - srs_freep(on_message_done_error_); - srs_freep(on_conn_done_error_); -} - MockHttpConn::MockHttpConn() { handler_ = new MockHttpxConn(); @@ -2420,3 +2346,277 @@ void MockHttpHooks::clear_calls() on_unpublish_calls_.clear(); on_unpublish_count_ = 0; } + +// Mock origin hub implementation +MockOriginHub::MockOriginHub() +{ + initialize_count_ = 0; + initialize_error_ = srs_success; + on_hls_request_sh_count_ = 0; + on_hls_request_sh_error_ = srs_success; + on_forwarder_start_count_ = 0; + on_forwarder_start_error_ = srs_success; + on_dvr_request_sh_count_ = 0; + on_dvr_request_sh_error_ = srs_success; +} + +MockOriginHub::~MockOriginHub() +{ + srs_freep(initialize_error_); + srs_freep(on_hls_request_sh_error_); + srs_freep(on_forwarder_start_error_); + srs_freep(on_dvr_request_sh_error_); +} + +srs_error_t MockOriginHub::initialize(SrsSharedPtr s, ISrsRequest *r) +{ + initialize_count_++; + return srs_error_copy(initialize_error_); +} + +void MockOriginHub::dispose() +{ +} + +srs_error_t MockOriginHub::cycle() +{ + return srs_success; +} + +bool MockOriginHub::active() +{ + return true; +} + +srs_utime_t MockOriginHub::cleanup_delay() +{ + return 0; +} + +srs_error_t MockOriginHub::on_meta_data(SrsMediaPacket *shared_metadata, SrsOnMetaDataPacket *packet) +{ + return srs_success; +} + +srs_error_t MockOriginHub::on_audio(SrsMediaPacket *shared_audio) +{ + return srs_success; +} + +srs_error_t MockOriginHub::on_video(SrsMediaPacket *shared_video, bool is_sequence_header) +{ + return srs_success; +} + +srs_error_t MockOriginHub::on_publish() +{ + return srs_success; +} + +void MockOriginHub::on_unpublish() +{ +} + +srs_error_t MockOriginHub::on_dvr_request_sh() +{ + on_dvr_request_sh_count_++; + return srs_error_copy(on_dvr_request_sh_error_); +} + +srs_error_t MockOriginHub::on_forwarder_start(SrsForwarder *forwarder) +{ + on_forwarder_start_count_++; + return srs_error_copy(on_forwarder_start_error_); +} + +srs_error_t MockOriginHub::on_hls_request_sh() +{ + on_hls_request_sh_count_++; + return srs_error_copy(on_hls_request_sh_error_); +} + +void MockOriginHub::set_initialize_error(srs_error_t err) +{ + srs_freep(initialize_error_); + initialize_error_ = srs_error_copy(err); +} + +void MockOriginHub::set_on_hls_request_sh_error(srs_error_t err) +{ + srs_freep(on_hls_request_sh_error_); + on_hls_request_sh_error_ = srs_error_copy(err); +} + +void MockOriginHub::set_on_dvr_request_sh_error(srs_error_t err) +{ + srs_freep(on_dvr_request_sh_error_); + on_dvr_request_sh_error_ = srs_error_copy(err); +} + +void MockOriginHub::set_on_forwarder_start_error(srs_error_t err) +{ + srs_freep(on_forwarder_start_error_); + on_forwarder_start_error_ = srs_error_copy(err); +} + +// Mock ISrsBasicRtmpClient implementation +MockRtmpClient::MockRtmpClient() +{ + connect_called_ = false; + publish_called_ = false; + play_called_ = false; + close_called_ = false; + recv_message_called_ = false; + decode_message_called_ = false; + set_recv_timeout_called_ = false; + kbps_sample_called_ = false; + send_and_free_message_called_ = false; + connect_error_ = srs_success; + publish_error_ = srs_success; + play_error_ = srs_success; + send_and_free_message_error_ = srs_success; + publish_chunk_size_ = 0; + stream_id_ = 1; + recv_timeout_ = 0; + kbps_age_ = 0; + send_message_count_ = 0; + send_and_free_messages_count_ = 0; + + recv_err_ = srs_success; + cond_ = new SrsCond(); +} + +MockRtmpClient::~MockRtmpClient() +{ + srs_freep(connect_error_); + srs_freep(publish_error_); + srs_freep(play_error_); + srs_freep(send_and_free_message_error_); + + srs_freep(recv_err_); + srs_freep(cond_); + + for (vector::iterator it = recv_msgs_.begin(); it != recv_msgs_.end(); ++it) { + SrsRtmpCommonMessage *msg = *it; + srs_freep(msg); + } + recv_msgs_.clear(); +} + +srs_error_t MockRtmpClient::connect() +{ + connect_called_ = true; + return srs_error_copy(connect_error_); +} + +void MockRtmpClient::close() +{ + close_called_ = true; +} + +srs_error_t MockRtmpClient::publish(int chunk_size, bool with_vhost, std::string *pstream) +{ + publish_called_ = true; + publish_chunk_size_ = chunk_size; + if (pstream) { + publish_stream_ = *pstream; + } + return srs_error_copy(publish_error_); +} + +srs_error_t MockRtmpClient::play(int chunk_size, bool with_vhost, std::string *pstream) +{ + play_called_ = true; + if (pstream) { + // Return the stream name if play_stream_ is set, otherwise use the input + if (!play_stream_.empty()) { + *pstream = play_stream_; + } else { + *pstream = "livestream"; // Default stream name for compatibility + play_stream_ = *pstream; + } + } + return srs_error_copy(play_error_); +} + +void MockRtmpClient::kbps_sample(const char *label, srs_utime_t age) +{ + kbps_sample_called_ = true; + if (label) { + kbps_label_ = label; + } + kbps_age_ = age; +} + +void MockRtmpClient::kbps_sample(const char *label, srs_utime_t age, int msgs) +{ + kbps_sample_called_ = true; + if (label) { + kbps_label_ = label; + } + kbps_age_ = age; +} + +int MockRtmpClient::sid() +{ + return stream_id_; +} + +srs_error_t MockRtmpClient::recv_message(SrsRtmpCommonMessage **pmsg) +{ + recv_message_called_ = true; + + // No message received during playing util get control event. + if (recv_msgs_.empty()) { + cond_->wait(); + } + + if (recv_err_ != srs_success) { + return srs_error_copy(recv_err_); + } + + if (recv_msgs_.empty()) { + return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message"); + } + + SrsRtmpCommonMessage *msg = recv_msgs_.front(); + recv_msgs_.erase(recv_msgs_.begin()); + + if (pmsg) { + *pmsg = msg; + } else { + srs_freep(msg); + } + + return srs_success; +} + +srs_error_t MockRtmpClient::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) +{ + decode_message_called_ = true; + return srs_success; +} + +srs_error_t MockRtmpClient::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs) +{ + send_and_free_messages_count_ += nb_msgs; + return srs_success; +} + +srs_error_t MockRtmpClient::send_and_free_message(SrsMediaPacket *msg) +{ + send_and_free_message_called_ = true; + send_message_count_++; + return srs_error_copy(send_and_free_message_error_); +} + +void MockRtmpClient::set_recv_timeout(srs_utime_t timeout) +{ + set_recv_timeout_called_ = true; + recv_timeout_ = timeout; +} + +void MockRtmpClient::set_url(std::string url) +{ + url_ = url; +} diff --git a/trunk/src/utest/srs_utest_mock.hpp b/trunk/src/utest/srs_utest_mock.hpp index 521f35e30..69011fccc 100644 --- a/trunk/src/utest/srs_utest_mock.hpp +++ b/trunk/src/utest/srs_utest_mock.hpp @@ -169,7 +169,6 @@ public: virtual SrsSharedPtr fetch(ISrsRequest *r); void set_initialize_error(srs_error_t err); void set_fetch_or_create_error(srs_error_t err); - void reset(); }; // Mock statistic for testing @@ -208,7 +207,6 @@ public: virtual srs_error_t dumps_clients(SrsJsonArray *arr, int start, int count); virtual srs_error_t dumps_metrics(int64_t &send_bytes, int64_t &recv_bytes, int64_t &nstreams, int64_t &nclients, int64_t &total_nclients, int64_t &nerrs); void set_on_client_error(srs_error_t err); - void reset(); }; // Mock RTC async task executor for testing @@ -226,7 +224,6 @@ public: public: virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t); void set_exec_error(srs_error_t err); - void reset(); }; // Mock RTC packet sender for testing @@ -244,7 +241,6 @@ public: public: virtual srs_error_t do_send_packet(SrsRtpPacket *pkt); void set_send_packet_error(srs_error_t err); - void reset(); }; // Mock app config for testing @@ -271,10 +267,17 @@ public: SrsConfDirective *default_vhost_; bool srt_to_rtmp_; bool rtc_from_rtmp_; + SrsConfDirective *forwards_directive_; + SrsConfDirective *backend_directive_; public: MockAppConfig(); virtual ~MockAppConfig(); + +public: + // Helper methods for setting forward configuration + void set_forward_destinations(const std::vector &destinations); + void set_forward_backend(const std::string &backend_url); // ISrsConfig methods virtual srs_utime_t get_pithy_print(); virtual std::string get_default_app_name(); @@ -585,7 +588,6 @@ public: void set_send_rtcp_xr_rrtr_error(srs_error_t err); void set_send_rtcp_error(srs_error_t err); void set_send_rtcp_fb_pli_error(srs_error_t err); - void reset(); }; // Mock ISrsSecurity for testing @@ -621,7 +623,6 @@ public: virtual srs_error_t initialize(); void set_fetch_or_create_error(srs_error_t err); void set_can_publish(bool can_publish); - void reset(); }; // Mock live source for testing SrsRtcPublishStream @@ -688,7 +689,6 @@ public: void set_initialize_error(srs_error_t err); void set_fetch_or_create_error(srs_error_t err); void set_can_publish(bool can_publish); - void reset(); }; class MockRtmpServer : public ISrsRtmpServer @@ -770,9 +770,6 @@ public: virtual void set_auto_response(bool v); virtual void set_merge_read(bool v, IMergeReadHandler *handler); virtual void set_recv_buffer(int buffer_size); - -public: - void reset(); }; // Mock ISrsProtocolReadWriter for testing SrsHttpConn::cycle() @@ -870,7 +867,6 @@ public: virtual srs_utime_t get_send_timeout(); virtual srs_error_t write(void *buf, size_t size, ssize_t *nwrite); virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t *nwrite); - void reset(); }; // Mock ISrsProtocolReadWriter for testing SrsSrtRecvThread @@ -934,7 +930,6 @@ public: virtual srs_error_t initialize(enum llhttp_type type); virtual void set_jsonp(bool allow_jsonp); virtual srs_error_t parse_message(ISrsReader *reader, ISrsHttpMessage **ppmsg); - void reset(); }; // Mock SrsHttpxConn for testing SrsLiveStream (old version for backward compatibility) @@ -963,7 +958,6 @@ public: virtual srs_error_t on_http_message(ISrsHttpMessage *r, ISrsHttpResponseWriter *w); virtual srs_error_t on_message_done(ISrsHttpMessage *r, ISrsHttpResponseWriter *w); virtual srs_error_t on_conn_done(srs_error_t r0); - void reset(); }; // Mock SrsHttpConn for testing SrsLiveStream (old version for backward compatibility) @@ -1074,4 +1068,92 @@ public: void clear_calls(); }; +// Mock origin hub for testing +class MockOriginHub : public ISrsOriginHub +{ +public: + int initialize_count_; + srs_error_t initialize_error_; + int on_hls_request_sh_count_; + srs_error_t on_hls_request_sh_error_; + int on_forwarder_start_count_; + srs_error_t on_forwarder_start_error_; + int on_dvr_request_sh_count_; + srs_error_t on_dvr_request_sh_error_; + +public: + MockOriginHub(); + virtual ~MockOriginHub(); + virtual srs_error_t initialize(SrsSharedPtr s, ISrsRequest *r); + virtual void dispose(); + virtual srs_error_t cycle(); + virtual bool active(); + virtual srs_utime_t cleanup_delay(); + virtual srs_error_t on_meta_data(SrsMediaPacket *shared_metadata, SrsOnMetaDataPacket *packet); + virtual srs_error_t on_audio(SrsMediaPacket *shared_audio); + virtual srs_error_t on_video(SrsMediaPacket *shared_video, bool is_sequence_header); + virtual srs_error_t on_publish(); + virtual void on_unpublish(); + virtual srs_error_t on_dvr_request_sh(); + virtual srs_error_t on_hls_request_sh(); + virtual srs_error_t on_forwarder_start(SrsForwarder *forwarder); + void set_initialize_error(srs_error_t err); + void set_on_hls_request_sh_error(srs_error_t err); + void set_on_forwarder_start_error(srs_error_t err); + void set_on_dvr_request_sh_error(srs_error_t err); +}; + +// Mock ISrsBasicRtmpClient for testing SrsForwarder +class MockRtmpClient : public ISrsBasicRtmpClient +{ +public: + bool connect_called_; + bool publish_called_; + bool play_called_; + bool close_called_; + bool recv_message_called_; + bool decode_message_called_; + bool set_recv_timeout_called_; + bool kbps_sample_called_; + bool send_and_free_message_called_; + srs_error_t connect_error_; + srs_error_t publish_error_; + srs_error_t play_error_; + srs_error_t send_and_free_message_error_; + std::string publish_stream_; + std::string play_stream_; + int publish_chunk_size_; + int stream_id_; + std::string url_; + srs_utime_t recv_timeout_; + std::string kbps_label_; + srs_utime_t kbps_age_; + int send_message_count_; + int send_and_free_messages_count_; + +public: + srs_error_t recv_err_; + std::vector recv_msgs_; + SrsCond *cond_; + +public: + MockRtmpClient(); + virtual ~MockRtmpClient(); + virtual srs_error_t connect(); + virtual void close(); + virtual srs_error_t publish(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); + virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string *pstream = NULL); + virtual void kbps_sample(const char *label, srs_utime_t age); + virtual void kbps_sample(const char *label, srs_utime_t age, int msgs); + virtual int sid(); + virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); + virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); + virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs); + virtual srs_error_t send_and_free_message(SrsMediaPacket *msg); + virtual void set_recv_timeout(srs_utime_t timeout); + +public: + virtual void set_url(std::string url); +}; + #endif diff --git a/trunk/src/utest/srs_utest_rtc_conn.cpp b/trunk/src/utest/srs_utest_rtc_conn.cpp index 2a65644c3..ec6ad3618 100644 --- a/trunk/src/utest/srs_utest_rtc_conn.cpp +++ b/trunk/src/utest/srs_utest_rtc_conn.cpp @@ -104,7 +104,7 @@ srs_error_t MockRtcSourceForRtcConn::on_rtp(SrsRtpPacket *pkt) // This test is used to verify the basic workflow of the RTC connection. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(RtcConnTest, ManuallyVerifyBasicWorkflowForPlayer) +VOID TEST(BasicWorkflowRtcConnTest, ManuallyVerifyForPlayer) { srs_error_t err; @@ -254,7 +254,7 @@ VOID TEST(RtcConnTest, ManuallyVerifyBasicWorkflowForPlayer) // This test is used to verify the basic workflow of the RTC connection. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(RtcConnTest, ManuallyVerifyBasicWorkflowForPublisher) +VOID TEST(BasicWorkflowRtcConnTest, ManuallyVerifyForPublisher) { srs_error_t err; diff --git a/trunk/src/utest/srs_utest_rtc_playstream.cpp b/trunk/src/utest/srs_utest_rtc_playstream.cpp index 75dd079f1..9b2ee1abf 100644 --- a/trunk/src/utest/srs_utest_rtc_playstream.cpp +++ b/trunk/src/utest/srs_utest_rtc_playstream.cpp @@ -16,7 +16,7 @@ // This test is used to verify the basic workflow of the RTC play stream. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(RtcPlayStreamTest, ManuallyVerifyBasicWorkflow) +VOID TEST(BasicWorkflowRtcPlayStreamTest, ManuallyVerify) { srs_error_t err; diff --git a/trunk/src/utest/srs_utest_rtc_publishstream.cpp b/trunk/src/utest/srs_utest_rtc_publishstream.cpp index 5fb771fb3..1b829aa49 100644 --- a/trunk/src/utest/srs_utest_rtc_publishstream.cpp +++ b/trunk/src/utest/srs_utest_rtc_publishstream.cpp @@ -32,7 +32,7 @@ // This test is used to verify the basic workflow of the RTC publish stream. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(RtcPublishStreamTest, ManuallyVerifyBasicWorkflow) +VOID TEST(BasicWorkflowRtcPublishStreamTest, ManuallyVerify) { srs_error_t err; diff --git a/trunk/src/utest/srs_utest_rtmp_conn.cpp b/trunk/src/utest/srs_utest_rtmp_conn.cpp index 60722df11..fb5453173 100644 --- a/trunk/src/utest/srs_utest_rtmp_conn.cpp +++ b/trunk/src/utest/srs_utest_rtmp_conn.cpp @@ -41,7 +41,7 @@ // This test is used to verify the basic workflow of the RTMP connection. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPublisher) +VOID TEST(BasicWorkflowRtmpConnTest, ManuallyVerifyForPublisher) { srs_error_t err; @@ -213,7 +213,7 @@ VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPublisher) // This test is used to verify the basic workflow of the RTMP connection. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPlayer) +VOID TEST(BasicWorkflowRtmpConnTest, ManuallyVerifyForPlayer) { srs_error_t err; diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index e347cbf4d..ad6f9a35f 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -100,6 +100,7 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpListener l(&h); l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); @@ -113,6 +114,7 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpListener l(&h); l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); @@ -135,6 +137,7 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpListener l(&h); l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); @@ -159,6 +162,7 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpListener l(&h); l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); @@ -194,6 +198,7 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpListener l(&h); l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); @@ -216,6 +221,7 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpListener l(&h); l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); @@ -238,6 +244,7 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpListener l(&h); l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); diff --git a/trunk/src/utest/srs_utest_srt_conn.cpp b/trunk/src/utest/srs_utest_srt_conn.cpp index 55662d0e6..e2040a94f 100644 --- a/trunk/src/utest/srs_utest_srt_conn.cpp +++ b/trunk/src/utest/srs_utest_srt_conn.cpp @@ -38,7 +38,7 @@ // This test is used to verify the basic workflow of the SRT connection. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPublisher) +VOID TEST(BasicWorkflowSrtConnTest, ManuallyVerifyForPublisher) { srs_error_t err; @@ -138,7 +138,7 @@ VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPublisher) // This test is used to verify the basic workflow of the SRT connection. // It's finished with the help of AI, but each step is manually designed // and verified. So this is not dominated by AI, but by humanbeing. -VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPlayer) +VOID TEST(BasicWorkflowSrtConnTest, ManuallyVerifyForPlayer) { srs_error_t err;