AI: Add workflow utest for http stream.

This commit is contained in:
OSSRS-AI 2025-10-19 19:37:00 -04:00 committed by winlin
parent ce7ac11eae
commit 341c0c000c
16 changed files with 650 additions and 364 deletions

View File

@ -32,9 +32,9 @@
#include <srs_kernel_ts.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_http_client.hpp>
#include <srs_protocol_http_conn.hpp>
#include <srs_protocol_st.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_http_conn.hpp>
ISrsAppFactory::ISrsAppFactory()
{

View File

@ -15,6 +15,7 @@
using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_http_static.hpp>
@ -40,7 +41,6 @@ using namespace std;
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_stream.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_factory.hpp>
ISrsHttpConnOwner::ISrsHttpConnOwner()
{

View File

@ -681,24 +681,28 @@ srs_error_t SrsLiveStream::serve_http_impl(ISrsHttpResponseWriter *w, ISrsHttpMe
// Note that we should enable stat for HTTP streaming client, because each HTTP streaming connection is a real
// session that should have statistics for itself.
srs_assert(hxc);
hxc->set_enable_stat(true);
// Create a distinct request for this request.
SrsUniquePtr<ISrsRequest> req(req_->copy()->as_http());
// Correct the app and stream by path, which is created from template.
// @remark Be careful that the stream has extension now, might cause identify fail.
SrsPath path;
req_->stream_ = path.filepath_base(r->path());
req->stream_ = path.filepath_base(r->path());
// remove the extension of stream if have. for instance, test.flv -> test
req_->stream_ = path.filepath_filename(req_->stream_);
req->stream_ = path.filepath_filename(req->stream_);
// update client ip
req_->ip_ = hc->remote_ip();
req->ip_ = hc->remote_ip();
// We must do stat the client before hooks, because hooks depends on it.
if ((err = stat_->on_client(_srs_context->get_id().c_str(), req_, hc, SrsFlvPlay)) != srs_success) {
if ((err = stat_->on_client(_srs_context->get_id().c_str(), req.get(), hc, SrsFlvPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}
if ((err = security_->check(SrsFlvPlay, req_->ip_, req_)) != srs_success) {
if ((err = security_->check(SrsFlvPlay, req->ip_, req.get())) != srs_success) {
return srs_error_wrap(err, "flv: security check");
}
@ -714,13 +718,13 @@ srs_error_t SrsLiveStream::serve_http_impl(ISrsHttpResponseWriter *w, ISrsHttpMe
// Always try to create the source, because http handler won't create it.
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = live_sources_->fetch_or_create(req_, live_source)) != srs_success) {
if ((err = live_sources_->fetch_or_create(req.get(), live_source)) != srs_success) {
return srs_error_wrap(err, "source create");
}
srs_assert(live_source.get() != NULL);
bool enabled_cache = config_->get_gop_cache(req_->vhost_);
int gcmf = config_->get_gop_cache_max_frames(req_->vhost_);
bool enabled_cache = config_->get_gop_cache(req->vhost_);
int gcmf = config_->get_gop_cache_max_frames(req->vhost_);
live_source->set_cache(enabled_cache);
live_source->set_gop_cache_max_frames(gcmf);

View File

@ -260,7 +260,7 @@ public:
// For example:
// SrsCoroutineChan ctx;
// ctx.push(1);
// SRS_COROUTINE_GO_CTX(ctx, {
// SRS_COROUTINE_GO_CTX(&ctx, {
// int v = (int)ctx.pop();
// srs_usleep(v * SRS_UTIME_MILLISECONDS);
// });
@ -272,11 +272,11 @@ public:
// For example:
// SrsCoroutineChan ctx;
// ctx.push(1);
// SRS_COROUTINE_GO_CTX2(ctx, coroutine1, {
// SRS_COROUTINE_GO_CTX2(&ctx, coroutine1, {
// int v = (int)ctx.pop();
// srs_usleep(v * SRS_UTIME_MILLISECONDS);
// });
// SRS_COROUTINE_GO_CTX2(ctx, coroutine2, {
// SRS_COROUTINE_GO_CTX2(&ctx, coroutine2, {
// int v = (int)ctx.pop();
// srs_usleep(v * SRS_UTIME_MILLISECONDS);
// });

View File

@ -2253,83 +2253,6 @@ VOID TEST(RtcPliWorkerTest, ErrorHandling)
EXPECT_TRUE(mock_handler.has_keyframe_request(ssrc2, cid2));
}
// Mock HTTP hooks implementation
MockHttpHooks::MockHttpHooks()
{
on_stop_count_ = 0;
on_unpublish_count_ = 0;
}
MockHttpHooks::~MockHttpHooks()
{
clear_calls();
}
srs_error_t MockHttpHooks::on_connect(std::string url, ISrsRequest *req)
{
return srs_success;
}
void MockHttpHooks::on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes)
{
}
srs_error_t MockHttpHooks::on_publish(std::string url, ISrsRequest *req)
{
return srs_success;
}
void MockHttpHooks::on_unpublish(std::string url, ISrsRequest *req)
{
on_unpublish_count_++;
on_unpublish_calls_.push_back(std::make_pair(url, req));
}
srs_error_t MockHttpHooks::on_play(std::string url, ISrsRequest *req)
{
return srs_success;
}
void MockHttpHooks::on_stop(std::string url, ISrsRequest *req)
{
on_stop_count_++;
on_stop_calls_.push_back(std::make_pair(url, req));
}
srs_error_t MockHttpHooks::on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file)
{
return srs_success;
}
srs_error_t MockHttpHooks::on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url,
std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration)
{
return srs_success;
}
srs_error_t MockHttpHooks::on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify)
{
return srs_success;
}
srs_error_t MockHttpHooks::discover_co_workers(std::string url, std::string &host, int &port)
{
return srs_success;
}
srs_error_t MockHttpHooks::on_forward_backend(std::string url, ISrsRequest *req, std::vector<std::string> &rtmp_urls)
{
return srs_success;
}
void MockHttpHooks::clear_calls()
{
on_stop_calls_.clear();
on_stop_count_ = 0;
on_unpublish_calls_.clear();
on_unpublish_count_ = 0;
}
// Mock context implementation
MockContext::MockContext()
{
@ -2565,7 +2488,7 @@ VOID TEST(RtcPlayStreamTest, InitializeSuccess)
// Create mock objects
MockAppConfig mock_config;
MockRtcSourceManager mock_rtc_sources;
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1");
MockRtcAsyncTaskExecutor mock_async_executor;
MockExpire mock_expire;
@ -2639,7 +2562,7 @@ VOID TEST(RtcPlayStreamTest, OnStreamChangeSuccess)
// Create mock objects
MockAppConfig mock_config;
MockRtcSourceManager mock_rtc_sources;
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1");
MockRtcAsyncTaskExecutor mock_async_executor;
MockExpire mock_expire;
@ -2818,7 +2741,7 @@ VOID TEST(RtcPlayStreamTest, SendPacketBasic)
// Create mock objects
MockAppConfig mock_config;
MockRtcSourceManager mock_rtc_sources;
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1");
MockRtcAsyncTaskExecutor mock_async_executor;
MockExpire mock_expire;
@ -3045,7 +2968,7 @@ VOID TEST(RtcPlayStreamTest, OnRtcpDispatch)
// Create mock objects
MockAppConfig mock_config;
MockRtcSourceManager mock_source_manager;
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockRtcAsyncTaskExecutor mock_executor;
MockRtcPacketSender mock_sender;
MockExpire mock_expire;
@ -3103,7 +3026,7 @@ VOID TEST(RtcPlayStreamTest, OnRtcpNack)
// Create mock objects
MockAppConfig mock_config;
MockRtcSourceManager mock_source_manager;
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockRtcAsyncTaskExecutor mock_executor;
MockRtcPacketSender mock_sender;
MockExpire mock_expire;
@ -3290,7 +3213,7 @@ VOID TEST(RtcPlayStreamTest, DoRequestKeyframe)
// Create mock objects
MockAppConfig mock_config;
MockRtcSourceManager mock_rtc_sources;
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1");
MockRtcAsyncTaskExecutor mock_async_executor;
MockExpire mock_expire;
@ -3818,7 +3741,7 @@ VOID TEST(RtcPublishStreamTest, Initialize)
srs_error_t err;
// Create mock objects
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockAppConfig mock_config;
MockRtcSourceManager mock_rtc_sources;
MockLiveSourceManager mock_live_sources;

View File

@ -265,33 +265,6 @@ public:
int get_keyframe_request_count();
};
// Mock HTTP hooks for testing SrsRtcAsyncCallOnStop
class MockHttpHooks : public ISrsHttpHooks
{
public:
std::vector<std::pair<std::string, ISrsRequest *> > on_stop_calls_;
int on_stop_count_;
std::vector<std::pair<std::string, ISrsRequest *> > on_unpublish_calls_;
int on_unpublish_count_;
public:
MockHttpHooks();
virtual ~MockHttpHooks();
virtual srs_error_t on_connect(std::string url, ISrsRequest *req);
virtual void on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes);
virtual srs_error_t on_publish(std::string url, ISrsRequest *req);
virtual void on_unpublish(std::string url, ISrsRequest *req);
virtual srs_error_t on_play(std::string url, ISrsRequest *req);
virtual void on_stop(std::string url, ISrsRequest *req);
virtual srs_error_t on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file);
virtual srs_error_t on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url,
std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration);
virtual srs_error_t on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify);
virtual srs_error_t discover_co_workers(std::string url, std::string &host, int &port);
virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector<std::string> &rtmp_urls);
void clear_calls();
};
// Mock context for testing SrsRtcAsyncCallOnStop
class MockContext : public ISrsContext
{

View File

@ -1777,7 +1777,7 @@ VOID TEST(AppOriginHubTest, OnAudioTypicalScenario)
MockHlsRequest mock_req;
// Create mock statistic
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
// Create origin hub
SrsUniquePtr<SrsOriginHub> hub(new SrsOriginHub());

View File

@ -34,44 +34,6 @@ extern srs_error_t _srs_reload_err;
extern SrsReloadState _srs_reload_state;
extern std::string _srs_reload_id;
MockBufferCacheForAac::MockBufferCacheForAac()
{
dump_cache_count_ = 0;
last_consumer_ = NULL;
last_jitter_ = SrsRtmpJitterAlgorithmOFF;
}
MockBufferCacheForAac::~MockBufferCacheForAac()
{
}
srs_error_t MockBufferCacheForAac::start()
{
return srs_success;
}
void MockBufferCacheForAac::stop()
{
}
bool MockBufferCacheForAac::alive()
{
return true;
}
srs_error_t MockBufferCacheForAac::dump_cache(ISrsLiveConsumer *consumer, SrsRtmpJitterAlgorithm jitter)
{
dump_cache_count_++;
last_consumer_ = consumer;
last_jitter_ = jitter;
return srs_success;
}
srs_error_t MockBufferCacheForAac::update_auth(ISrsRequest *r)
{
return srs_success;
}
VOID TEST(KernelBalanceTest, RoundRobinBasicSelection)
{
// Test the major use scenario: round-robin selection across multiple servers
@ -117,79 +79,6 @@ VOID TEST(KernelBalanceTest, RoundRobinBasicSelection)
EXPECT_EQ(2, (int)lb->current());
}
// Mock request implementation for SrsBufferCache testing
MockBufferCacheRequest::MockBufferCacheRequest(std::string vhost, std::string app, std::string stream)
{
vhost_ = vhost;
app_ = app;
stream_ = stream;
host_ = "127.0.0.1";
port_ = 1935;
tcUrl_ = "rtmp://127.0.0.1/" + app;
schema_ = "rtmp";
param_ = "";
duration_ = 0;
args_ = NULL;
protocol_ = "rtmp";
objectEncoding_ = 0;
}
MockBufferCacheRequest::~MockBufferCacheRequest()
{
}
ISrsRequest *MockBufferCacheRequest::copy()
{
MockBufferCacheRequest *req = new MockBufferCacheRequest(vhost_, app_, stream_);
req->host_ = host_;
req->port_ = port_;
req->tcUrl_ = tcUrl_;
req->pageUrl_ = pageUrl_;
req->swfUrl_ = swfUrl_;
req->schema_ = schema_;
req->param_ = param_;
req->duration_ = duration_;
req->protocol_ = protocol_;
req->objectEncoding_ = objectEncoding_;
req->ip_ = ip_;
return req;
}
std::string MockBufferCacheRequest::get_stream_url()
{
if (vhost_ == "__defaultVhost__" || vhost_.empty()) {
return "/" + app_ + "/" + stream_;
} else {
return vhost_ + "/" + app_ + "/" + stream_;
}
}
void MockBufferCacheRequest::update_auth(ISrsRequest *req)
{
if (req) {
pageUrl_ = req->pageUrl_;
swfUrl_ = req->swfUrl_;
tcUrl_ = req->tcUrl_;
}
}
void MockBufferCacheRequest::strip()
{
// Mock implementation - basic string cleanup
host_ = srs_strings_remove(host_, "/ \n\r\t");
vhost_ = srs_strings_remove(vhost_, "/ \n\r\t");
app_ = srs_strings_remove(app_, " \n\r\t");
stream_ = srs_strings_remove(stream_, " \n\r\t");
app_ = srs_strings_trim_end(app_, "/");
stream_ = srs_strings_trim_end(stream_, "/");
}
ISrsRequest *MockBufferCacheRequest::as_http()
{
return copy();
}
VOID TEST(SrsBufferCacheTest, ConstructorAndUpdateAuth)
{
srs_error_t err;
@ -198,7 +87,7 @@ VOID TEST(SrsBufferCacheTest, ConstructorAndUpdateAuth)
// This covers the typical HTTP streaming cache initialization use case
// Create mock request
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
mock_request->pageUrl_ = "http://example.com/page";
mock_request->swfUrl_ = "http://example.com/player.swf";
mock_request->tcUrl_ = "rtmp://127.0.0.1/live";
@ -224,7 +113,7 @@ VOID TEST(SrsBufferCacheTest, ConstructorAndUpdateAuth)
EXPECT_TRUE(cache->live_sources_ != NULL);
// Test update_auth - should update the request with new auth info
SrsUniquePtr<MockBufferCacheRequest> new_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> new_request(new MockRequest("test.vhost", "live", "stream1"));
new_request->pageUrl_ = "http://example.com/new_page";
new_request->swfUrl_ = "http://example.com/new_player.swf";
new_request->tcUrl_ = "rtmp://127.0.0.1/live_new";
@ -248,7 +137,7 @@ VOID TEST(SrsBufferCacheTest, DumpCacheWithMessages)
// This covers the typical HTTP streaming cache dump use case
// Create mock request
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
// Create buffer cache
SrsUniquePtr<SrsBufferCache> cache(new SrsBufferCache(mock_request.get()));
@ -658,7 +547,7 @@ VOID TEST(AppHttpStreamTest, AacStreamEncoderMajorScenario)
SrsUniquePtr<MockSrsFileWriter> writer(new MockSrsFileWriter());
HELPER_EXPECT_SUCCESS(writer->open("test.aac"));
MockBufferCacheForAac mock_cache;
MockBufferCache mock_cache;
// Create AAC stream encoder
SrsUniquePtr<SrsAacStreamEncoder> encoder(new SrsAacStreamEncoder());
@ -836,10 +725,10 @@ VOID TEST(SrsLiveStreamTest, ServeHttpWithDisabledEntry)
// security check and HTTP hooks
// Create mock request
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
// Create mock buffer cache
SrsUniquePtr<MockBufferCacheForAac> mock_cache(new MockBufferCacheForAac());
SrsUniquePtr<MockBufferCache> mock_cache(new MockBufferCache());
// Create SrsLiveStream
SrsUniquePtr<SrsLiveStream> live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get()));
@ -1116,10 +1005,10 @@ VOID TEST(SrsLiveStreamTest, HttpHooksOnPlayAndStop)
// This covers the typical HTTP-FLV/HLS streaming hook notification use case
// Create mock request
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
// Create mock buffer cache
SrsUniquePtr<MockBufferCacheForAac> mock_cache(new MockBufferCacheForAac());
SrsUniquePtr<MockBufferCache> mock_cache(new MockBufferCache());
// Create SrsLiveStream
SrsUniquePtr<SrsLiveStream> live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get()));
@ -1392,7 +1281,7 @@ VOID TEST(SrsHttpStreamServerTest, HttpMountAndUnmount)
server->templateHandlers_[vhost] = tmpl;
// Create mock request for stream
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest(vhost, "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest(vhost, "live", "stream1"));
// Test http_mount - should create stream entry from template
HELPER_EXPECT_SUCCESS(server->http_mount(mock_request.get()));
@ -1947,8 +1836,8 @@ VOID TEST(SrsLiveStreamTest, StreamingSendMessagesWithMixedPackets)
// This covers the typical HTTP streaming workflow where encoder writes different packet types
// Create mock request and buffer cache
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockBufferCacheForAac> mock_cache(new MockBufferCacheForAac());
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockBufferCache> mock_cache(new MockBufferCache());
// Create SrsLiveStream
SrsUniquePtr<SrsLiveStream> live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get()));
@ -2034,10 +1923,10 @@ VOID TEST(SrsLiveStreamTest, DoServeHttpFlvWithDisabledEntry)
// This covers the typical HTTP-FLV streaming initialization and immediate exit scenario
// Create mock request
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
// Create mock buffer cache
SrsUniquePtr<MockBufferCacheForAac> mock_cache(new MockBufferCacheForAac());
SrsUniquePtr<MockBufferCache> mock_cache(new MockBufferCache());
// Create SrsLiveStream
SrsUniquePtr<SrsLiveStream> live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get()));
@ -2080,10 +1969,10 @@ VOID TEST(SrsLiveStreamTest, AliveAndExpireWithViewers)
// where multiple viewers are watching the same stream and need to be expired
// Create mock request
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
// Create mock buffer cache
SrsUniquePtr<MockBufferCacheForAac> mock_cache(new MockBufferCacheForAac());
SrsUniquePtr<MockBufferCache> mock_cache(new MockBufferCache());
// Create SrsLiveStream
SrsUniquePtr<SrsLiveStream> live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get()));
@ -2143,7 +2032,7 @@ VOID TEST(HttpStreamDestroyTest, DestroyStreamSuccess)
MockBufferCacheForDestroy *mock_cache = new MockBufferCacheForDestroy();
// Create mock request
MockBufferCacheRequest *mock_req = new MockBufferCacheRequest();
MockRequest *mock_req = new MockRequest();
entry->stream_ = mock_stream;
entry->cache_ = mock_cache;
@ -2180,7 +2069,7 @@ VOID TEST(SrsBufferCacheTest, StopAndAlive)
// This covers the typical HTTP streaming cache lifecycle management use case
// Create mock request
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
// Create buffer cache
SrsUniquePtr<SrsBufferCache> cache(new SrsBufferCache(mock_request.get()));
@ -2225,7 +2114,7 @@ VOID TEST(SrsBufferCacheTest, CycleWithThreadPullError)
// is interrupted or encounters an error, causing the cycle to exit
// Create mock request
SrsUniquePtr<MockBufferCacheRequest> mock_request(new MockBufferCacheRequest("test.vhost", "live", "stream1"));
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "stream1"));
// Create buffer cache
SrsUniquePtr<SrsBufferCache> cache(new SrsBufferCache(mock_request.get()));
@ -2266,7 +2155,7 @@ VOID TEST(AppHttpStreamTest, Mp3StreamEncoderMajorScenario)
SrsUniquePtr<MockSrsFileWriter> writer(new MockSrsFileWriter());
HELPER_EXPECT_SUCCESS(writer->open("test.mp3"));
MockBufferCacheForAac mock_cache;
MockBufferCache mock_cache;
// Create MP3 stream encoder
SrsUniquePtr<SrsMp3StreamEncoder> encoder(new SrsMp3StreamEncoder());
@ -3234,7 +3123,7 @@ VOID TEST(HTTPApiTest, ClientsApiGetSpecificClient)
test_client->type_ = SrsRtmpConnPlay;
// Create mock request for the client - SrsStatisticClient destructor will free this
MockBufferCacheRequest *mock_req = new MockBufferCacheRequest("__defaultVhost__", "live", "livestream");
MockRequest *mock_req = new MockRequest("__defaultVhost__", "live", "livestream");
test_client->req_ = mock_req;
// Create mock vhost and stream for the client

View File

@ -21,37 +21,6 @@
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_utest_ai11.hpp>
// Mock request class for testing SrsBufferCache
class MockBufferCacheRequest : public ISrsRequest
{
public:
MockBufferCacheRequest(std::string vhost = "__defaultVhost__", std::string app = "live", std::string stream = "test");
virtual ~MockBufferCacheRequest();
virtual ISrsRequest *copy();
virtual std::string get_stream_url();
virtual void update_auth(ISrsRequest *req);
virtual void strip();
virtual ISrsRequest *as_http();
};
// Mock buffer cache for testing AAC stream encoder
class MockBufferCacheForAac : public ISrsBufferCache
{
public:
int dump_cache_count_;
ISrsLiveConsumer *last_consumer_;
SrsRtmpJitterAlgorithm last_jitter_;
public:
MockBufferCacheForAac();
virtual ~MockBufferCacheForAac();
virtual srs_error_t start();
virtual void stop();
virtual bool alive();
virtual srs_error_t dump_cache(ISrsLiveConsumer *consumer, SrsRtmpJitterAlgorithm jitter);
virtual srs_error_t update_auth(ISrsRequest *r);
};
// Mock SrsHttpxConn for testing SrsLiveStream - inherits from real SrsHttpxConn
class MockHttpxConnForLiveStream : public SrsHttpxConn
{

View File

@ -24,12 +24,14 @@
#include <srs_utest_http_conn.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_http_stream.hpp>
#include <srs_protocol_conn.hpp>
#include <srs_protocol_http_conn.hpp>
#include <srs_protocol_http_stack.hpp>
#include <srs_protocol_st.hpp>
#include <srs_utest_ai13.hpp>
#include <srs_utest_ai15.hpp>
#include <srs_utest_ai16.hpp>
#include <srs_utest_ai19.hpp>
#include <srs_utest_http.hpp>
#include <srs_utest_mock.hpp>
@ -50,11 +52,6 @@ VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpRequest)
MockHttpParser *mock_parser = new MockHttpParser();
SrsUniquePtr<MockHttpServeMux> mock_http_mux(new MockHttpServeMux());
// Setup mock config
mock_config->default_vhost_ = new SrsConfDirective();
mock_config->default_vhost_->name_ = "vhost";
mock_config->default_vhost_->args_.push_back("__defaultVhost__");
// Create SrsHttpConn - it takes ownership of mock_io
SrsUniquePtr<SrsHttpConn> conn(new SrsHttpConn(mock_handler.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080));
@ -103,38 +100,30 @@ VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpxRequest)
MockHttpParser *mock_parser = new MockHttpParser();
SrsUniquePtr<MockHttpServeMux> mock_http_mux(new MockHttpServeMux());
MockSslConnection *mock_ssl = new MockSslConnection();
// Setup mock config
mock_config->default_vhost_ = new SrsConfDirective();
mock_config->default_vhost_->name_ = "vhost";
mock_config->default_vhost_->args_.push_back("__defaultVhost__");
// Create mock resource manager
SrsUniquePtr<MockConnectionManager> mock_manager(new MockConnectionManager());
// Create SrsHttpxConn - it takes ownership of mock_io
// Set key and cert to empty to disable SSL
SrsUniquePtr<SrsHttpxConn> conn(new SrsHttpxConn(mock_manager.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080, "", ""));
SrsUniquePtr<SrsHttpxConn> connx(new SrsHttpxConn(mock_manager.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080, "", ""));
// Inject mock dependencies into private fields
conn->config_ = mock_config.get();
connx->config_ = mock_config.get();
// Inject mock SSL connection
srs_freep(conn->ssl_);
conn->ssl_ = mock_ssl;
srs_freep(connx->ssl_);
connx->ssl_ = mock_ssl;
// Access the internal SrsHttpConn through conn_ field (cast from ISrsHttpConn* to SrsHttpConn*)
SrsHttpConn *http_conn = new SrsHttpConn(conn.get(), mock_ssl, mock_http_mux.get(), "192.168.1.100", 8080);
http_conn->config_ = mock_config.get();
http_conn->app_factory_ = mock_app_factory.get();
srs_freep(http_conn->parser_);
http_conn->parser_ = mock_parser;
SrsHttpConn *conn = new SrsHttpConn(connx.get(), mock_ssl, mock_http_mux.get(), "192.168.1.100", 8080);
conn->config_ = mock_config.get();
conn->app_factory_ = mock_app_factory.get();
srs_freep(conn->parser_);
conn->parser_ = mock_parser;
// Inject the mock SrsHttpConn into conn_ field
srs_freep(conn->conn_);
conn->conn_ = http_conn;
srs_freep(connx->conn_);
connx->conn_ = conn;
// Start the HTTPx connection
HELPER_EXPECT_SUCCESS(conn->start());
HELPER_EXPECT_SUCCESS(connx->start());
// Wait for coroutine to start
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
@ -160,3 +149,179 @@ VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpxRequest)
EXPECT_EQ(1, mock_http_mux->serve_http_count_);
}
// This test is used to verify the basic workflow of the HTTP FLV streaming.
// It's finished with the help of AI, but each step is manually designed
// and verified. So this is not dominated by AI, but by humanbeing.
VOID TEST(HttpConnTest, ManuallyVerifyBasicWorkflowForHttpStream)
{
srs_error_t err;
// Create SrsLiveStream object under test
SrsUniquePtr<MockRequest> mock_request(new MockRequest("test.vhost", "live", "livestream"));
SrsUniquePtr<MockBufferCache> mock_cache(new MockBufferCache());
SrsUniquePtr<SrsLiveStream> live_stream(new SrsLiveStream(mock_request.get(), mock_cache.get()));
// Create mock dependencies
SrsUniquePtr<MockAppConfig> mock_config(new MockAppConfig());
SrsUniquePtr<MockLiveSourceManager> mock_live_sources(new MockLiveSourceManager());
SrsUniquePtr<MockAppStatistic> mock_stat(new MockAppStatistic());
SrsUniquePtr<MockHttpHooks> mock_hooks(new MockHttpHooks());
SrsUniquePtr<MockResponseWriter> mock_writer(new MockResponseWriter());
SrsUniquePtr<MockHttpMessage> mock_message(new MockHttpMessage());
SrsUniquePtr<SrsHttpMuxEntry> mock_entry(new SrsHttpMuxEntry());
MockProtocolReadWriter *mock_io = new MockProtocolReadWriter();
SrsUniquePtr<MockHttpServeMux> mock_http_mux(new MockHttpServeMux());
SrsUniquePtr<MockConnectionManager> mock_manager(new MockConnectionManager());
SrsUniquePtr<MockAppFactory> mock_app_factory(new MockAppFactory());
MockHttpParser *mock_parser = new MockHttpParser();
SrsUniquePtr<SrsHttpxConn> connx(new SrsHttpxConn(mock_manager.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080, "", ""));
SrsHttpConn *conn = new SrsHttpConn(connx.get(), mock_io, mock_http_mux.get(), "192.168.1.100", 8080);
// Inject mock dependencies into SrsLiveStream private fields
live_stream->config_ = mock_config.get();
live_stream->live_sources_ = mock_live_sources.get();
live_stream->stat_ = mock_stat.get();
live_stream->hooks_ = mock_hooks.get();
// Do not wait for utest, consume messages immediately. Remove this when HTTP stream use cond signal.
mock_config->mw_sleep_ = 0;
mock_entry->enabled = true;
mock_entry->pattern = "/live/livestream.flv";
live_stream->entry_ = mock_entry.get();
connx->config_ = mock_config.get();
conn->config_ = mock_config.get();
conn->app_factory_ = mock_app_factory.get();
srs_freep(conn->parser_);
conn->parser_ = mock_parser;
srs_freep(connx->conn_);
connx->conn_ = conn;
mock_message->set_connection(conn);
// Start a coroutine to run the streaming, because it's a blocking operation
SrsCoroutineChan ctx;
ctx.push(live_stream.get());
ctx.push(mock_writer.get());
ctx.push(mock_message.get());
srs_error_t r0 = srs_success;
ctx.push(&r0);
SrsUniquePtr<SrsCond> cond(new SrsCond());
ctx.push(cond.get());
SRS_COROUTINE_GO_CTX(&ctx, {
SrsLiveStream *live_stream = (SrsLiveStream *)ctx.pop();
ISrsHttpResponseWriter *mock_writer = (ISrsHttpResponseWriter *)ctx.pop();
ISrsHttpMessage *mock_message = (ISrsHttpMessage *)ctx.pop();
srs_error_t *r0 = (srs_error_t *)ctx.pop();
SrsCond *cond = (SrsCond *)ctx.pop();
*r0 = live_stream->serve_http(mock_writer, mock_message);
cond->signal();
});
// Wait for coroutine to start
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify that live source has a consumer
MockLiveSource *mock_source = dynamic_cast<MockLiveSource *>(mock_live_sources->mock_source_.get());
EXPECT_EQ(1, (int)mock_source->consumers_.size());
EXPECT_EQ(1, mock_source->on_dump_packets_count_);
// Feed SSL a message to cover the http recv thread.
if (true) {
mock_io->recv_msgs_.push_back("test data");
mock_io->cond_->signal();
// Wait for http thread to consume the message.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify that the recv thread consumed the message.
EXPECT_EQ(1, mock_io->read_count_);
}
// Create an RTMP audio message to feed consumer.
if (true) {
// Create a real AAC audio message with proper format.
// AAC audio format in RTMP/FLV:
// Byte 0: (SoundFormat << 4) | (SoundRate << 2) | (SoundSize << 1) | SoundType
// SoundFormat=10 (AAC), SoundRate=3 (44kHz), SoundSize=1 (16-bit), SoundType=1 (stereo)
// = 0xAF
// Byte 1: AACPacketType (0=sequence header, 1=raw data)
// Remaining bytes: AAC data
int payload_size = 10;
SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage();
msg->header_.initialize_audio(payload_size, 0, 1);
msg->create_payload(payload_size);
// Fill in AAC audio data
SrsBuffer stream(msg->payload(), payload_size);
// Audio format byte: AAC(10), 44kHz(3), 16-bit(1), stereo(1) = 0xAF
stream.write_1bytes(0xAF);
// AAC packet type: 1 = AAC raw data
stream.write_1bytes(0x01);
// AAC raw data (8 bytes of dummy audio data)
for (int i = 0; i < 8; i++) {
stream.write_1bytes(0x00);
}
// Feed audio to source.
SrsLiveSource *source = mock_source;
HELPER_EXPECT_SUCCESS(source->on_audio(msg));
// Wait for consumer to process the message.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify the mock response writer received the audio data
EXPECT_EQ(101, mock_writer->io.out_buffer.length());
}
// Create an RTMP video message to feed consumer.
if (true) {
// Create a real H.264 video message with proper format.
// H.264 video format in RTMP/FLV:
// Byte 0: (FrameType << 4) | CodecID (CodecID=7 for H.264)
// FrameType=1 (key frame), CodecID=7 (H.264) = 0x17
// Byte 1: AVCPacketType (0=sequence header, 1=NALU, 2=end of sequence)
// Byte 2-4: CompositionTime (3bytes little-endian int24)
// Remaining bytes: H.264 data
int payload_size = 10;
SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage();
msg->header_.initialize_video(payload_size, 0, 1);
msg->create_payload(payload_size);
// Fill in H.264 video data
SrsBuffer stream(msg->payload(), payload_size);
// Frame type & Codec ID: Key frame (1) + H.264 (7) = 0x17
stream.write_1bytes(0x17);
// AVC packet type: 1 = NALU
stream.write_1bytes(0x01);
// Composition time: 0 (3bytes little-endian int24)
stream.write_3bytes(0x000000);
// H.264 raw data (5 bytes of dummy video data)
for (int i = 0; i < 5; i++) {
stream.write_1bytes(0x00);
}
// Feed video to source.
SrsLiveSource *source = mock_source;
HELPER_EXPECT_SUCCESS(source->on_video(msg));
// Wait for consumer to process the message.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify the mock response writer received the video data
EXPECT_EQ(132, mock_writer->io.out_buffer.length());
}
// Simulate client quit event, the receive thread will get this error.
// Note that we should not sleep here, because we need to use cond to get the signal.
// If we sleep, the signal will be lost.
if (true) {
mock_io->read_error_ = srs_error_new(ERROR_SOCKET_READ, "mock client quit");
mock_io->cond_->signal();
}
// Wait fr or coroutine to stop
cond->wait();
EXPECT_EQ(ERROR_SOCKET_READ, srs_error_code(r0));
srs_freep(r0);
}

View File

@ -361,8 +361,8 @@ void MockRtcSourceManager::reset()
fetch_or_create_count_ = 0;
}
// MockRtcStatistic implementation
MockRtcStatistic::MockRtcStatistic()
// MockAppStatistic implementation
MockAppStatistic::MockAppStatistic()
{
on_client_error_ = srs_success;
on_client_count_ = 0;
@ -373,17 +373,17 @@ MockRtcStatistic::MockRtcStatistic()
last_client_type_ = SrsRtmpConnUnknown;
}
MockRtcStatistic::~MockRtcStatistic()
MockAppStatistic::~MockAppStatistic()
{
srs_freep(on_client_error_);
}
void MockRtcStatistic::on_disconnect(std::string id, srs_error_t err)
void MockAppStatistic::on_disconnect(std::string id, srs_error_t err)
{
on_disconnect_count_++;
}
srs_error_t MockRtcStatistic::on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type)
srs_error_t MockAppStatistic::on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type)
{
on_client_count_++;
last_client_id_ = id;
@ -393,99 +393,99 @@ srs_error_t MockRtcStatistic::on_client(std::string id, ISrsRequest *req, ISrsEx
return srs_error_copy(on_client_error_);
}
srs_error_t MockRtcStatistic::on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height)
srs_error_t MockAppStatistic::on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height)
{
return srs_success;
}
srs_error_t MockRtcStatistic::on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object)
srs_error_t MockAppStatistic::on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object)
{
return srs_success;
}
void MockRtcStatistic::on_stream_publish(ISrsRequest *req, std::string publisher_id)
void MockAppStatistic::on_stream_publish(ISrsRequest *req, std::string publisher_id)
{
}
void MockRtcStatistic::on_stream_close(ISrsRequest *req)
void MockAppStatistic::on_stream_close(ISrsRequest *req)
{
}
void MockRtcStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta *delta)
void MockAppStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta *delta)
{
}
void MockRtcStatistic::kbps_sample()
void MockAppStatistic::kbps_sample()
{
}
srs_error_t MockRtcStatistic::on_video_frames(ISrsRequest *req, int nb_frames)
srs_error_t MockAppStatistic::on_video_frames(ISrsRequest *req, int nb_frames)
{
return srs_success;
}
std::string MockRtcStatistic::server_id()
std::string MockAppStatistic::server_id()
{
return "";
}
std::string MockRtcStatistic::service_id()
std::string MockAppStatistic::service_id()
{
return "";
}
std::string MockRtcStatistic::service_pid()
std::string MockAppStatistic::service_pid()
{
return "";
}
SrsStatisticVhost *MockRtcStatistic::find_vhost_by_id(std::string vid)
SrsStatisticVhost *MockAppStatistic::find_vhost_by_id(std::string vid)
{
return NULL;
}
SrsStatisticStream *MockRtcStatistic::find_stream(std::string sid)
SrsStatisticStream *MockAppStatistic::find_stream(std::string sid)
{
return NULL;
}
SrsStatisticStream *MockRtcStatistic::find_stream_by_url(std::string url)
SrsStatisticStream *MockAppStatistic::find_stream_by_url(std::string url)
{
return NULL;
}
SrsStatisticClient *MockRtcStatistic::find_client(std::string client_id)
SrsStatisticClient *MockAppStatistic::find_client(std::string client_id)
{
return NULL;
}
srs_error_t MockRtcStatistic::dumps_vhosts(SrsJsonArray *arr)
srs_error_t MockAppStatistic::dumps_vhosts(SrsJsonArray *arr)
{
return srs_success;
}
srs_error_t MockRtcStatistic::dumps_streams(SrsJsonArray *arr, int start, int count)
srs_error_t MockAppStatistic::dumps_streams(SrsJsonArray *arr, int start, int count)
{
return srs_success;
}
srs_error_t MockRtcStatistic::dumps_clients(SrsJsonArray *arr, int start, int count)
srs_error_t MockAppStatistic::dumps_clients(SrsJsonArray *arr, int start, int count)
{
return srs_success;
}
srs_error_t MockRtcStatistic::dumps_metrics(int64_t &send_bytes, int64_t &recv_bytes, int64_t &nstreams, int64_t &nclients, int64_t &total_nclients, int64_t &nerrs)
srs_error_t MockAppStatistic::dumps_metrics(int64_t &send_bytes, int64_t &recv_bytes, int64_t &nstreams, int64_t &nclients, int64_t &total_nclients, int64_t &nerrs)
{
return srs_success;
}
void MockRtcStatistic::set_on_client_error(srs_error_t err)
void MockAppStatistic::set_on_client_error(srs_error_t err)
{
srs_freep(on_client_error_);
on_client_error_ = srs_error_copy(err);
}
void MockRtcStatistic::reset()
void MockAppStatistic::reset()
{
srs_freep(on_client_error_);
on_client_error_ = srs_success;
@ -1185,6 +1185,7 @@ MockLiveSource::MockLiveSource()
can_publish_result_ = true;
on_audio_count_ = 0;
on_video_count_ = 0;
on_dump_packets_count_ = 0;
}
MockLiveSource::~MockLiveSource()
@ -1213,6 +1214,12 @@ srs_error_t MockLiveSource::on_edge_start_publish()
return srs_success;
}
srs_error_t MockLiveSource::consumer_dumps(ISrsLiveConsumer *consumer, bool ds, bool dm, bool dg)
{
on_dump_packets_count_++;
return SrsLiveSource::consumer_dumps(consumer, ds, dm, dg);
}
srs_error_t MockLiveSource::on_audio(SrsRtmpCommonMessage *audio)
{
on_audio_count_++;
@ -1526,12 +1533,24 @@ srs_error_t MockRtmpServer::recv_message(SrsRtmpCommonMessage **pmsg)
cond_->wait();
}
if (!recv_msgs_.empty()) {
*pmsg = recv_msgs_.front();
recv_msgs_.erase(recv_msgs_.begin());
if (recv_err_ != srs_success) {
return srs_error_copy(recv_err_);
}
return srs_error_copy(recv_err_);
if (recv_msgs_.empty()) {
return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message");
}
SrsRtmpCommonMessage *msg = recv_msgs_.front();
recv_msgs_.erase(recv_msgs_.begin());
if (pmsg) {
*pmsg = msg;
} else {
srs_freep(msg);
}
return srs_success;
}
void MockRtmpServer::set_merge_read(bool v, IMergeReadHandler *handler)
@ -1570,10 +1589,18 @@ MockProtocolReadWriter::MockProtocolReadWriter()
{
recv_timeout_ = SRS_UTIME_NO_TIMEOUT;
send_timeout_ = SRS_UTIME_NO_TIMEOUT;
read_count_ = 0;
recv_bytes_ = 0;
read_error_ = srs_success;
cond_ = new SrsCond();
}
MockProtocolReadWriter::~MockProtocolReadWriter()
{
srs_freep(read_error_);
srs_freep(cond_);
recv_msgs_.clear();
}
srs_error_t MockProtocolReadWriter::read_fully(void *buf, size_t size, ssize_t *nread)
@ -1583,6 +1610,32 @@ srs_error_t MockProtocolReadWriter::read_fully(void *buf, size_t size, ssize_t *
srs_error_t MockProtocolReadWriter::read(void *buf, size_t size, ssize_t *nread)
{
// No message received during playing util get control event.
if (recv_msgs_.empty()) {
cond_->wait();
}
read_count_++;
if (read_error_ != srs_success) {
return srs_error_copy(read_error_);
}
if (recv_msgs_.empty()) {
return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message");
}
string test_data_ = recv_msgs_.front();
recv_msgs_.erase(recv_msgs_.begin());
// Simulate reading data
size_t copy_size = srs_min(size, test_data_.size());
memcpy(buf, test_data_.c_str(), copy_size);
if (nread) {
*nread = copy_size;
}
recv_bytes_ += copy_size;
return srs_success;
}
@ -1726,11 +1779,19 @@ MockSslConnection::MockSslConnection()
send_timeout_ = SRS_UTIME_NO_TIMEOUT;
recv_bytes_ = 0;
send_bytes_ = 0;
read_count_ = 0;
read_error_ = srs_success;
cond_ = new SrsCond();
}
MockSslConnection::~MockSslConnection()
{
srs_freep(handshake_error_);
srs_freep(read_error_);
srs_freep(cond_);
recv_msgs_.clear();
}
srs_error_t MockSslConnection::handshake(std::string key_file, std::string crt_file)
@ -1766,7 +1827,33 @@ int64_t MockSslConnection::get_send_bytes()
srs_error_t MockSslConnection::read(void *buf, size_t size, ssize_t *nread)
{
return srs_error_new(ERROR_NOT_SUPPORTED, "mock ssl read");
// No message received during playing util get control event.
if (recv_msgs_.empty()) {
cond_->wait();
}
read_count_++;
if (read_error_ != srs_success) {
return srs_error_copy(read_error_);
}
if (recv_msgs_.empty()) {
return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message");
}
string test_data_ = recv_msgs_.front();
recv_msgs_.erase(recv_msgs_.begin());
// Simulate reading data
size_t copy_size = srs_min(size, test_data_.size());
memcpy(buf, test_data_.c_str(), copy_size);
if (nread) {
*nread = copy_size;
}
recv_bytes_ += copy_size;
return srs_success;
}
void MockSslConnection::set_send_timeout(srs_utime_t tm)
@ -1838,18 +1925,26 @@ srs_error_t MockSrtConnection::read(void *buf, size_t size, ssize_t *nread)
read_count_++;
if (!recv_msgs_.empty()) {
string test_data_ = recv_msgs_.front();
recv_msgs_.erase(recv_msgs_.begin());
// Simulate reading data
size_t copy_size = srs_min(size, test_data_.size());
memcpy(buf, test_data_.c_str(), copy_size);
*nread = copy_size;
recv_bytes_ += copy_size;
if (read_error_ != srs_success) {
return srs_error_copy(read_error_);
}
return srs_error_copy(read_error_);
if (recv_msgs_.empty()) {
return srs_error_new(ERROR_SOCKET_READ, "mock rtmp server no message");
}
string test_data_ = recv_msgs_.front();
recv_msgs_.erase(recv_msgs_.begin());
// Simulate reading data
size_t copy_size = srs_min(size, test_data_.size());
memcpy(buf, test_data_.c_str(), copy_size);
if (nread) {
*nread = copy_size;
}
recv_bytes_ += copy_size;
return srs_success;
}
srs_error_t MockSrtConnection::read_fully(void *buf, size_t size, ssize_t *nread)
@ -1962,7 +2057,12 @@ srs_error_t MockHttpParser::parse_message(ISrsReader *reader, ISrsHttpMessage **
ISrsHttpMessage *msg = messages_.front();
messages_.erase(messages_.begin());
*ppmsg = msg;
if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
return srs_success;
}
@ -2132,3 +2232,191 @@ srs_error_t MockHttpServeMux::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMess
serve_http_count_++;
return srs_success;
}
// Mock request implementation for SrsBufferCache testing
MockRequest::MockRequest(std::string vhost, std::string app, std::string stream)
{
vhost_ = vhost;
app_ = app;
stream_ = stream;
host_ = "127.0.0.1";
port_ = 1935;
tcUrl_ = "rtmp://127.0.0.1/" + app;
schema_ = "rtmp";
param_ = "";
duration_ = 0;
args_ = NULL;
protocol_ = "rtmp";
objectEncoding_ = 0;
}
MockRequest::~MockRequest()
{
}
ISrsRequest *MockRequest::copy()
{
MockRequest *req = new MockRequest(vhost_, app_, stream_);
req->host_ = host_;
req->port_ = port_;
req->tcUrl_ = tcUrl_;
req->pageUrl_ = pageUrl_;
req->swfUrl_ = swfUrl_;
req->schema_ = schema_;
req->param_ = param_;
req->duration_ = duration_;
req->protocol_ = protocol_;
req->objectEncoding_ = objectEncoding_;
req->ip_ = ip_;
return req;
}
std::string MockRequest::get_stream_url()
{
if (vhost_ == "__defaultVhost__" || vhost_.empty()) {
return "/" + app_ + "/" + stream_;
} else {
return vhost_ + "/" + app_ + "/" + stream_;
}
}
void MockRequest::update_auth(ISrsRequest *req)
{
if (req) {
pageUrl_ = req->pageUrl_;
swfUrl_ = req->swfUrl_;
tcUrl_ = req->tcUrl_;
}
}
void MockRequest::strip()
{
// Mock implementation - basic string cleanup
host_ = srs_strings_remove(host_, "/ \n\r\t");
vhost_ = srs_strings_remove(vhost_, "/ \n\r\t");
app_ = srs_strings_remove(app_, " \n\r\t");
stream_ = srs_strings_remove(stream_, " \n\r\t");
app_ = srs_strings_trim_end(app_, "/");
stream_ = srs_strings_trim_end(stream_, "/");
}
ISrsRequest *MockRequest::as_http()
{
return copy();
}
MockBufferCache::MockBufferCache()
{
dump_cache_count_ = 0;
last_consumer_ = NULL;
last_jitter_ = SrsRtmpJitterAlgorithmOFF;
}
MockBufferCache::~MockBufferCache()
{
}
srs_error_t MockBufferCache::start()
{
return srs_success;
}
void MockBufferCache::stop()
{
}
bool MockBufferCache::alive()
{
return true;
}
srs_error_t MockBufferCache::dump_cache(ISrsLiveConsumer *consumer, SrsRtmpJitterAlgorithm jitter)
{
dump_cache_count_++;
last_consumer_ = consumer;
last_jitter_ = jitter;
return srs_success;
}
srs_error_t MockBufferCache::update_auth(ISrsRequest *r)
{
return srs_success;
}
// Mock HTTP hooks implementation
MockHttpHooks::MockHttpHooks()
{
on_stop_count_ = 0;
on_unpublish_count_ = 0;
}
MockHttpHooks::~MockHttpHooks()
{
clear_calls();
}
srs_error_t MockHttpHooks::on_connect(std::string url, ISrsRequest *req)
{
return srs_success;
}
void MockHttpHooks::on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes)
{
}
srs_error_t MockHttpHooks::on_publish(std::string url, ISrsRequest *req)
{
return srs_success;
}
void MockHttpHooks::on_unpublish(std::string url, ISrsRequest *req)
{
on_unpublish_count_++;
on_unpublish_calls_.push_back(std::make_pair(url, req));
}
srs_error_t MockHttpHooks::on_play(std::string url, ISrsRequest *req)
{
return srs_success;
}
void MockHttpHooks::on_stop(std::string url, ISrsRequest *req)
{
on_stop_count_++;
on_stop_calls_.push_back(std::make_pair(url, req));
}
srs_error_t MockHttpHooks::on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file)
{
return srs_success;
}
srs_error_t MockHttpHooks::on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url,
std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration)
{
return srs_success;
}
srs_error_t MockHttpHooks::on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify)
{
return srs_success;
}
srs_error_t MockHttpHooks::discover_co_workers(std::string url, std::string &host, int &port)
{
return srs_success;
}
srs_error_t MockHttpHooks::on_forward_backend(std::string url, ISrsRequest *req, std::vector<std::string> &rtmp_urls)
{
return srs_success;
}
void MockHttpHooks::clear_calls()
{
on_stop_calls_.clear();
on_stop_count_ = 0;
on_unpublish_calls_.clear();
on_unpublish_count_ = 0;
}

View File

@ -35,15 +35,17 @@
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
#endif
#include <srs_app_http_conn.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_http_stream.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_rtmp_source.hpp>
#include <srs_app_security.hpp>
#include <srs_app_srt_conn.hpp>
#include <srs_app_srt_source.hpp>
#include <srs_protocol_http_conn.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_http_conn.hpp>
#include <srs_app_http_conn.hpp>
// Forward declarations
class SrsRtcTrackDescription;
@ -171,7 +173,7 @@ public:
};
// Mock statistic for testing
class MockRtcStatistic : public ISrsStatistic
class MockAppStatistic : public ISrsStatistic
{
public:
srs_error_t on_client_error_;
@ -183,8 +185,8 @@ public:
SrsRtmpConnType last_client_type_;
public:
MockRtcStatistic();
virtual ~MockRtcStatistic();
MockAppStatistic();
virtual ~MockAppStatistic();
virtual void on_disconnect(std::string id, srs_error_t err);
virtual srs_error_t on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type);
virtual srs_error_t on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height);
@ -629,6 +631,7 @@ public:
bool can_publish_result_;
int on_audio_count_;
int on_video_count_;
int on_dump_packets_count_;
public:
MockLiveSource();
@ -637,6 +640,7 @@ public:
void set_can_publish(bool can_publish);
virtual srs_error_t on_publish();
virtual srs_error_t on_edge_start_publish();
virtual srs_error_t consumer_dumps(ISrsLiveConsumer *consumer, bool ds, bool dm, bool dg);
public:
virtual srs_error_t on_audio(SrsRtmpCommonMessage *audio);
@ -777,6 +781,13 @@ class MockProtocolReadWriter : public ISrsProtocolReadWriter
public:
srs_utime_t recv_timeout_;
srs_utime_t send_timeout_;
int64_t recv_bytes_;
int read_count_;
public:
srs_error_t read_error_;
std::vector<std::string> recv_msgs_;
SrsCond *cond_;
public:
MockProtocolReadWriter();
@ -836,6 +847,12 @@ public:
srs_utime_t send_timeout_;
int64_t recv_bytes_;
int64_t send_bytes_;
int read_count_;
public:
srs_error_t read_error_;
std::vector<std::string> recv_msgs_;
SrsCond *cond_;
public:
MockSslConnection();
@ -999,4 +1016,62 @@ public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r);
};
// Mock request class for testing SrsBufferCache
class MockRequest : public ISrsRequest
{
public:
MockRequest(std::string vhost = "__defaultVhost__", std::string app = "live", std::string stream = "test");
virtual ~MockRequest();
virtual ISrsRequest *copy();
virtual std::string get_stream_url();
virtual void update_auth(ISrsRequest *req);
virtual void strip();
virtual ISrsRequest *as_http();
};
// Mock buffer cache for testing AAC stream encoder
class MockBufferCache : public ISrsBufferCache
{
public:
int dump_cache_count_;
ISrsLiveConsumer *last_consumer_;
SrsRtmpJitterAlgorithm last_jitter_;
public:
MockBufferCache();
virtual ~MockBufferCache();
virtual srs_error_t start();
virtual void stop();
virtual bool alive();
virtual srs_error_t dump_cache(ISrsLiveConsumer *consumer, SrsRtmpJitterAlgorithm jitter);
virtual srs_error_t update_auth(ISrsRequest *r);
};
// Mock HTTP hooks for testing SrsRtcAsyncCallOnStop
class MockHttpHooks : public ISrsHttpHooks
{
public:
std::vector<std::pair<std::string, ISrsRequest *> > on_stop_calls_;
int on_stop_count_;
std::vector<std::pair<std::string, ISrsRequest *> > on_unpublish_calls_;
int on_unpublish_count_;
public:
MockHttpHooks();
virtual ~MockHttpHooks();
virtual srs_error_t on_connect(std::string url, ISrsRequest *req);
virtual void on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes);
virtual srs_error_t on_publish(std::string url, ISrsRequest *req);
virtual void on_unpublish(std::string url, ISrsRequest *req);
virtual srs_error_t on_play(std::string url, ISrsRequest *req);
virtual void on_stop(std::string url, ISrsRequest *req);
virtual srs_error_t on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file);
virtual srs_error_t on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url,
std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration);
virtual srs_error_t on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify);
virtual srs_error_t discover_co_workers(std::string url, std::string &host, int &port);
virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector<std::string> &rtmp_urls);
void clear_calls();
};
#endif

View File

@ -23,7 +23,7 @@ VOID TEST(RtcPlayStreamTest, ManuallyVerifyBasicWorkflow)
// Create mock objects for dependencies
MockAppConfig mock_config;
MockRtcSourceManager mock_rtc_sources;
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1");
MockRtcAsyncTaskExecutor mock_exec;
MockExpire mock_expire;

View File

@ -39,7 +39,7 @@ VOID TEST(RtcPublishStreamTest, ManuallyVerifyBasicWorkflow)
// Create mock objects for dependencies
MockAppConfig mock_config;
MockRtcSourceManager mock_rtc_sources;
MockRtcStatistic mock_stat;
MockAppStatistic mock_stat;
MockRtcAsyncCallRequest mock_request("test.vhost", "live", "stream1");
MockRtcAsyncTaskExecutor mock_exec;
MockExpire mock_expire;

View File

@ -50,7 +50,7 @@ VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPublisher)
SrsUniquePtr<MockConnectionManager> mock_manager(new MockConnectionManager());
SrsUniquePtr<MockLiveSourceManager> mock_sources(new MockLiveSourceManager());
SrsUniquePtr<MockStreamPublishTokenManager> mock_tokens(new MockStreamPublishTokenManager());
SrsUniquePtr<MockRtcStatistic> mock_stat(new MockRtcStatistic());
SrsUniquePtr<MockAppStatistic> mock_stat(new MockAppStatistic());
SrsUniquePtr<MockHttpHooks> mock_hooks(new MockHttpHooks());
SrsUniquePtr<MockRtcSourceManager> mock_rtc_sources(new MockRtcSourceManager());
SrsUniquePtr<MockSrtSourceManager> mock_srt_sources(new MockSrtSourceManager());
@ -222,7 +222,7 @@ VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPlayer)
SrsUniquePtr<MockConnectionManager> mock_manager(new MockConnectionManager());
SrsUniquePtr<MockLiveSourceManager> mock_sources(new MockLiveSourceManager());
SrsUniquePtr<MockStreamPublishTokenManager> mock_tokens(new MockStreamPublishTokenManager());
SrsUniquePtr<MockRtcStatistic> mock_stat(new MockRtcStatistic());
SrsUniquePtr<MockAppStatistic> mock_stat(new MockAppStatistic());
SrsUniquePtr<MockHttpHooks> mock_hooks(new MockHttpHooks());
SrsUniquePtr<MockRtcSourceManager> mock_rtc_sources(new MockRtcSourceManager());
SrsUniquePtr<MockSrtSourceManager> mock_srt_sources(new MockSrtSourceManager());

View File

@ -47,7 +47,7 @@ VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPublisher)
SrsUniquePtr<MockConnectionManager> mock_manager(new MockConnectionManager());
SrsUniquePtr<MockLiveSourceManager> mock_sources(new MockLiveSourceManager());
SrsUniquePtr<MockStreamPublishTokenManager> mock_tokens(new MockStreamPublishTokenManager());
SrsUniquePtr<MockRtcStatistic> mock_stat(new MockRtcStatistic());
SrsUniquePtr<MockAppStatistic> mock_stat(new MockAppStatistic());
SrsUniquePtr<MockHttpHooks> mock_hooks(new MockHttpHooks());
SrsUniquePtr<MockRtcSourceManager> mock_rtc_sources(new MockRtcSourceManager());
SrsUniquePtr<MockSrtSourceManager> mock_srt_sources(new MockSrtSourceManager());
@ -147,7 +147,7 @@ VOID TEST(SrtConnTest, ManuallyVerifyBasicWorkflowForPlayer)
SrsUniquePtr<MockConnectionManager> mock_manager(new MockConnectionManager());
SrsUniquePtr<MockLiveSourceManager> mock_sources(new MockLiveSourceManager());
SrsUniquePtr<MockStreamPublishTokenManager> mock_tokens(new MockStreamPublishTokenManager());
SrsUniquePtr<MockRtcStatistic> mock_stat(new MockRtcStatistic());
SrsUniquePtr<MockAppStatistic> mock_stat(new MockAppStatistic());
SrsUniquePtr<MockHttpHooks> mock_hooks(new MockHttpHooks());
SrsUniquePtr<MockRtcSourceManager> mock_rtc_sources(new MockRtcSourceManager());
SrsUniquePtr<MockSrtSourceManager> mock_srt_sources(new MockSrtSourceManager());