From 341c0c000cc6214b731309228713562ef58256cd Mon Sep 17 00:00:00 2001 From: OSSRS-AI Date: Sun, 19 Oct 2025 19:37:00 -0400 Subject: [PATCH] AI: Add workflow utest for http stream. --- trunk/src/app/srs_app_factory.cpp | 2 +- trunk/src/app/srs_app_http_conn.cpp | 2 +- trunk/src/app/srs_app_http_stream.cpp | 20 +- trunk/src/utest/srs_utest.hpp | 6 +- trunk/src/utest/srs_utest_ai11.cpp | 91 +---- trunk/src/utest/srs_utest_ai11.hpp | 27 -- trunk/src/utest/srs_utest_ai14.cpp | 2 +- trunk/src/utest/srs_utest_ai16.cpp | 151 +------ trunk/src/utest/srs_utest_ai16.hpp | 31 -- trunk/src/utest/srs_utest_http_conn.cpp | 215 ++++++++-- trunk/src/utest/srs_utest_mock.cpp | 370 ++++++++++++++++-- trunk/src/utest/srs_utest_mock.hpp | 85 +++- trunk/src/utest/srs_utest_rtc_playstream.cpp | 2 +- .../src/utest/srs_utest_rtc_publishstream.cpp | 2 +- trunk/src/utest/srs_utest_rtmp_conn.cpp | 4 +- trunk/src/utest/srs_utest_srt_conn.cpp | 4 +- 16 files changed, 650 insertions(+), 364 deletions(-) diff --git a/trunk/src/app/srs_app_factory.cpp b/trunk/src/app/srs_app_factory.cpp index d9023e34f..7322d521d 100644 --- a/trunk/src/app/srs_app_factory.cpp +++ b/trunk/src/app/srs_app_factory.cpp @@ -32,9 +32,9 @@ #include #include #include +#include #include #include -#include ISrsAppFactory::ISrsAppFactory() { diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 5d83add9a..463ac997d 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -15,6 +15,7 @@ using namespace std; #include +#include #include #include #include @@ -40,7 +41,6 @@ using namespace std; #include #include #include -#include ISrsHttpConnOwner::ISrsHttpConnOwner() { diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 0f5aee1cb..883d97aa1 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -681,24 +681,28 @@ srs_error_t SrsLiveStream::serve_http_impl(ISrsHttpResponseWriter *w, ISrsHttpMe // Note that we should enable stat for HTTP streaming client, because each HTTP streaming connection is a real // session that should have statistics for itself. + srs_assert(hxc); hxc->set_enable_stat(true); + // Create a distinct request for this request. + SrsUniquePtr req(req_->copy()->as_http()); + // Correct the app and stream by path, which is created from template. // @remark Be careful that the stream has extension now, might cause identify fail. SrsPath path; - req_->stream_ = path.filepath_base(r->path()); + req->stream_ = path.filepath_base(r->path()); // remove the extension of stream if have. for instance, test.flv -> test - req_->stream_ = path.filepath_filename(req_->stream_); + req->stream_ = path.filepath_filename(req->stream_); // update client ip - req_->ip_ = hc->remote_ip(); + req->ip_ = hc->remote_ip(); // We must do stat the client before hooks, because hooks depends on it. - if ((err = stat_->on_client(_srs_context->get_id().c_str(), req_, hc, SrsFlvPlay)) != srs_success) { + if ((err = stat_->on_client(_srs_context->get_id().c_str(), req.get(), hc, SrsFlvPlay)) != srs_success) { return srs_error_wrap(err, "stat on client"); } - if ((err = security_->check(SrsFlvPlay, req_->ip_, req_)) != srs_success) { + if ((err = security_->check(SrsFlvPlay, req->ip_, req.get())) != srs_success) { return srs_error_wrap(err, "flv: security check"); } @@ -714,13 +718,13 @@ srs_error_t SrsLiveStream::serve_http_impl(ISrsHttpResponseWriter *w, ISrsHttpMe // Always try to create the source, because http handler won't create it. SrsSharedPtr live_source; - if ((err = live_sources_->fetch_or_create(req_, live_source)) != srs_success) { + if ((err = live_sources_->fetch_or_create(req.get(), live_source)) != srs_success) { return srs_error_wrap(err, "source create"); } srs_assert(live_source.get() != NULL); - bool enabled_cache = config_->get_gop_cache(req_->vhost_); - int gcmf = config_->get_gop_cache_max_frames(req_->vhost_); + bool enabled_cache = config_->get_gop_cache(req->vhost_); + int gcmf = config_->get_gop_cache_max_frames(req->vhost_); live_source->set_cache(enabled_cache); live_source->set_gop_cache_max_frames(gcmf); diff --git a/trunk/src/utest/srs_utest.hpp b/trunk/src/utest/srs_utest.hpp index 902f35ed9..3ecd21d48 100644 --- a/trunk/src/utest/srs_utest.hpp +++ b/trunk/src/utest/srs_utest.hpp @@ -260,7 +260,7 @@ public: // For example: // SrsCoroutineChan ctx; // ctx.push(1); -// SRS_COROUTINE_GO_CTX(ctx, { +// SRS_COROUTINE_GO_CTX(&ctx, { // int v = (int)ctx.pop(); // srs_usleep(v * SRS_UTIME_MILLISECONDS); // }); @@ -272,11 +272,11 @@ public: // For example: // SrsCoroutineChan ctx; // ctx.push(1); -// SRS_COROUTINE_GO_CTX2(ctx, coroutine1, { +// SRS_COROUTINE_GO_CTX2(&ctx, coroutine1, { // int v = (int)ctx.pop(); // srs_usleep(v * SRS_UTIME_MILLISECONDS); // }); -// SRS_COROUTINE_GO_CTX2(ctx, coroutine2, { +// SRS_COROUTINE_GO_CTX2(&ctx, coroutine2, { // int v = (int)ctx.pop(); // srs_usleep(v * SRS_UTIME_MILLISECONDS); // }); diff --git a/trunk/src/utest/srs_utest_ai11.cpp b/trunk/src/utest/srs_utest_ai11.cpp index 9d8893587..8380d5973 100644 --- a/trunk/src/utest/srs_utest_ai11.cpp +++ b/trunk/src/utest/srs_utest_ai11.cpp @@ -2253,83 +2253,6 @@ VOID TEST(RtcPliWorkerTest, ErrorHandling) EXPECT_TRUE(mock_handler.has_keyframe_request(ssrc2, cid2)); } -// Mock HTTP hooks implementation -MockHttpHooks::MockHttpHooks() -{ - on_stop_count_ = 0; - on_unpublish_count_ = 0; -} - -MockHttpHooks::~MockHttpHooks() -{ - clear_calls(); -} - -srs_error_t MockHttpHooks::on_connect(std::string url, ISrsRequest *req) -{ - return srs_success; -} - -void MockHttpHooks::on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes) -{ -} - -srs_error_t MockHttpHooks::on_publish(std::string url, ISrsRequest *req) -{ - return srs_success; -} - -void MockHttpHooks::on_unpublish(std::string url, ISrsRequest *req) -{ - on_unpublish_count_++; - on_unpublish_calls_.push_back(std::make_pair(url, req)); -} - -srs_error_t MockHttpHooks::on_play(std::string url, ISrsRequest *req) -{ - return srs_success; -} - -void MockHttpHooks::on_stop(std::string url, ISrsRequest *req) -{ - on_stop_count_++; - on_stop_calls_.push_back(std::make_pair(url, req)); -} - -srs_error_t MockHttpHooks::on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file) -{ - return srs_success; -} - -srs_error_t MockHttpHooks::on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, - std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration) -{ - return srs_success; -} - -srs_error_t MockHttpHooks::on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify) -{ - return srs_success; -} - -srs_error_t MockHttpHooks::discover_co_workers(std::string url, std::string &host, int &port) -{ - return srs_success; -} - -srs_error_t MockHttpHooks::on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls) -{ - return srs_success; -} - -void MockHttpHooks::clear_calls() -{ - on_stop_calls_.clear(); - on_stop_count_ = 0; - on_unpublish_calls_.clear(); - on_unpublish_count_ = 0; -} - // Mock context implementation MockContext::MockContext() { @@ -2565,7 +2488,7 @@ VOID TEST(RtcPlayStreamTest, InitializeSuccess) // Create mock objects MockAppConfig mock_config; MockRtcSourceManager mock_rtc_sources; - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1"); MockRtcAsyncTaskExecutor mock_async_executor; MockExpire mock_expire; @@ -2639,7 +2562,7 @@ VOID TEST(RtcPlayStreamTest, OnStreamChangeSuccess) // Create mock objects MockAppConfig mock_config; MockRtcSourceManager mock_rtc_sources; - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1"); MockRtcAsyncTaskExecutor mock_async_executor; MockExpire mock_expire; @@ -2818,7 +2741,7 @@ VOID TEST(RtcPlayStreamTest, SendPacketBasic) // Create mock objects MockAppConfig mock_config; MockRtcSourceManager mock_rtc_sources; - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1"); MockRtcAsyncTaskExecutor mock_async_executor; MockExpire mock_expire; @@ -3045,7 +2968,7 @@ VOID TEST(RtcPlayStreamTest, OnRtcpDispatch) // Create mock objects MockAppConfig mock_config; MockRtcSourceManager mock_source_manager; - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockRtcAsyncTaskExecutor mock_executor; MockRtcPacketSender mock_sender; MockExpire mock_expire; @@ -3103,7 +3026,7 @@ VOID TEST(RtcPlayStreamTest, OnRtcpNack) // Create mock objects MockAppConfig mock_config; MockRtcSourceManager mock_source_manager; - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockRtcAsyncTaskExecutor mock_executor; MockRtcPacketSender mock_sender; MockExpire mock_expire; @@ -3290,7 +3213,7 @@ VOID TEST(RtcPlayStreamTest, DoRequestKeyframe) // Create mock objects MockAppConfig mock_config; MockRtcSourceManager mock_rtc_sources; - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1"); MockRtcAsyncTaskExecutor mock_async_executor; MockExpire mock_expire; @@ -3818,7 +3741,7 @@ VOID TEST(RtcPublishStreamTest, Initialize) srs_error_t err; // Create mock objects - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockAppConfig mock_config; MockRtcSourceManager mock_rtc_sources; MockLiveSourceManager mock_live_sources; diff --git a/trunk/src/utest/srs_utest_ai11.hpp b/trunk/src/utest/srs_utest_ai11.hpp index 249dcd9fa..e0420aed6 100644 --- a/trunk/src/utest/srs_utest_ai11.hpp +++ b/trunk/src/utest/srs_utest_ai11.hpp @@ -265,33 +265,6 @@ public: int get_keyframe_request_count(); }; -// Mock HTTP hooks for testing SrsRtcAsyncCallOnStop -class MockHttpHooks : public ISrsHttpHooks -{ -public: - std::vector > on_stop_calls_; - int on_stop_count_; - std::vector > on_unpublish_calls_; - int on_unpublish_count_; - -public: - MockHttpHooks(); - virtual ~MockHttpHooks(); - virtual srs_error_t on_connect(std::string url, ISrsRequest *req); - virtual void on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes); - virtual srs_error_t on_publish(std::string url, ISrsRequest *req); - virtual void on_unpublish(std::string url, ISrsRequest *req); - virtual srs_error_t on_play(std::string url, ISrsRequest *req); - virtual void on_stop(std::string url, ISrsRequest *req); - virtual srs_error_t on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file); - virtual srs_error_t on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, - std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration); - virtual srs_error_t on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify); - virtual srs_error_t discover_co_workers(std::string url, std::string &host, int &port); - virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls); - void clear_calls(); -}; - // Mock context for testing SrsRtcAsyncCallOnStop class MockContext : public ISrsContext { diff --git a/trunk/src/utest/srs_utest_ai14.cpp b/trunk/src/utest/srs_utest_ai14.cpp index b9c6e3aa9..b4393e672 100644 --- a/trunk/src/utest/srs_utest_ai14.cpp +++ b/trunk/src/utest/srs_utest_ai14.cpp @@ -1777,7 +1777,7 @@ VOID TEST(AppOriginHubTest, OnAudioTypicalScenario) MockHlsRequest mock_req; // Create mock statistic - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; // Create origin hub SrsUniquePtr hub(new SrsOriginHub()); diff --git a/trunk/src/utest/srs_utest_ai16.cpp b/trunk/src/utest/srs_utest_ai16.cpp index 4cd532c70..e6358c41f 100644 --- a/trunk/src/utest/srs_utest_ai16.cpp +++ b/trunk/src/utest/srs_utest_ai16.cpp @@ -34,44 +34,6 @@ extern srs_error_t _srs_reload_err; extern SrsReloadState _srs_reload_state; extern std::string _srs_reload_id; -MockBufferCacheForAac::MockBufferCacheForAac() -{ - dump_cache_count_ = 0; - last_consumer_ = NULL; - last_jitter_ = SrsRtmpJitterAlgorithmOFF; -} - -MockBufferCacheForAac::~MockBufferCacheForAac() -{ -} - -srs_error_t MockBufferCacheForAac::start() -{ - return srs_success; -} - -void MockBufferCacheForAac::stop() -{ -} - -bool MockBufferCacheForAac::alive() -{ - return true; -} - -srs_error_t MockBufferCacheForAac::dump_cache(ISrsLiveConsumer *consumer, SrsRtmpJitterAlgorithm jitter) -{ - dump_cache_count_++; - last_consumer_ = consumer; - last_jitter_ = jitter; - return srs_success; -} - -srs_error_t MockBufferCacheForAac::update_auth(ISrsRequest *r) -{ - return srs_success; -} - VOID TEST(KernelBalanceTest, RoundRobinBasicSelection) { // Test the major use scenario: round-robin selection across multiple servers @@ -117,79 +79,6 @@ VOID TEST(KernelBalanceTest, RoundRobinBasicSelection) EXPECT_EQ(2, (int)lb->current()); } -// Mock request implementation for SrsBufferCache testing -MockBufferCacheRequest::MockBufferCacheRequest(std::string vhost, std::string app, std::string stream) -{ - vhost_ = vhost; - app_ = app; - stream_ = stream; - host_ = "127.0.0.1"; - port_ = 1935; - tcUrl_ = "rtmp://127.0.0.1/" + app; - schema_ = "rtmp"; - param_ = ""; - duration_ = 0; - args_ = NULL; - protocol_ = "rtmp"; - objectEncoding_ = 0; -} - -MockBufferCacheRequest::~MockBufferCacheRequest() -{ -} - -ISrsRequest *MockBufferCacheRequest::copy() -{ - MockBufferCacheRequest *req = new MockBufferCacheRequest(vhost_, app_, stream_); - req->host_ = host_; - req->port_ = port_; - req->tcUrl_ = tcUrl_; - req->pageUrl_ = pageUrl_; - req->swfUrl_ = swfUrl_; - req->schema_ = schema_; - req->param_ = param_; - req->duration_ = duration_; - req->protocol_ = protocol_; - req->objectEncoding_ = objectEncoding_; - req->ip_ = ip_; - return req; -} - -std::string MockBufferCacheRequest::get_stream_url() -{ - if (vhost_ == "__defaultVhost__" || vhost_.empty()) { - return "/" + app_ + "/" + stream_; - } else { - return vhost_ + "/" + app_ + "/" + stream_; - } -} - -void MockBufferCacheRequest::update_auth(ISrsRequest *req) -{ - if (req) { - pageUrl_ = req->pageUrl_; - swfUrl_ = req->swfUrl_; - tcUrl_ = req->tcUrl_; - } -} - -void MockBufferCacheRequest::strip() -{ - // Mock implementation - basic string cleanup - host_ = srs_strings_remove(host_, "/ \n\r\t"); - vhost_ = srs_strings_remove(vhost_, "/ \n\r\t"); - app_ = srs_strings_remove(app_, " \n\r\t"); - stream_ = srs_strings_remove(stream_, " \n\r\t"); - - app_ = srs_strings_trim_end(app_, "/"); - stream_ = srs_strings_trim_end(stream_, "/"); -} - -ISrsRequest *MockBufferCacheRequest::as_http() -{ - return copy(); -} - VOID TEST(SrsBufferCacheTest, ConstructorAndUpdateAuth) { srs_error_t err; @@ -198,7 +87,7 @@ VOID TEST(SrsBufferCacheTest, ConstructorAndUpdateAuth) // This covers the typical HTTP streaming cache initialization use case // Create mock request - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); mock_request->pageUrl_ = "http://example.com/page"; mock_request->swfUrl_ = "http://example.com/player.swf"; mock_request->tcUrl_ = "rtmp://127.0.0.1/live"; @@ -224,7 +113,7 @@ VOID TEST(SrsBufferCacheTest, ConstructorAndUpdateAuth) EXPECT_TRUE(cache->live_sources_ != NULL); // Test update_auth - should update the request with new auth info - SrsUniquePtr new_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr new_request(new MockRequest("test.vhost", "live", "stream1")); new_request->pageUrl_ = "http://example.com/new_page"; new_request->swfUrl_ = "http://example.com/new_player.swf"; new_request->tcUrl_ = "rtmp://127.0.0.1/live_new"; @@ -248,7 +137,7 @@ VOID TEST(SrsBufferCacheTest, DumpCacheWithMessages) // This covers the typical HTTP streaming cache dump use case // Create mock request - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); // Create buffer cache SrsUniquePtr cache(new SrsBufferCache(mock_request.get())); @@ -658,7 +547,7 @@ VOID TEST(AppHttpStreamTest, AacStreamEncoderMajorScenario) SrsUniquePtr writer(new MockSrsFileWriter()); HELPER_EXPECT_SUCCESS(writer->open("test.aac")); - MockBufferCacheForAac mock_cache; + MockBufferCache mock_cache; // Create AAC stream encoder SrsUniquePtr encoder(new SrsAacStreamEncoder()); @@ -836,10 +725,10 @@ VOID TEST(SrsLiveStreamTest, ServeHttpWithDisabledEntry) // security check and HTTP hooks // Create mock request - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); // Create mock buffer cache - SrsUniquePtr mock_cache(new MockBufferCacheForAac()); + SrsUniquePtr mock_cache(new MockBufferCache()); // Create SrsLiveStream SrsUniquePtr live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get())); @@ -1116,10 +1005,10 @@ VOID TEST(SrsLiveStreamTest, HttpHooksOnPlayAndStop) // This covers the typical HTTP-FLV/HLS streaming hook notification use case // Create mock request - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); // Create mock buffer cache - SrsUniquePtr mock_cache(new MockBufferCacheForAac()); + SrsUniquePtr mock_cache(new MockBufferCache()); // Create SrsLiveStream SrsUniquePtr live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get())); @@ -1392,7 +1281,7 @@ VOID TEST(SrsHttpStreamServerTest, HttpMountAndUnmount) server->templateHandlers_[vhost] = tmpl; // Create mock request for stream - SrsUniquePtr mock_request(new MockBufferCacheRequest(vhost, "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest(vhost, "live", "stream1")); // Test http_mount - should create stream entry from template HELPER_EXPECT_SUCCESS(server->http_mount(mock_request.get())); @@ -1947,8 +1836,8 @@ VOID TEST(SrsLiveStreamTest, StreamingSendMessagesWithMixedPackets) // This covers the typical HTTP streaming workflow where encoder writes different packet types // Create mock request and buffer cache - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); - SrsUniquePtr mock_cache(new MockBufferCacheForAac()); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_cache(new MockBufferCache()); // Create SrsLiveStream SrsUniquePtr live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get())); @@ -2034,10 +1923,10 @@ VOID TEST(SrsLiveStreamTest, DoServeHttpFlvWithDisabledEntry) // This covers the typical HTTP-FLV streaming initialization and immediate exit scenario // Create mock request - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); // Create mock buffer cache - SrsUniquePtr mock_cache(new MockBufferCacheForAac()); + SrsUniquePtr mock_cache(new MockBufferCache()); // Create SrsLiveStream SrsUniquePtr live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get())); @@ -2080,10 +1969,10 @@ VOID TEST(SrsLiveStreamTest, AliveAndExpireWithViewers) // where multiple viewers are watching the same stream and need to be expired // Create mock request - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); // Create mock buffer cache - SrsUniquePtr mock_cache(new MockBufferCacheForAac()); + SrsUniquePtr mock_cache(new MockBufferCache()); // Create SrsLiveStream SrsUniquePtr live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get())); @@ -2143,7 +2032,7 @@ VOID TEST(HttpStreamDestroyTest, DestroyStreamSuccess) MockBufferCacheForDestroy *mock_cache = new MockBufferCacheForDestroy(); // Create mock request - MockBufferCacheRequest *mock_req = new MockBufferCacheRequest(); + MockRequest *mock_req = new MockRequest(); entry->stream_ = mock_stream; entry->cache_ = mock_cache; @@ -2180,7 +2069,7 @@ VOID TEST(SrsBufferCacheTest, StopAndAlive) // This covers the typical HTTP streaming cache lifecycle management use case // Create mock request - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); // Create buffer cache SrsUniquePtr cache(new SrsBufferCache(mock_request.get())); @@ -2225,7 +2114,7 @@ VOID TEST(SrsBufferCacheTest, CycleWithThreadPullError) // is interrupted or encounters an error, causing the cycle to exit // Create mock request - SrsUniquePtr mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1")); + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "stream1")); // Create buffer cache SrsUniquePtr cache(new SrsBufferCache(mock_request.get())); @@ -2266,7 +2155,7 @@ VOID TEST(AppHttpStreamTest, Mp3StreamEncoderMajorScenario) SrsUniquePtr writer(new MockSrsFileWriter()); HELPER_EXPECT_SUCCESS(writer->open("test.mp3")); - MockBufferCacheForAac mock_cache; + MockBufferCache mock_cache; // Create MP3 stream encoder SrsUniquePtr encoder(new SrsMp3StreamEncoder()); @@ -3234,7 +3123,7 @@ VOID TEST(HTTPApiTest, ClientsApiGetSpecificClient) test_client->type_ = SrsRtmpConnPlay; // Create mock request for the client - SrsStatisticClient destructor will free this - MockBufferCacheRequest *mock_req = new MockBufferCacheRequest("__defaultVhost__", "live", "livestream"); + MockRequest *mock_req = new MockRequest("__defaultVhost__", "live", "livestream"); test_client->req_ = mock_req; // Create mock vhost and stream for the client diff --git a/trunk/src/utest/srs_utest_ai16.hpp b/trunk/src/utest/srs_utest_ai16.hpp index d07e45d9e..8a682b1ee 100644 --- a/trunk/src/utest/srs_utest_ai16.hpp +++ b/trunk/src/utest/srs_utest_ai16.hpp @@ -21,37 +21,6 @@ #include #include -// Mock request class for testing SrsBufferCache -class MockBufferCacheRequest : public ISrsRequest -{ -public: - MockBufferCacheRequest(std::string vhost = "__defaultVhost__", std::string app = "live", std::string stream = "test"); - virtual ~MockBufferCacheRequest(); - virtual ISrsRequest *copy(); - virtual std::string get_stream_url(); - virtual void update_auth(ISrsRequest *req); - virtual void strip(); - virtual ISrsRequest *as_http(); -}; - -// Mock buffer cache for testing AAC stream encoder -class MockBufferCacheForAac : public ISrsBufferCache -{ -public: - int dump_cache_count_; - ISrsLiveConsumer *last_consumer_; - SrsRtmpJitterAlgorithm last_jitter_; - -public: - MockBufferCacheForAac(); - virtual ~MockBufferCacheForAac(); - virtual srs_error_t start(); - virtual void stop(); - virtual bool alive(); - virtual srs_error_t dump_cache(ISrsLiveConsumer *consumer, SrsRtmpJitterAlgorithm jitter); - virtual srs_error_t update_auth(ISrsRequest *r); -}; - // Mock SrsHttpxConn for testing SrsLiveStream - inherits from real SrsHttpxConn class MockHttpxConnForLiveStream : public SrsHttpxConn { diff --git a/trunk/src/utest/srs_utest_http_conn.cpp b/trunk/src/utest/srs_utest_http_conn.cpp index 6ca1967ac..0771e7024 100644 --- a/trunk/src/utest/srs_utest_http_conn.cpp +++ b/trunk/src/utest/srs_utest_http_conn.cpp @@ -24,12 +24,14 @@ #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -50,11 +52,6 @@ VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpRequest) MockHttpParser *mock_parser = new MockHttpParser(); SrsUniquePtr mock_http_mux(new MockHttpServeMux()); - // Setup mock config - mock_config->default_vhost_ = new SrsConfDirective(); - mock_config->default_vhost_->name_ = "vhost"; - mock_config->default_vhost_->args_.push_back("__defaultVhost__"); - // Create SrsHttpConn - it takes ownership of mock_io SrsUniquePtr conn(new SrsHttpConn(mock_handler.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080)); @@ -103,38 +100,30 @@ VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpxRequest) MockHttpParser *mock_parser = new MockHttpParser(); SrsUniquePtr mock_http_mux(new MockHttpServeMux()); MockSslConnection *mock_ssl = new MockSslConnection(); - - // Setup mock config - mock_config->default_vhost_ = new SrsConfDirective(); - mock_config->default_vhost_->name_ = "vhost"; - mock_config->default_vhost_->args_.push_back("__defaultVhost__"); - - // Create mock resource manager SrsUniquePtr mock_manager(new MockConnectionManager()); // Create SrsHttpxConn - it takes ownership of mock_io // Set key and cert to empty to disable SSL - SrsUniquePtr conn(new SrsHttpxConn(mock_manager.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080, "", "")); + SrsUniquePtr connx(new SrsHttpxConn(mock_manager.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080, "", "")); // Inject mock dependencies into private fields - conn->config_ = mock_config.get(); + connx->config_ = mock_config.get(); // Inject mock SSL connection - srs_freep(conn->ssl_); - conn->ssl_ = mock_ssl; + srs_freep(connx->ssl_); + connx->ssl_ = mock_ssl; // Access the internal SrsHttpConn through conn_ field (cast from ISrsHttpConn* to SrsHttpConn*) - SrsHttpConn *http_conn = new SrsHttpConn(conn.get(), mock_ssl, mock_http_mux.get(), "192.168.1.100", 8080); - http_conn->config_ = mock_config.get(); - http_conn->app_factory_ = mock_app_factory.get(); - srs_freep(http_conn->parser_); - http_conn->parser_ = mock_parser; + SrsHttpConn *conn = new SrsHttpConn(connx.get(), mock_ssl, mock_http_mux.get(), "192.168.1.100", 8080); + conn->config_ = mock_config.get(); + conn->app_factory_ = mock_app_factory.get(); + srs_freep(conn->parser_); + conn->parser_ = mock_parser; - // Inject the mock SrsHttpConn into conn_ field - srs_freep(conn->conn_); - conn->conn_ = http_conn; + srs_freep(connx->conn_); + connx->conn_ = conn; // Start the HTTPx connection - HELPER_EXPECT_SUCCESS(conn->start()); + HELPER_EXPECT_SUCCESS(connx->start()); // Wait for coroutine to start srs_usleep(1 * SRS_UTIME_MILLISECONDS); @@ -160,3 +149,179 @@ VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpxRequest) EXPECT_EQ(1, mock_http_mux->serve_http_count_); } +// This test is used to verify the basic workflow of the HTTP FLV streaming. +// It's finished with the help of AI, but each step is manually designed +// and verified. So this is not dominated by AI, but by humanbeing. +VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpStream) +{ + srs_error_t err; + + // Create SrsLiveStream object under test + SrsUniquePtr mock_request(new MockRequest("test.vhost", "live", "livestream")); + SrsUniquePtr mock_cache(new MockBufferCache()); + SrsUniquePtr live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get())); + + // Create mock dependencies + SrsUniquePtr mock_config(new MockAppConfig()); + SrsUniquePtr mock_live_sources(new MockLiveSourceManager()); + SrsUniquePtr mock_stat(new MockAppStatistic()); + SrsUniquePtr mock_hooks(new MockHttpHooks()); + SrsUniquePtr mock_writer(new MockResponseWriter()); + SrsUniquePtr mock_message(new MockHttpMessage()); + SrsUniquePtr mock_entry(new SrsHttpMuxEntry()); + MockProtocolReadWriter *mock_io = new MockProtocolReadWriter(); + SrsUniquePtr mock_http_mux(new MockHttpServeMux()); + SrsUniquePtr mock_manager(new MockConnectionManager()); + SrsUniquePtr mock_app_factory(new MockAppFactory()); + MockHttpParser *mock_parser = new MockHttpParser(); + SrsUniquePtr connx(new SrsHttpxConn(mock_manager.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080, "", "")); + SrsHttpConn *conn = new SrsHttpConn(connx.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080); + + // Inject mock dependencies into SrsLiveStream private fields + live_stream->config_ = mock_config.get(); + live_stream->live_sources_ = mock_live_sources.get(); + live_stream->stat_ = mock_stat.get(); + live_stream->hooks_ = mock_hooks.get(); + + // Do not wait for utest, consume messages immediately. Remove this when HTTP stream use cond signal. + mock_config->mw_sleep_ = 0; + + mock_entry->enabled = true; + mock_entry->pattern = "/live/livestream.flv"; + live_stream->entry_ = mock_entry.get(); + + connx->config_ = mock_config.get(); + conn->config_ = mock_config.get(); + conn->app_factory_ = mock_app_factory.get(); + srs_freep(conn->parser_); + conn->parser_ = mock_parser; + srs_freep(connx->conn_); + connx->conn_ = conn; + mock_message->set_connection(conn); + + // Start a coroutine to run the streaming, because it's a blocking operation + SrsCoroutineChan ctx; + ctx.push(live_stream.get()); + ctx.push(mock_writer.get()); + ctx.push(mock_message.get()); + srs_error_t r0 = srs_success; + ctx.push(&r0); + SrsUniquePtr cond(new SrsCond()); + ctx.push(cond.get()); + SRS_COROUTINE_GO_CTX(&ctx, { + SrsLiveStream *live_stream = (SrsLiveStream *)ctx.pop(); + ISrsHttpResponseWriter *mock_writer = (ISrsHttpResponseWriter *)ctx.pop(); + ISrsHttpMessage *mock_message = (ISrsHttpMessage *)ctx.pop(); + srs_error_t *r0 = (srs_error_t *)ctx.pop(); + SrsCond *cond = (SrsCond *)ctx.pop(); + *r0 = live_stream->serve_http(mock_writer, mock_message); + cond->signal(); + }); + + // Wait for coroutine to start + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify that live source has a consumer + MockLiveSource *mock_source = dynamic_cast(mock_live_sources->mock_source_.get()); + EXPECT_EQ(1, (int)mock_source->consumers_.size()); + EXPECT_EQ(1, mock_source->on_dump_packets_count_); + + // Feed SSL a message to cover the http recv thread. + if (true) { + mock_io->recv_msgs_.push_back("test data"); + mock_io->cond_->signal(); + + // Wait for http thread to consume the message. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify that the recv thread consumed the message. + EXPECT_EQ(1, mock_io->read_count_); + } + + // Create an RTMP audio message to feed consumer. + if (true) { + // Create a real AAC audio message with proper format. + // AAC audio format in RTMP/FLV: + // Byte 0: (SoundFormat << 4) | (SoundRate << 2) | (SoundSize << 1) | SoundType + // SoundFormat=10 (AAC), SoundRate=3 (44kHz), SoundSize=1 (16-bit), SoundType=1 (stereo) + // = 0xAF + // Byte 1: AACPacketType (0=sequence header, 1=raw data) + // Remaining bytes: AAC data + int payload_size = 10; + SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage(); + msg->header_.initialize_audio(payload_size, 0, 1); + msg->create_payload(payload_size); + + // Fill in AAC audio data + SrsBuffer stream(msg->payload(), payload_size); + // Audio format byte: AAC(10), 44kHz(3), 16-bit(1), stereo(1) = 0xAF + stream.write_1bytes(0xAF); + // AAC packet type: 1 = AAC raw data + stream.write_1bytes(0x01); + // AAC raw data (8 bytes of dummy audio data) + for (int i = 0; i < 8; i++) { + stream.write_1bytes(0x00); + } + + // Feed audio to source. + SrsLiveSource *source = mock_source; + HELPER_EXPECT_SUCCESS(source->on_audio(msg)); + + // Wait for consumer to process the message. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify the mock response writer received the audio data + EXPECT_EQ(101, mock_writer->io.out_buffer.length()); + } + + // Create an RTMP video message to feed consumer. + if (true) { + // Create a real H.264 video message with proper format. + // H.264 video format in RTMP/FLV: + // Byte 0: (FrameType << 4) | CodecID (CodecID=7 for H.264) + // FrameType=1 (key frame), CodecID=7 (H.264) = 0x17 + // Byte 1: AVCPacketType (0=sequence header, 1=NALU, 2=end of sequence) + // Byte 2-4: CompositionTime (3bytes little-endian int24) + // Remaining bytes: H.264 data + int payload_size = 10; + SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage(); + msg->header_.initialize_video(payload_size, 0, 1); + msg->create_payload(payload_size); + + // Fill in H.264 video data + SrsBuffer stream(msg->payload(), payload_size); + // Frame type & Codec ID: Key frame (1) + H.264 (7) = 0x17 + stream.write_1bytes(0x17); + // AVC packet type: 1 = NALU + stream.write_1bytes(0x01); + // Composition time: 0 (3bytes little-endian int24) + stream.write_3bytes(0x000000); + // H.264 raw data (5 bytes of dummy video data) + for (int i = 0; i < 5; i++) { + stream.write_1bytes(0x00); + } + + // Feed video to source. + SrsLiveSource *source = mock_source; + HELPER_EXPECT_SUCCESS(source->on_video(msg)); + + // Wait for consumer to process the message. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + + // Verify the mock response writer received the video data + EXPECT_EQ(132, mock_writer->io.out_buffer.length()); + } + + // Simulate client quit event, the receive thread will get this error. + // Note that we should not sleep here, because we need to use cond to get the signal. + // If we sleep, the signal will be lost. + if (true) { + mock_io->read_error_ = srs_error_new(ERROR_SOCKET_READ, "mock client quit"); + mock_io->cond_->signal(); + } + + // Wait fr or coroutine to stop + cond->wait(); + EXPECT_EQ(ERROR_SOCKET_READ, srs_error_code(r0)); + srs_freep(r0); +} diff --git a/trunk/src/utest/srs_utest_mock.cpp b/trunk/src/utest/srs_utest_mock.cpp index 7d161ce22..3fdd89b9f 100644 --- a/trunk/src/utest/srs_utest_mock.cpp +++ b/trunk/src/utest/srs_utest_mock.cpp @@ -361,8 +361,8 @@ void MockRtcSourceManager::reset() fetch_or_create_count_ = 0; } -// MockRtcStatistic implementation -MockRtcStatistic::MockRtcStatistic() +// MockAppStatistic implementation +MockAppStatistic::MockAppStatistic() { on_client_error_ = srs_success; on_client_count_ = 0; @@ -373,17 +373,17 @@ MockRtcStatistic::MockRtcStatistic() last_client_type_ = SrsRtmpConnUnknown; } -MockRtcStatistic::~MockRtcStatistic() +MockAppStatistic::~MockAppStatistic() { srs_freep(on_client_error_); } -void MockRtcStatistic::on_disconnect(std::string id, srs_error_t err) +void MockAppStatistic::on_disconnect(std::string id, srs_error_t err) { on_disconnect_count_++; } -srs_error_t MockRtcStatistic::on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type) +srs_error_t MockAppStatistic::on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type) { on_client_count_++; last_client_id_ = id; @@ -393,99 +393,99 @@ srs_error_t MockRtcStatistic::on_client(std::string id, ISrsRequest *req, ISrsEx return srs_error_copy(on_client_error_); } -srs_error_t MockRtcStatistic::on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height) +srs_error_t MockAppStatistic::on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height) { return srs_success; } -srs_error_t MockRtcStatistic::on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object) +srs_error_t MockAppStatistic::on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object) { return srs_success; } -void MockRtcStatistic::on_stream_publish(ISrsRequest *req, std::string publisher_id) +void MockAppStatistic::on_stream_publish(ISrsRequest *req, std::string publisher_id) { } -void MockRtcStatistic::on_stream_close(ISrsRequest *req) +void MockAppStatistic::on_stream_close(ISrsRequest *req) { } -void MockRtcStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta *delta) +void MockAppStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta *delta) { } -void MockRtcStatistic::kbps_sample() +void MockAppStatistic::kbps_sample() { } -srs_error_t MockRtcStatistic::on_video_frames(ISrsRequest *req, int nb_frames) +srs_error_t MockAppStatistic::on_video_frames(ISrsRequest *req, int nb_frames) { return srs_success; } -std::string MockRtcStatistic::server_id() +std::string MockAppStatistic::server_id() { return ""; } -std::string MockRtcStatistic::service_id() +std::string MockAppStatistic::service_id() { return ""; } -std::string MockRtcStatistic::service_pid() +std::string MockAppStatistic::service_pid() { return ""; } -SrsStatisticVhost *MockRtcStatistic::find_vhost_by_id(std::string vid) +SrsStatisticVhost *MockAppStatistic::find_vhost_by_id(std::string vid) { return NULL; } -SrsStatisticStream *MockRtcStatistic::find_stream(std::string sid) +SrsStatisticStream *MockAppStatistic::find_stream(std::string sid) { return NULL; } -SrsStatisticStream *MockRtcStatistic::find_stream_by_url(std::string url) +SrsStatisticStream *MockAppStatistic::find_stream_by_url(std::string url) { return NULL; } -SrsStatisticClient *MockRtcStatistic::find_client(std::string client_id) +SrsStatisticClient *MockAppStatistic::find_client(std::string client_id) { return NULL; } -srs_error_t MockRtcStatistic::dumps_vhosts(SrsJsonArray *arr) +srs_error_t MockAppStatistic::dumps_vhosts(SrsJsonArray *arr) { return srs_success; } -srs_error_t MockRtcStatistic::dumps_streams(SrsJsonArray *arr, int start, int count) +srs_error_t MockAppStatistic::dumps_streams(SrsJsonArray *arr, int start, int count) { return srs_success; } -srs_error_t MockRtcStatistic::dumps_clients(SrsJsonArray *arr, int start, int count) +srs_error_t MockAppStatistic::dumps_clients(SrsJsonArray *arr, int start, int count) { return srs_success; } -srs_error_t MockRtcStatistic::dumps_metrics(int64_t &send_bytes, int64_t &recv_bytes, int64_t &nstreams, int64_t &nclients, int64_t &total_nclients, int64_t &nerrs) +srs_error_t MockAppStatistic::dumps_metrics(int64_t &send_bytes, int64_t &recv_bytes, int64_t &nstreams, int64_t &nclients, int64_t &total_nclients, int64_t &nerrs) { return srs_success; } -void MockRtcStatistic::set_on_client_error(srs_error_t err) +void MockAppStatistic::set_on_client_error(srs_error_t err) { srs_freep(on_client_error_); on_client_error_ = srs_error_copy(err); } -void MockRtcStatistic::reset() +void MockAppStatistic::reset() { srs_freep(on_client_error_); on_client_error_ = srs_success; @@ -1185,6 +1185,7 @@ MockLiveSource::MockLiveSource() can_publish_result_ = true; on_audio_count_ = 0; on_video_count_ = 0; + on_dump_packets_count_ = 0; } MockLiveSource::~MockLiveSource() @@ -1213,6 +1214,12 @@ srs_error_t MockLiveSource::on_edge_start_publish() return srs_success; } +srs_error_t MockLiveSource::consumer_dumps(ISrsLiveConsumer *consumer, bool ds, bool dm, bool dg) +{ + on_dump_packets_count_++; + return SrsLiveSource::consumer_dumps(consumer, ds, dm, dg); +} + srs_error_t MockLiveSource::on_audio(SrsRtmpCommonMessage *audio) { on_audio_count_++; @@ -1526,12 +1533,24 @@ srs_error_t MockRtmpServer::recv_message(SrsRtmpCommonMessage **pmsg) cond_->wait(); } - if (!recv_msgs_.empty()) { - *pmsg = recv_msgs_.front(); - recv_msgs_.erase(recv_msgs_.begin()); + if (recv_err_ != srs_success) { + return srs_error_copy(recv_err_); } - return srs_error_copy(recv_err_); + if (recv_msgs_.empty()) { + return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message"); + } + + SrsRtmpCommonMessage *msg = recv_msgs_.front(); + recv_msgs_.erase(recv_msgs_.begin()); + + if (pmsg) { + *pmsg = msg; + } else { + srs_freep(msg); + } + + return srs_success; } void MockRtmpServer::set_merge_read(bool v, IMergeReadHandler *handler) @@ -1570,10 +1589,18 @@ MockProtocolReadWriter::MockProtocolReadWriter() { recv_timeout_ = SRS_UTIME_NO_TIMEOUT; send_timeout_ = SRS_UTIME_NO_TIMEOUT; + read_count_ = 0; + recv_bytes_ = 0; + + read_error_ = srs_success; + cond_ = new SrsCond(); } MockProtocolReadWriter::~MockProtocolReadWriter() { + srs_freep(read_error_); + srs_freep(cond_); + recv_msgs_.clear(); } srs_error_t MockProtocolReadWriter::read_fully(void *buf, size_t size, ssize_t *nread) @@ -1583,6 +1610,32 @@ srs_error_t MockProtocolReadWriter::read_fully(void *buf, size_t size, ssize_t * srs_error_t MockProtocolReadWriter::read(void *buf, size_t size, ssize_t *nread) { + // No message received during playing util get control event. + if (recv_msgs_.empty()) { + cond_->wait(); + } + + read_count_++; + + if (read_error_ != srs_success) { + return srs_error_copy(read_error_); + } + + if (recv_msgs_.empty()) { + return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message"); + } + + string test_data_ = recv_msgs_.front(); + recv_msgs_.erase(recv_msgs_.begin()); + + // Simulate reading data + size_t copy_size = srs_min(size, test_data_.size()); + memcpy(buf, test_data_.c_str(), copy_size); + if (nread) { + *nread = copy_size; + } + recv_bytes_ += copy_size; + return srs_success; } @@ -1726,11 +1779,19 @@ MockSslConnection::MockSslConnection() send_timeout_ = SRS_UTIME_NO_TIMEOUT; recv_bytes_ = 0; send_bytes_ = 0; + read_count_ = 0; + + read_error_ = srs_success; + cond_ = new SrsCond(); } MockSslConnection::~MockSslConnection() { srs_freep(handshake_error_); + + srs_freep(read_error_); + srs_freep(cond_); + recv_msgs_.clear(); } srs_error_t MockSslConnection::handshake(std::string key_file, std::string crt_file) @@ -1766,7 +1827,33 @@ int64_t MockSslConnection::get_send_bytes() srs_error_t MockSslConnection::read(void *buf, size_t size, ssize_t *nread) { - return srs_error_new(ERROR_NOT_SUPPORTED, "mock ssl read"); + // No message received during playing util get control event. + if (recv_msgs_.empty()) { + cond_->wait(); + } + + read_count_++; + + if (read_error_ != srs_success) { + return srs_error_copy(read_error_); + } + + if (recv_msgs_.empty()) { + return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message"); + } + + string test_data_ = recv_msgs_.front(); + recv_msgs_.erase(recv_msgs_.begin()); + + // Simulate reading data + size_t copy_size = srs_min(size, test_data_.size()); + memcpy(buf, test_data_.c_str(), copy_size); + if (nread) { + *nread = copy_size; + } + recv_bytes_ += copy_size; + + return srs_success; } void MockSslConnection::set_send_timeout(srs_utime_t tm) @@ -1838,18 +1925,26 @@ srs_error_t MockSrtConnection::read(void *buf, size_t size, ssize_t *nread) read_count_++; - if (!recv_msgs_.empty()) { - string test_data_ = recv_msgs_.front(); - recv_msgs_.erase(recv_msgs_.begin()); - - // Simulate reading data - size_t copy_size = srs_min(size, test_data_.size()); - memcpy(buf, test_data_.c_str(), copy_size); - *nread = copy_size; - recv_bytes_ += copy_size; + if (read_error_ != srs_success) { + return srs_error_copy(read_error_); } - return srs_error_copy(read_error_); + if (recv_msgs_.empty()) { + return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message"); + } + + string test_data_ = recv_msgs_.front(); + recv_msgs_.erase(recv_msgs_.begin()); + + // Simulate reading data + size_t copy_size = srs_min(size, test_data_.size()); + memcpy(buf, test_data_.c_str(), copy_size); + if (nread) { + *nread = copy_size; + } + recv_bytes_ += copy_size; + + return srs_success; } srs_error_t MockSrtConnection::read_fully(void *buf, size_t size, ssize_t *nread) @@ -1962,7 +2057,12 @@ srs_error_t MockHttpParser::parse_message(ISrsReader *reader, ISrsHttpMessage ** ISrsHttpMessage *msg = messages_.front(); messages_.erase(messages_.begin()); - *ppmsg = msg; + if (ppmsg) { + *ppmsg = msg; + } else { + srs_freep(msg); + } + return srs_success; } @@ -2132,3 +2232,191 @@ srs_error_t MockHttpServeMux::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMess serve_http_count_++; return srs_success; } + +// Mock request implementation for SrsBufferCache testing +MockRequest::MockRequest(std::string vhost, std::string app, std::string stream) +{ + vhost_ = vhost; + app_ = app; + stream_ = stream; + host_ = "127.0.0.1"; + port_ = 1935; + tcUrl_ = "rtmp://127.0.0.1/" + app; + schema_ = "rtmp"; + param_ = ""; + duration_ = 0; + args_ = NULL; + protocol_ = "rtmp"; + objectEncoding_ = 0; +} + +MockRequest::~MockRequest() +{ +} + +ISrsRequest *MockRequest::copy() +{ + MockRequest *req = new MockRequest(vhost_, app_, stream_); + req->host_ = host_; + req->port_ = port_; + req->tcUrl_ = tcUrl_; + req->pageUrl_ = pageUrl_; + req->swfUrl_ = swfUrl_; + req->schema_ = schema_; + req->param_ = param_; + req->duration_ = duration_; + req->protocol_ = protocol_; + req->objectEncoding_ = objectEncoding_; + req->ip_ = ip_; + return req; +} + +std::string MockRequest::get_stream_url() +{ + if (vhost_ == "__defaultVhost__" || vhost_.empty()) { + return "/" + app_ + "/" + stream_; + } else { + return vhost_ + "/" + app_ + "/" + stream_; + } +} + +void MockRequest::update_auth(ISrsRequest *req) +{ + if (req) { + pageUrl_ = req->pageUrl_; + swfUrl_ = req->swfUrl_; + tcUrl_ = req->tcUrl_; + } +} + +void MockRequest::strip() +{ + // Mock implementation - basic string cleanup + host_ = srs_strings_remove(host_, "/ \n\r\t"); + vhost_ = srs_strings_remove(vhost_, "/ \n\r\t"); + app_ = srs_strings_remove(app_, " \n\r\t"); + stream_ = srs_strings_remove(stream_, " \n\r\t"); + + app_ = srs_strings_trim_end(app_, "/"); + stream_ = srs_strings_trim_end(stream_, "/"); +} + +ISrsRequest *MockRequest::as_http() +{ + return copy(); +} + +MockBufferCache::MockBufferCache() +{ + dump_cache_count_ = 0; + last_consumer_ = NULL; + last_jitter_ = SrsRtmpJitterAlgorithmOFF; +} + +MockBufferCache::~MockBufferCache() +{ +} + +srs_error_t MockBufferCache::start() +{ + return srs_success; +} + +void MockBufferCache::stop() +{ +} + +bool MockBufferCache::alive() +{ + return true; +} + +srs_error_t MockBufferCache::dump_cache(ISrsLiveConsumer *consumer, SrsRtmpJitterAlgorithm jitter) +{ + dump_cache_count_++; + last_consumer_ = consumer; + last_jitter_ = jitter; + return srs_success; +} + +srs_error_t MockBufferCache::update_auth(ISrsRequest *r) +{ + return srs_success; +} + +// Mock HTTP hooks implementation +MockHttpHooks::MockHttpHooks() +{ + on_stop_count_ = 0; + on_unpublish_count_ = 0; +} + +MockHttpHooks::~MockHttpHooks() +{ + clear_calls(); +} + +srs_error_t MockHttpHooks::on_connect(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooks::on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes) +{ +} + +srs_error_t MockHttpHooks::on_publish(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooks::on_unpublish(std::string url, ISrsRequest *req) +{ + on_unpublish_count_++; + on_unpublish_calls_.push_back(std::make_pair(url, req)); +} + +srs_error_t MockHttpHooks::on_play(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooks::on_stop(std::string url, ISrsRequest *req) +{ + on_stop_count_++; + on_stop_calls_.push_back(std::make_pair(url, req)); +} + +srs_error_t MockHttpHooks::on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file) +{ + return srs_success; +} + +srs_error_t MockHttpHooks::on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration) +{ + return srs_success; +} + +srs_error_t MockHttpHooks::on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify) +{ + return srs_success; +} + +srs_error_t MockHttpHooks::discover_co_workers(std::string url, std::string &host, int &port) +{ + return srs_success; +} + +srs_error_t MockHttpHooks::on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls) +{ + return srs_success; +} + +void MockHttpHooks::clear_calls() +{ + on_stop_calls_.clear(); + on_stop_count_ = 0; + on_unpublish_calls_.clear(); + on_unpublish_count_ = 0; +} diff --git a/trunk/src/utest/srs_utest_mock.hpp b/trunk/src/utest/srs_utest_mock.hpp index df73700e0..521f35e30 100644 --- a/trunk/src/utest/srs_utest_mock.hpp +++ b/trunk/src/utest/srs_utest_mock.hpp @@ -35,15 +35,17 @@ #ifdef SRS_GB28181 #include #endif +#include +#include +#include #include #include #include #include #include +#include #include #include -#include -#include // Forward declarations class SrsRtcTrackDescription; @@ -171,7 +173,7 @@ public: }; // Mock statistic for testing -class MockRtcStatistic : public ISrsStatistic +class MockAppStatistic : public ISrsStatistic { public: srs_error_t on_client_error_; @@ -183,8 +185,8 @@ public: SrsRtmpConnType last_client_type_; public: - MockRtcStatistic(); - virtual ~MockRtcStatistic(); + MockAppStatistic(); + virtual ~MockAppStatistic(); virtual void on_disconnect(std::string id, srs_error_t err); virtual srs_error_t on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type); virtual srs_error_t on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height); @@ -629,6 +631,7 @@ public: bool can_publish_result_; int on_audio_count_; int on_video_count_; + int on_dump_packets_count_; public: MockLiveSource(); @@ -637,6 +640,7 @@ public: void set_can_publish(bool can_publish); virtual srs_error_t on_publish(); virtual srs_error_t on_edge_start_publish(); + virtual srs_error_t consumer_dumps(ISrsLiveConsumer *consumer, bool ds, bool dm, bool dg); public: virtual srs_error_t on_audio(SrsRtmpCommonMessage *audio); @@ -777,6 +781,13 @@ class MockProtocolReadWriter : public ISrsProtocolReadWriter public: srs_utime_t recv_timeout_; srs_utime_t send_timeout_; + int64_t recv_bytes_; + int read_count_; + +public: + srs_error_t read_error_; + std::vector recv_msgs_; + SrsCond *cond_; public: MockProtocolReadWriter(); @@ -836,6 +847,12 @@ public: srs_utime_t send_timeout_; int64_t recv_bytes_; int64_t send_bytes_; + int read_count_; + +public: + srs_error_t read_error_; + std::vector recv_msgs_; + SrsCond *cond_; public: MockSslConnection(); @@ -999,4 +1016,62 @@ public: virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r); }; +// Mock request class for testing SrsBufferCache +class MockRequest : public ISrsRequest +{ +public: + MockRequest(std::string vhost = "__defaultVhost__", std::string app = "live", std::string stream = "test"); + virtual ~MockRequest(); + virtual ISrsRequest *copy(); + virtual std::string get_stream_url(); + virtual void update_auth(ISrsRequest *req); + virtual void strip(); + virtual ISrsRequest *as_http(); +}; + +// Mock buffer cache for testing AAC stream encoder +class MockBufferCache : public ISrsBufferCache +{ +public: + int dump_cache_count_; + ISrsLiveConsumer *last_consumer_; + SrsRtmpJitterAlgorithm last_jitter_; + +public: + MockBufferCache(); + virtual ~MockBufferCache(); + virtual srs_error_t start(); + virtual void stop(); + virtual bool alive(); + virtual srs_error_t dump_cache(ISrsLiveConsumer *consumer, SrsRtmpJitterAlgorithm jitter); + virtual srs_error_t update_auth(ISrsRequest *r); +}; + +// Mock HTTP hooks for testing SrsRtcAsyncCallOnStop +class MockHttpHooks : public ISrsHttpHooks +{ +public: + std::vector > on_stop_calls_; + int on_stop_count_; + std::vector > on_unpublish_calls_; + int on_unpublish_count_; + +public: + MockHttpHooks(); + virtual ~MockHttpHooks(); + virtual srs_error_t on_connect(std::string url, ISrsRequest *req); + virtual void on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes); + virtual srs_error_t on_publish(std::string url, ISrsRequest *req); + virtual void on_unpublish(std::string url, ISrsRequest *req); + virtual srs_error_t on_play(std::string url, ISrsRequest *req); + virtual void on_stop(std::string url, ISrsRequest *req); + virtual srs_error_t on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file); + virtual srs_error_t on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration); + virtual srs_error_t on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify); + virtual srs_error_t discover_co_workers(std::string url, std::string &host, int &port); + virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls); + void clear_calls(); +}; + #endif diff --git a/trunk/src/utest/srs_utest_rtc_playstream.cpp b/trunk/src/utest/srs_utest_rtc_playstream.cpp index df10a4719..75dd079f1 100644 --- a/trunk/src/utest/srs_utest_rtc_playstream.cpp +++ b/trunk/src/utest/srs_utest_rtc_playstream.cpp @@ -23,7 +23,7 @@ VOID TEST(RtcPlayStreamTest, ManuallyVerifyBasicWorkflow) // Create mock objects for dependencies MockAppConfig mock_config; MockRtcSourceManager mock_rtc_sources; - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1"); MockRtcAsyncTaskExecutor mock_exec; MockExpire mock_expire; diff --git a/trunk/src/utest/srs_utest_rtc_publishstream.cpp b/trunk/src/utest/srs_utest_rtc_publishstream.cpp index f68692de9..5fb771fb3 100644 --- a/trunk/src/utest/srs_utest_rtc_publishstream.cpp +++ b/trunk/src/utest/srs_utest_rtc_publishstream.cpp @@ -39,7 +39,7 @@ VOID TEST(RtcPublishStreamTest, ManuallyVerifyBasicWorkflow) // Create mock objects for dependencies MockAppConfig mock_config; MockRtcSourceManager mock_rtc_sources; - MockRtcStatistic mock_stat; + MockAppStatistic mock_stat; MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1"); MockRtcAsyncTaskExecutor mock_exec; MockExpire mock_expire; diff --git a/trunk/src/utest/srs_utest_rtmp_conn.cpp b/trunk/src/utest/srs_utest_rtmp_conn.cpp index ba7401395..4e0396812 100644 --- a/trunk/src/utest/srs_utest_rtmp_conn.cpp +++ b/trunk/src/utest/srs_utest_rtmp_conn.cpp @@ -50,7 +50,7 @@ VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPublisher) SrsUniquePtr mock_manager(new MockConnectionManager()); SrsUniquePtr mock_sources(new MockLiveSourceManager()); SrsUniquePtr mock_tokens(new MockStreamPublishTokenManager()); - SrsUniquePtr mock_stat(new MockRtcStatistic()); + SrsUniquePtr mock_stat(new MockAppStatistic()); SrsUniquePtr mock_hooks(new MockHttpHooks()); SrsUniquePtr mock_rtc_sources(new MockRtcSourceManager()); SrsUniquePtr mock_srt_sources(new MockSrtSourceManager()); @@ -222,7 +222,7 @@ VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPlayer) SrsUniquePtr mock_manager(new MockConnectionManager()); SrsUniquePtr mock_sources(new MockLiveSourceManager()); SrsUniquePtr mock_tokens(new MockStreamPublishTokenManager()); - SrsUniquePtr mock_stat(new MockRtcStatistic()); + SrsUniquePtr mock_stat(new MockAppStatistic()); SrsUniquePtr mock_hooks(new MockHttpHooks()); SrsUniquePtr mock_rtc_sources(new MockRtcSourceManager()); SrsUniquePtr mock_srt_sources(new MockSrtSourceManager()); diff --git a/trunk/src/utest/srs_utest_srt_conn.cpp b/trunk/src/utest/srs_utest_srt_conn.cpp index 54bd67bf6..55662d0e6 100644 --- a/trunk/src/utest/srs_utest_srt_conn.cpp +++ b/trunk/src/utest/srs_utest_srt_conn.cpp @@ -47,7 +47,7 @@ VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPublisher) SrsUniquePtr mock_manager(new MockConnectionManager()); SrsUniquePtr mock_sources(new MockLiveSourceManager()); SrsUniquePtr mock_tokens(new MockStreamPublishTokenManager()); - SrsUniquePtr mock_stat(new MockRtcStatistic()); + SrsUniquePtr mock_stat(new MockAppStatistic()); SrsUniquePtr mock_hooks(new MockHttpHooks()); SrsUniquePtr mock_rtc_sources(new MockRtcSourceManager()); SrsUniquePtr mock_srt_sources(new MockSrtSourceManager()); @@ -147,7 +147,7 @@ VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPlayer) SrsUniquePtr mock_manager(new MockConnectionManager()); SrsUniquePtr mock_sources(new MockLiveSourceManager()); SrsUniquePtr mock_tokens(new MockStreamPublishTokenManager()); - SrsUniquePtr mock_stat(new MockRtcStatistic()); + SrsUniquePtr mock_stat(new MockAppStatistic()); SrsUniquePtr mock_hooks(new MockHttpHooks()); SrsUniquePtr mock_rtc_sources(new MockRtcSourceManager()); SrsUniquePtr mock_srt_sources(new MockSrtSourceManager());