AI: Add utest to cover heatbeat.

This commit is contained in:
OSSRS-AI 2025-10-15 09:28:16 -04:00 committed by winlin
parent 223202f121
commit 44c3dab79e
11 changed files with 1042 additions and 34 deletions

View File

@ -43,27 +43,35 @@ SrsCircuitBreaker::SrsCircuitBreaker()
hybrid_high_water_level_ = 0;
hybrid_critical_water_level_ = 0;
hybrid_dying_water_level_ = 0;
host_ = new SrsHost();
config_ = _srs_config;
shared_timer_ = _srs_shared_timer;
}
SrsCircuitBreaker::~SrsCircuitBreaker()
{
srs_freep(host_);
config_ = NULL;
}
srs_error_t SrsCircuitBreaker::initialize()
{
srs_error_t err = srs_success;
enabled_ = _srs_config->get_circuit_breaker();
high_threshold_ = _srs_config->get_high_threshold();
high_pulse_ = _srs_config->get_high_pulse();
critical_threshold_ = _srs_config->get_critical_threshold();
critical_pulse_ = _srs_config->get_critical_pulse();
dying_threshold_ = _srs_config->get_dying_threshold();
dying_pulse_ = _srs_config->get_dying_pulse();
enabled_ = config_->get_circuit_breaker();
high_threshold_ = config_->get_high_threshold();
high_pulse_ = config_->get_high_pulse();
critical_threshold_ = config_->get_critical_threshold();
critical_pulse_ = config_->get_critical_pulse();
dying_threshold_ = config_->get_dying_threshold();
dying_pulse_ = config_->get_dying_pulse();
// Update the water level for circuit breaker.
// @see SrsCircuitBreaker::on_timer()
_srs_shared_timer->timer1s()->subscribe(this);
shared_timer_->timer1s()->subscribe(this);
srs_trace("CircuitBreaker: enabled=%d, high=%dx%d, critical=%dx%d, dying=%dx%d", enabled_,
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_,
@ -93,7 +101,7 @@ srs_error_t SrsCircuitBreaker::on_timer(srs_utime_t interval)
// Update the CPU usage.
srs_update_proc_stat();
SrsProcSelfStat *stat = srs_get_self_proc_stat();
SrsProcSelfStat *stat = host_->self_proc_stat();
// Reset the high water-level when CPU is low for N times.
if (stat->percent_ * 100 > high_threshold_) {
@ -117,7 +125,7 @@ srs_error_t SrsCircuitBreaker::on_timer(srs_utime_t interval)
}
// Show statistics for RTC server.
SrsProcSelfStat *u = srs_get_self_proc_stat();
SrsProcSelfStat *u = host_->self_proc_stat();
// Resident Set Size: number of pages the process has in real memory.
int memory = (int)(u->rss_ * 4 / 1024);

View File

@ -11,6 +11,10 @@
#include <srs_kernel_hourglass.hpp>
class ISrsAppConfig;
class ISrsSharedTimer;
class ISrsHost;
// Interface for circuit breaker functionality to protect server in high load conditions.
// The circuit breaker monitors CPU usage and enables different levels of protection:
// - High water level: Disables some unnecessary features to reduce CPU load
@ -48,6 +52,12 @@ public:
class SrsCircuitBreaker : public ISrsCircuitBreaker, public ISrsFastTimerHandler
{
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
ISrsAppConfig *config_;
ISrsSharedTimer *shared_timer_;
ISrsHost *host_;
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
bool enabled_;

View File

@ -396,6 +396,20 @@ public:
// Heartbeat config
virtual bool get_heartbeat_enabled() = 0;
virtual srs_utime_t get_heartbeat_interval() = 0;
virtual std::string get_heartbeat_url() = 0;
virtual std::string get_heartbeat_device_id() = 0;
virtual bool get_heartbeat_summaries() = 0;
virtual bool get_heartbeat_ports() = 0;
public:
// Circuit breaker config
virtual bool get_circuit_breaker() = 0;
virtual int get_high_threshold() = 0;
virtual int get_high_pulse() = 0;
virtual int get_critical_threshold() = 0;
virtual int get_critical_pulse() = 0;
virtual int get_dying_threshold() = 0;
virtual int get_dying_pulse() = 0;
public:
// RTMPS config
@ -1522,7 +1536,7 @@ public:
virtual std::string get_heartbeat_device_id();
// Whether report with summaries of http api: /api/v1/summaries.
virtual bool get_heartbeat_summaries();
bool get_heartbeat_ports();
virtual bool get_heartbeat_ports();
// stats section
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on

View File

@ -10,6 +10,7 @@
using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_http_client.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_statistic.hpp>
@ -23,10 +24,14 @@ using namespace std;
SrsHttpHeartbeat::SrsHttpHeartbeat()
{
config_ = _srs_config;
app_factory_ = _srs_app_factory;
}
SrsHttpHeartbeat::~SrsHttpHeartbeat()
{
config_ = NULL;
app_factory_ = NULL;
}
void SrsHttpHeartbeat::heartbeat()
@ -43,7 +48,7 @@ srs_error_t SrsHttpHeartbeat::do_heartbeat()
{
srs_error_t err = srs_success;
std::string url = _srs_config->get_heartbeat_url();
std::string url = config_->get_heartbeat_url();
SrsHttpUri uri;
if ((err = uri.initialize(url)) != srs_success) {
@ -51,7 +56,7 @@ srs_error_t SrsHttpHeartbeat::do_heartbeat()
}
string ip;
std::string device_id = _srs_config->get_heartbeat_device_id();
std::string device_id = config_->get_heartbeat_device_id();
// Try to load the ip from the environment variable.
ip = srs_getenv("srs.device.ip"); // SRS_DEVICE_IP
@ -60,7 +65,7 @@ srs_error_t SrsHttpHeartbeat::do_heartbeat()
SrsProtocolUtility utility;
vector<SrsIPAddress *> &ips = utility.local_ips();
if (!ips.empty()) {
ip = ips[_srs_config->get_stats_network() % (int)ips.size()]->ip_;
ip = ips[config_->get_stats_network() % (int)ips.size()]->ip_;
}
}
@ -74,82 +79,82 @@ srs_error_t SrsHttpHeartbeat::do_heartbeat()
obj->set("service", SrsJsonAny::str(stat->service_id().c_str()));
obj->set("pid", SrsJsonAny::str(stat->service_pid().c_str()));
if (_srs_config->get_heartbeat_summaries()) {
if (config_->get_heartbeat_summaries()) {
SrsJsonObject *summaries = SrsJsonAny::object();
obj->set("summaries", summaries);
srs_api_dump_summaries(summaries);
}
if (_srs_config->get_heartbeat_ports()) {
if (config_->get_heartbeat_ports()) {
// For RTMP listen endpoints.
if (true) {
SrsJsonArray *o = SrsJsonAny::array();
obj->set("rtmp", o);
vector<string> endpoints = _srs_config->get_listens();
vector<string> endpoints = config_->get_listens();
for (int i = 0; i < (int)endpoints.size(); i++) {
o->append(SrsJsonAny::str(endpoints.at(i).c_str()));
}
}
// For HTTP Stream listen endpoints.
if (_srs_config->get_http_stream_enabled()) {
if (config_->get_http_stream_enabled()) {
SrsJsonArray *o = SrsJsonAny::array();
obj->set("http", o);
vector<string> endpoints = _srs_config->get_http_stream_listens();
vector<string> endpoints = config_->get_http_stream_listens();
for (int i = 0; i < (int)endpoints.size(); i++) {
o->append(SrsJsonAny::str(endpoints.at(i).c_str()));
}
}
// For HTTP API listen endpoints.
if (_srs_config->get_http_api_enabled()) {
if (config_->get_http_api_enabled()) {
SrsJsonArray *o = SrsJsonAny::array();
obj->set("api", o);
vector<string> endpoints = _srs_config->get_http_api_listens();
vector<string> endpoints = config_->get_http_api_listens();
for (int i = 0; i < (int)endpoints.size(); i++) {
o->append(SrsJsonAny::str(endpoints.at(i).c_str()));
}
}
// For SRT listen endpoints.
if (_srs_config->get_srt_enabled()) {
if (config_->get_srt_enabled()) {
SrsJsonArray *o = SrsJsonAny::array();
obj->set("srt", o);
vector<string> endpoints = _srs_config->get_srt_listens();
vector<string> endpoints = config_->get_srt_listens();
for (int i = 0; i < (int)endpoints.size(); i++) {
o->append(SrsJsonAny::str(endpoints.at(i).c_str()));
}
}
// For RTSP listen endpoints.
if (_srs_config->get_rtsp_server_enabled()) {
if (config_->get_rtsp_server_enabled()) {
SrsJsonArray *o = SrsJsonAny::array();
obj->set("rtsp", o);
vector<string> endpoints = _srs_config->get_rtsp_server_listens();
vector<string> endpoints = config_->get_rtsp_server_listens();
for (int i = 0; i < (int)endpoints.size(); i++) {
o->append(SrsJsonAny::str(endpoints.at(i).c_str()));
}
}
// For WebRTC listen endpoints.
if (_srs_config->get_rtc_server_enabled()) {
if (config_->get_rtc_server_enabled()) {
SrsJsonArray *o = SrsJsonAny::array();
obj->set("rtc", o);
vector<string> endpoints = _srs_config->get_rtc_server_listens();
vector<string> endpoints = config_->get_rtc_server_listens();
for (int i = 0; i < (int)endpoints.size(); i++) {
string endpoint = srs_fmt_sprintf("udp://%s", endpoints.at(i).c_str());
o->append(SrsJsonAny::str(endpoint.c_str()));
}
if (_srs_config->get_rtc_server_tcp_enabled()) {
vector<string> endpoints = _srs_config->get_rtc_server_tcp_listens();
if (config_->get_rtc_server_tcp_enabled()) {
vector<string> endpoints = config_->get_rtc_server_tcp_listens();
for (int i = 0; i < (int)endpoints.size(); i++) {
string endpoint = srs_fmt_sprintf("tcp://%s", endpoints.at(i).c_str());
o->append(SrsJsonAny::str(endpoint.c_str()));
@ -158,14 +163,14 @@ srs_error_t SrsHttpHeartbeat::do_heartbeat()
}
}
SrsHttpClient http;
if ((err = http.initialize(uri.get_schema(), uri.get_host(), uri.get_port())) != srs_success) {
SrsUniquePtr<ISrsHttpClient> http(app_factory_->create_http_client());
if ((err = http->initialize(uri.get_schema(), uri.get_host(), uri.get_port())) != srs_success) {
return srs_error_wrap(err, "init uri=%s", uri.get_url().c_str());
}
std::string req = obj->dumps();
ISrsHttpMessage *msg_raw = NULL;
if ((err = http.post(uri.get_path(), req, &msg_raw)) != srs_success) {
if ((err = http->post(uri.get_path(), req, &msg_raw)) != srs_success) {
return srs_error_wrap(err, "http post hartbeart uri failed. url=%s, request=%s", url.c_str(), req.c_str());
}

View File

@ -9,9 +9,17 @@
#include <srs_core.hpp>
class ISrsAppConfig;
class ISrsAppFactory;
// The http heartbeat to api-server to notice api that the information of SRS.
class SrsHttpHeartbeat
{
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
ISrsAppConfig *config_;
ISrsAppFactory *app_factory_;
public:
SrsHttpHeartbeat();
virtual ~SrsHttpHeartbeat();

View File

@ -11,6 +11,7 @@
#include <srs_app_http_conn.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_error.hpp>
@ -18,7 +19,6 @@
#include <srs_kernel_utility.hpp>
#include <srs_protocol_json.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_factory.hpp>
#include <sstream>
#include <unistd.h>

View File

@ -324,6 +324,27 @@ int64_t SrsProcSystemStat::total()
return user_ + nice_ + sys_ + idle_ + iowait_ + irq_ + softirq_ + steal_ + guest_;
}
ISrsHost::ISrsHost()
{
}
ISrsHost::~ISrsHost()
{
}
SrsHost::SrsHost()
{
}
SrsHost::~SrsHost()
{
}
SrsProcSelfStat *SrsHost::self_proc_stat()
{
return srs_get_self_proc_stat();
}
SrsProcSelfStat *srs_get_self_proc_stat()
{
return &_srs_system_cpu_self_stat;

View File

@ -330,6 +330,28 @@ public:
int64_t total();
};
// The host interface.
class ISrsHost
{
public:
ISrsHost();
virtual ~ISrsHost();
public:
virtual SrsProcSelfStat *self_proc_stat() = 0;
};
// Get the host info.
class SrsHost : public ISrsHost
{
public:
SrsHost();
virtual ~SrsHost();
public:
virtual SrsProcSelfStat *self_proc_stat();
};
// Get system cpu stat, use cache to avoid performance problem.
extern SrsProcSelfStat *srs_get_self_proc_stat();
// Get system cpu stat, use cache to avoid performance problem.

View File

@ -9,14 +9,18 @@
using namespace std;
#include <srs_app_caster_flv.hpp>
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_encoder.hpp>
#include <srs_app_ffmpeg.hpp>
#include <srs_app_heartbeat.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_latest_version.hpp>
#include <srs_app_mpegts_udp.hpp>
#include <srs_app_ng_exec.hpp>
#include <srs_app_process.hpp>
#include <srs_app_recv_thread.hpp>
#include <srs_app_refer.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_packet.hpp>
#include <srs_kernel_utility.hpp>
@ -3958,3 +3962,696 @@ VOID TEST(NgExecTest, ParseExecPublishWithMultipleArgs)
// Clean up - set to NULL to avoid double-free
ng_exec->config_ = NULL;
}
// Mock ISrsHttpMessage implementation for SrsHttpHeartbeat
MockHttpMessageForHeartbeat::MockHttpMessageForHeartbeat()
{
status_code_ = 200;
body_content_ = "";
body_reader_ = new MockBufferReader("");
}
MockHttpMessageForHeartbeat::~MockHttpMessageForHeartbeat()
{
srs_freep(body_reader_);
}
uint8_t MockHttpMessageForHeartbeat::method()
{
return 0;
}
uint16_t MockHttpMessageForHeartbeat::status_code()
{
return status_code_;
}
std::string MockHttpMessageForHeartbeat::method_str()
{
return "";
}
std::string MockHttpMessageForHeartbeat::url()
{
return "";
}
std::string MockHttpMessageForHeartbeat::host()
{
return "";
}
std::string MockHttpMessageForHeartbeat::path()
{
return "";
}
std::string MockHttpMessageForHeartbeat::query()
{
return "";
}
std::string MockHttpMessageForHeartbeat::ext()
{
return "";
}
srs_error_t MockHttpMessageForHeartbeat::body_read_all(std::string &body)
{
body = body_content_;
return srs_success;
}
ISrsHttpResponseReader *MockHttpMessageForHeartbeat::body_reader()
{
return NULL;
}
int64_t MockHttpMessageForHeartbeat::content_length()
{
return body_content_.length();
}
std::string MockHttpMessageForHeartbeat::query_get(std::string key)
{
return "";
}
SrsHttpHeader *MockHttpMessageForHeartbeat::header()
{
return NULL;
}
bool MockHttpMessageForHeartbeat::is_jsonp()
{
return false;
}
bool MockHttpMessageForHeartbeat::is_keep_alive()
{
return false;
}
ISrsRequest *MockHttpMessageForHeartbeat::to_request(std::string vhost)
{
return NULL;
}
std::string MockHttpMessageForHeartbeat::parse_rest_id(std::string pattern)
{
return "";
}
uint8_t MockHttpMessageForHeartbeat::message_type()
{
return 0;
}
bool MockHttpMessageForHeartbeat::is_http_get()
{
return false;
}
bool MockHttpMessageForHeartbeat::is_http_put()
{
return false;
}
bool MockHttpMessageForHeartbeat::is_http_post()
{
return true;
}
bool MockHttpMessageForHeartbeat::is_http_delete()
{
return false;
}
bool MockHttpMessageForHeartbeat::is_http_options()
{
return false;
}
std::string MockHttpMessageForHeartbeat::uri()
{
return "";
}
void MockHttpMessageForHeartbeat::reset()
{
status_code_ = 200;
body_content_ = "";
}
// Mock ISrsHttpClient implementation for SrsHttpHeartbeat
MockHttpClientForHeartbeat::MockHttpClientForHeartbeat()
{
initialize_called_ = false;
post_called_ = false;
port_ = 0;
mock_response_ = NULL;
initialize_error_ = srs_success;
post_error_ = srs_success;
should_delete_response_ = true;
}
MockHttpClientForHeartbeat::~MockHttpClientForHeartbeat()
{
srs_freep(initialize_error_);
srs_freep(post_error_);
// Don't delete mock_response_ - it's managed by the caller or already deleted
mock_response_ = NULL;
}
srs_error_t MockHttpClientForHeartbeat::initialize(std::string schema, std::string h, int p, srs_utime_t tm)
{
initialize_called_ = true;
schema_ = schema;
host_ = h;
port_ = p;
return srs_error_copy(initialize_error_);
}
srs_error_t MockHttpClientForHeartbeat::get(std::string path, std::string req, ISrsHttpMessage **ppmsg)
{
return srs_success;
}
srs_error_t MockHttpClientForHeartbeat::post(std::string path, std::string req, ISrsHttpMessage **ppmsg)
{
post_called_ = true;
path_ = path;
request_body_ = req;
if (ppmsg && mock_response_) {
*ppmsg = (ISrsHttpMessage *)mock_response_;
}
return srs_error_copy(post_error_);
}
void MockHttpClientForHeartbeat::set_recv_timeout(srs_utime_t tm)
{
}
void MockHttpClientForHeartbeat::kbps_sample(const char *label, srs_utime_t age)
{
}
void MockHttpClientForHeartbeat::reset()
{
initialize_called_ = false;
post_called_ = false;
port_ = 0;
schema_ = "";
host_ = "";
path_ = "";
request_body_ = "";
}
// Mock ISrsAppFactory implementation for SrsHttpHeartbeat
MockAppFactoryForHeartbeat::MockAppFactoryForHeartbeat()
{
mock_http_client_ = NULL;
create_http_client_called_ = false;
should_delete_client_ = true;
}
MockAppFactoryForHeartbeat::~MockAppFactoryForHeartbeat()
{
if (should_delete_client_) {
srs_freep(mock_http_client_);
}
}
ISrsHttpClient *MockAppFactoryForHeartbeat::create_http_client()
{
create_http_client_called_ = true;
return mock_http_client_;
}
void MockAppFactoryForHeartbeat::reset()
{
create_http_client_called_ = false;
}
// Mock ISrsAppConfig implementation for SrsHttpHeartbeat
MockAppConfigForHeartbeat::MockAppConfigForHeartbeat()
{
heartbeat_url_ = "";
heartbeat_device_id_ = "";
heartbeat_summaries_ = false;
heartbeat_ports_ = false;
stats_network_ = 0;
http_stream_enabled_ = false;
http_api_enabled_ = false;
srt_enabled_ = false;
rtsp_server_enabled_ = false;
rtc_server_enabled_ = false;
rtc_server_tcp_enabled_ = false;
}
MockAppConfigForHeartbeat::~MockAppConfigForHeartbeat()
{
}
std::string MockAppConfigForHeartbeat::get_heartbeat_url()
{
return heartbeat_url_;
}
std::string MockAppConfigForHeartbeat::get_heartbeat_device_id()
{
return heartbeat_device_id_;
}
bool MockAppConfigForHeartbeat::get_heartbeat_summaries()
{
return heartbeat_summaries_;
}
bool MockAppConfigForHeartbeat::get_heartbeat_ports()
{
return heartbeat_ports_;
}
int MockAppConfigForHeartbeat::get_stats_network()
{
return stats_network_;
}
std::vector<std::string> MockAppConfigForHeartbeat::get_listens()
{
return listens_;
}
bool MockAppConfigForHeartbeat::get_http_stream_enabled()
{
return http_stream_enabled_;
}
std::vector<std::string> MockAppConfigForHeartbeat::get_http_stream_listens()
{
return http_stream_listens_;
}
bool MockAppConfigForHeartbeat::get_http_api_enabled()
{
return http_api_enabled_;
}
std::vector<std::string> MockAppConfigForHeartbeat::get_http_api_listens()
{
return http_api_listens_;
}
bool MockAppConfigForHeartbeat::get_srt_enabled()
{
return srt_enabled_;
}
std::vector<std::string> MockAppConfigForHeartbeat::get_srt_listens()
{
return srt_listens_;
}
bool MockAppConfigForHeartbeat::get_rtsp_server_enabled()
{
return rtsp_server_enabled_;
}
std::vector<std::string> MockAppConfigForHeartbeat::get_rtsp_server_listens()
{
return rtsp_server_listens_;
}
bool MockAppConfigForHeartbeat::get_rtc_server_enabled()
{
return rtc_server_enabled_;
}
std::vector<std::string> MockAppConfigForHeartbeat::get_rtc_server_listens()
{
return rtc_server_listens_;
}
bool MockAppConfigForHeartbeat::get_rtc_server_tcp_enabled()
{
return rtc_server_tcp_enabled_;
}
std::vector<std::string> MockAppConfigForHeartbeat::get_rtc_server_tcp_listens()
{
return rtc_server_tcp_listens_;
}
// Mock ISrsAppConfig implementation for SrsCircuitBreaker
MockAppConfigForCircuitBreaker::MockAppConfigForCircuitBreaker()
{
circuit_breaker_enabled_ = true;
high_threshold_ = 90;
high_pulse_ = 2;
critical_threshold_ = 95;
critical_pulse_ = 1;
dying_threshold_ = 99;
dying_pulse_ = 5;
}
MockAppConfigForCircuitBreaker::~MockAppConfigForCircuitBreaker()
{
}
bool MockAppConfigForCircuitBreaker::get_circuit_breaker()
{
return circuit_breaker_enabled_;
}
int MockAppConfigForCircuitBreaker::get_high_threshold()
{
return high_threshold_;
}
int MockAppConfigForCircuitBreaker::get_high_pulse()
{
return high_pulse_;
}
int MockAppConfigForCircuitBreaker::get_critical_threshold()
{
return critical_threshold_;
}
int MockAppConfigForCircuitBreaker::get_critical_pulse()
{
return critical_pulse_;
}
int MockAppConfigForCircuitBreaker::get_dying_threshold()
{
return dying_threshold_;
}
int MockAppConfigForCircuitBreaker::get_dying_pulse()
{
return dying_pulse_;
}
// Mock ISrsFastTimer implementation for SrsCircuitBreaker
MockFastTimerForCircuitBreaker::MockFastTimerForCircuitBreaker()
{
subscribe_called_ = false;
subscribed_handler_ = NULL;
}
MockFastTimerForCircuitBreaker::~MockFastTimerForCircuitBreaker()
{
}
srs_error_t MockFastTimerForCircuitBreaker::start()
{
return srs_success;
}
void MockFastTimerForCircuitBreaker::subscribe(ISrsFastTimerHandler *handler)
{
subscribe_called_ = true;
subscribed_handler_ = handler;
}
void MockFastTimerForCircuitBreaker::unsubscribe(ISrsFastTimerHandler *handler)
{
}
void MockFastTimerForCircuitBreaker::reset()
{
subscribe_called_ = false;
subscribed_handler_ = NULL;
}
// Mock ISrsSharedTimer implementation for SrsCircuitBreaker
MockSharedTimerForCircuitBreaker::MockSharedTimerForCircuitBreaker()
{
timer1s_ = new MockFastTimerForCircuitBreaker();
}
MockSharedTimerForCircuitBreaker::~MockSharedTimerForCircuitBreaker()
{
srs_freep(timer1s_);
}
ISrsFastTimer *MockSharedTimerForCircuitBreaker::timer20ms()
{
return NULL;
}
ISrsFastTimer *MockSharedTimerForCircuitBreaker::timer100ms()
{
return NULL;
}
ISrsFastTimer *MockSharedTimerForCircuitBreaker::timer1s()
{
return timer1s_;
}
ISrsFastTimer *MockSharedTimerForCircuitBreaker::timer5s()
{
return NULL;
}
// Mock ISrsHost implementation for SrsCircuitBreaker
MockHostForCircuitBreaker::MockHostForCircuitBreaker()
{
proc_stat_ = new SrsProcSelfStat();
proc_stat_->percent_ = 0.0;
}
MockHostForCircuitBreaker::~MockHostForCircuitBreaker()
{
srs_freep(proc_stat_);
}
SrsProcSelfStat *MockHostForCircuitBreaker::self_proc_stat()
{
return proc_stat_;
}
VOID TEST(HttpHeartbeatTest, DoHeartbeatWithAllPortsEnabled)
{
srs_error_t err = srs_success;
// Create mock config with all protocols enabled
SrsUniquePtr<MockAppConfigForHeartbeat> mock_config(new MockAppConfigForHeartbeat());
mock_config->heartbeat_url_ = "http://127.0.0.1:8080/api/v1/heartbeat";
mock_config->heartbeat_device_id_ = "test-device-001";
mock_config->heartbeat_summaries_ = false;
mock_config->heartbeat_ports_ = true;
mock_config->stats_network_ = 0;
// Configure RTMP listen endpoints
mock_config->listens_.push_back("1935");
mock_config->listens_.push_back("19350");
// Configure HTTP stream endpoints
mock_config->http_stream_enabled_ = true;
mock_config->http_stream_listens_.push_back("8080");
// Configure HTTP API endpoints
mock_config->http_api_enabled_ = true;
mock_config->http_api_listens_.push_back("1985");
// Configure SRT endpoints
mock_config->srt_enabled_ = true;
mock_config->srt_listens_.push_back("10080");
// Configure RTSP endpoints
mock_config->rtsp_server_enabled_ = true;
mock_config->rtsp_server_listens_.push_back("554");
// Configure WebRTC endpoints
mock_config->rtc_server_enabled_ = true;
mock_config->rtc_server_listens_.push_back("8000");
mock_config->rtc_server_tcp_enabled_ = true;
mock_config->rtc_server_tcp_listens_.push_back("8000");
// Create mock HTTP response
MockHttpMessageForHeartbeat *mock_response = new MockHttpMessageForHeartbeat();
mock_response->status_code_ = 200;
mock_response->body_content_ = "{\"code\":0,\"data\":{}}";
// Create mock HTTP client
MockHttpClientForHeartbeat *mock_http_client = new MockHttpClientForHeartbeat();
mock_http_client->mock_response_ = mock_response;
mock_http_client->should_delete_response_ = true; // Client will delete response
// Create mock app factory
SrsUniquePtr<MockAppFactoryForHeartbeat> mock_factory(new MockAppFactoryForHeartbeat());
mock_factory->mock_http_client_ = mock_http_client;
mock_factory->should_delete_client_ = true; // Factory will delete client
// Create SrsHttpHeartbeat and inject mocks
SrsUniquePtr<SrsHttpHeartbeat> heartbeat(new SrsHttpHeartbeat());
heartbeat->config_ = mock_config.get();
heartbeat->app_factory_ = mock_factory.get();
// Execute do_heartbeat
// Note: This will delete mock_http_client and mock_response via SrsUniquePtr in do_heartbeat()
// So we cannot verify mock_http_client fields after this call
HELPER_EXPECT_SUCCESS(heartbeat->do_heartbeat());
// Clean up - set to NULL to avoid double-free
heartbeat->config_ = NULL;
heartbeat->app_factory_ = NULL;
mock_factory->mock_http_client_ = NULL; // Already deleted by do_heartbeat()
}
VOID TEST(ReferTest, CheckReferWithMatchingDomain)
{
srs_error_t err;
// Create SrsRefer instance
SrsUniquePtr<SrsRefer> refer(new SrsRefer());
// Create refer directive with allowed domains
SrsUniquePtr<SrsConfDirective> refer_conf(new SrsConfDirective());
refer_conf->name_ = "refer";
refer_conf->args_.push_back("github.com");
refer_conf->args_.push_back("github.io");
// Test 1: Valid page URL matching github.com domain
HELPER_EXPECT_SUCCESS(refer->check("http://www.github.com/path", refer_conf.get()));
// Test 2: Valid page URL matching github.io domain
HELPER_EXPECT_SUCCESS(refer->check("https://ossrs.github.io/index.html", refer_conf.get()));
// Test 3: Valid page URL with port number
HELPER_EXPECT_SUCCESS(refer->check("http://api.github.com:8080/api", refer_conf.get()));
// Test 4: Invalid page URL not matching any allowed domain
HELPER_EXPECT_FAILED(refer->check("http://example.com/page", refer_conf.get()));
// Test 5: NULL refer directive should allow all
HELPER_EXPECT_SUCCESS(refer->check("http://any-domain.com/page", NULL));
// Test 6: Empty refer directive should deny access
SrsUniquePtr<SrsConfDirective> empty_refer(new SrsConfDirective());
empty_refer->name_ = "refer";
HELPER_EXPECT_FAILED(refer->check("http://example.com/page", empty_refer.get()));
}
// Test SrsCircuitBreaker - covers the major use scenario:
// This test verifies the complete circuit breaker functionality including:
// 1. Initialize with configuration (enabled, thresholds, pulses)
// 2. Subscribe to timer for periodic CPU monitoring
// 3. Simulate CPU load changes via on_timer() callback
// 4. Verify water level transitions (high -> critical -> dying)
// 5. Verify water level decay when CPU load decreases
// 6. Test all three protection levels: high_water_level, critical_water_level, dying_water_level
VOID TEST(CircuitBreakerTest, InitializeAndWaterLevelTransitions)
{
srs_error_t err;
// Create mock dependencies
SrsUniquePtr<MockAppConfigForCircuitBreaker> mock_config(new MockAppConfigForCircuitBreaker());
SrsUniquePtr<MockSharedTimerForCircuitBreaker> mock_timer(new MockSharedTimerForCircuitBreaker());
MockHostForCircuitBreaker *mock_host = new MockHostForCircuitBreaker();
// Configure circuit breaker settings
mock_config->circuit_breaker_enabled_ = true;
mock_config->high_threshold_ = 90; // CPU > 90% triggers high water level
mock_config->high_pulse_ = 2; // High level lasts for 2 timer ticks
mock_config->critical_threshold_ = 95; // CPU > 95% triggers critical water level
mock_config->critical_pulse_ = 1; // Critical level lasts for 1 timer tick
mock_config->dying_threshold_ = 99; // CPU > 99% triggers dying water level
mock_config->dying_pulse_ = 5; // Dying level requires 5 consecutive ticks
// Create SrsCircuitBreaker
SrsUniquePtr<SrsCircuitBreaker> breaker(new SrsCircuitBreaker());
// Inject mock dependencies
breaker->config_ = mock_config.get();
breaker->shared_timer_ = mock_timer.get();
breaker->host_ = mock_host;
// Test 1: Initialize - should load config and subscribe to timer
HELPER_EXPECT_SUCCESS(breaker->initialize());
// Verify timer subscription
EXPECT_TRUE(mock_timer->timer1s_->subscribe_called_);
EXPECT_EQ(breaker.get(), mock_timer->timer1s_->subscribed_handler_);
// Test 2: Initially all water levels should be false (CPU is 0%)
EXPECT_FALSE(breaker->hybrid_high_water_level());
EXPECT_FALSE(breaker->hybrid_critical_water_level());
EXPECT_FALSE(breaker->hybrid_dying_water_level());
// Test 3: Simulate high CPU load (91% > high_threshold 90%)
mock_host->proc_stat_->percent_ = 0.91; // 91% CPU
HELPER_EXPECT_SUCCESS(breaker->on_timer(1 * SRS_UTIME_SECONDS));
// After 1 tick with high CPU, high water level should be active
EXPECT_TRUE(breaker->hybrid_high_water_level());
EXPECT_FALSE(breaker->hybrid_critical_water_level());
EXPECT_FALSE(breaker->hybrid_dying_water_level());
// Test 4: Simulate critical CPU load (96% > critical_threshold 95%)
mock_host->proc_stat_->percent_ = 0.96; // 96% CPU
HELPER_EXPECT_SUCCESS(breaker->on_timer(1 * SRS_UTIME_SECONDS));
// After 1 tick with critical CPU, both high and critical should be active
EXPECT_TRUE(breaker->hybrid_high_water_level());
EXPECT_TRUE(breaker->hybrid_critical_water_level());
EXPECT_FALSE(breaker->hybrid_dying_water_level());
// Test 5: Simulate dying CPU load (99.5% > dying_threshold 99%)
// Need 5 consecutive ticks to activate dying level
mock_host->proc_stat_->percent_ = 0.995; // 99.5% CPU
for (int i = 0; i < 5; i++) {
HELPER_EXPECT_SUCCESS(breaker->on_timer(1 * SRS_UTIME_SECONDS));
}
// After 5 consecutive ticks with dying CPU, all levels should be active
EXPECT_TRUE(breaker->hybrid_high_water_level());
EXPECT_TRUE(breaker->hybrid_critical_water_level());
EXPECT_TRUE(breaker->hybrid_dying_water_level());
// Test 6: Simulate CPU load decrease (back to 50%)
mock_host->proc_stat_->percent_ = 0.50; // 50% CPU
HELPER_EXPECT_SUCCESS(breaker->on_timer(1 * SRS_UTIME_SECONDS));
// Dying level should immediately reset to 0 when CPU drops
EXPECT_FALSE(breaker->hybrid_dying_water_level());
// Critical should also become false (since dying is false and critical_water_level_ will decay)
// But high should still be active (high_water_level_ = 2, needs 2 ticks to decay)
EXPECT_TRUE(breaker->hybrid_high_water_level());
// Critical is false because dying is false and critical_water_level_ decayed from 1 to 0
EXPECT_FALSE(breaker->hybrid_critical_water_level());
// Test 7: Continue with low CPU - high should decay to 0 after 1 more tick
HELPER_EXPECT_SUCCESS(breaker->on_timer(1 * SRS_UTIME_SECONDS));
EXPECT_FALSE(breaker->hybrid_high_water_level()); // High now false (high_water_level_ = 0)
EXPECT_FALSE(breaker->hybrid_critical_water_level());
EXPECT_FALSE(breaker->hybrid_dying_water_level());
// Test 9: Verify disabled circuit breaker returns false for all levels
mock_config->circuit_breaker_enabled_ = false;
SrsUniquePtr<SrsCircuitBreaker> disabled_breaker(new SrsCircuitBreaker());
disabled_breaker->config_ = mock_config.get();
disabled_breaker->shared_timer_ = mock_timer.get();
disabled_breaker->host_ = mock_host;
HELPER_EXPECT_SUCCESS(disabled_breaker->initialize());
// Even with high CPU, disabled breaker should return false
mock_host->proc_stat_->percent_ = 0.99;
HELPER_EXPECT_SUCCESS(disabled_breaker->on_timer(1 * SRS_UTIME_SECONDS));
EXPECT_FALSE(disabled_breaker->hybrid_high_water_level());
EXPECT_FALSE(disabled_breaker->hybrid_critical_water_level());
EXPECT_FALSE(disabled_breaker->hybrid_dying_water_level());
// Clean up - set to NULL to avoid double-free
breaker->config_ = NULL;
breaker->shared_timer_ = NULL;
breaker->host_ = NULL;
disabled_breaker->config_ = NULL;
disabled_breaker->shared_timer_ = NULL;
disabled_breaker->host_ = NULL;
srs_freep(mock_host);
}

View File

@ -13,18 +13,21 @@
#include <srs_utest.hpp>
#include <srs_app_caster_flv.hpp>
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_config.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_ffmpeg.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_mpegts_udp.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_pithy_print.hpp>
#include <srs_protocol_http_client.hpp>
#include <srs_protocol_raw_avc.hpp>
#include <srs_protocol_rtmp_conn.hpp>
#include <srs_protocol_http_client.hpp>
#include <srs_utest_app10.hpp>
#include <srs_utest_app11.hpp>
#include <srs_utest_app6.hpp>
#include <srs_utest_kernel.hpp>
// Mock ISrsAppConfig for testing SrsUdpCasterListener
class MockAppConfigForUdpCaster : public MockAppConfig
@ -877,4 +880,213 @@ public:
void reset();
};
// Mock ISrsHttpMessage for testing SrsHttpHeartbeat
class MockHttpMessageForHeartbeat : public ISrsHttpMessage
{
public:
int status_code_;
std::string body_content_;
ISrsReader *body_reader_;
public:
MockHttpMessageForHeartbeat();
virtual ~MockHttpMessageForHeartbeat();
public:
virtual uint8_t method();
virtual uint16_t status_code();
virtual std::string method_str();
virtual std::string url();
virtual std::string host();
virtual std::string path();
virtual std::string query();
virtual std::string ext();
virtual srs_error_t body_read_all(std::string &body);
virtual ISrsHttpResponseReader *body_reader();
virtual int64_t content_length();
virtual std::string query_get(std::string key);
virtual SrsHttpHeader *header();
virtual bool is_jsonp();
virtual bool is_keep_alive();
virtual ISrsRequest *to_request(std::string vhost);
virtual std::string parse_rest_id(std::string pattern);
virtual uint8_t message_type();
virtual bool is_http_get();
virtual bool is_http_put();
virtual bool is_http_post();
virtual bool is_http_delete();
virtual bool is_http_options();
virtual std::string uri();
void reset();
};
// Mock ISrsHttpClient for testing SrsHttpHeartbeat
class MockHttpClientForHeartbeat : public ISrsHttpClient
{
public:
bool initialize_called_;
bool post_called_;
std::string schema_;
std::string host_;
int port_;
std::string path_;
std::string request_body_;
MockHttpMessageForHeartbeat *mock_response_;
srs_error_t initialize_error_;
srs_error_t post_error_;
bool should_delete_response_;
public:
MockHttpClientForHeartbeat();
virtual ~MockHttpClientForHeartbeat();
public:
virtual srs_error_t initialize(std::string schema, std::string h, int p, srs_utime_t tm = SRS_HTTP_CLIENT_TIMEOUT);
virtual srs_error_t get(std::string path, std::string req, ISrsHttpMessage **ppmsg);
virtual srs_error_t post(std::string path, std::string req, ISrsHttpMessage **ppmsg);
virtual void set_recv_timeout(srs_utime_t tm);
virtual void kbps_sample(const char *label, srs_utime_t age);
void reset();
};
// Mock ISrsAppFactory for testing SrsHttpHeartbeat
class MockAppFactoryForHeartbeat : public SrsAppFactory
{
public:
MockHttpClientForHeartbeat *mock_http_client_;
bool create_http_client_called_;
bool should_delete_client_;
public:
MockAppFactoryForHeartbeat();
virtual ~MockAppFactoryForHeartbeat();
public:
virtual ISrsHttpClient *create_http_client();
void reset();
};
// Mock ISrsAppConfig for testing SrsHttpHeartbeat
class MockAppConfigForHeartbeat : public MockAppConfig
{
public:
std::string heartbeat_url_;
std::string heartbeat_device_id_;
bool heartbeat_summaries_;
bool heartbeat_ports_;
int stats_network_;
bool http_stream_enabled_;
bool http_api_enabled_;
bool srt_enabled_;
bool rtsp_server_enabled_;
bool rtc_server_enabled_;
bool rtc_server_tcp_enabled_;
std::vector<std::string> listens_;
std::vector<std::string> http_stream_listens_;
std::vector<std::string> http_api_listens_;
std::vector<std::string> srt_listens_;
std::vector<std::string> rtsp_server_listens_;
std::vector<std::string> rtc_server_listens_;
std::vector<std::string> rtc_server_tcp_listens_;
public:
MockAppConfigForHeartbeat();
virtual ~MockAppConfigForHeartbeat();
public:
virtual std::string get_heartbeat_url();
virtual std::string get_heartbeat_device_id();
virtual bool get_heartbeat_summaries();
virtual bool get_heartbeat_ports();
virtual int get_stats_network();
virtual std::vector<std::string> get_listens();
virtual bool get_http_stream_enabled();
virtual std::vector<std::string> get_http_stream_listens();
virtual bool get_http_api_enabled();
virtual std::vector<std::string> get_http_api_listens();
virtual bool get_srt_enabled();
virtual std::vector<std::string> get_srt_listens();
virtual bool get_rtsp_server_enabled();
virtual std::vector<std::string> get_rtsp_server_listens();
virtual bool get_rtc_server_enabled();
virtual std::vector<std::string> get_rtc_server_listens();
virtual bool get_rtc_server_tcp_enabled();
virtual std::vector<std::string> get_rtc_server_tcp_listens();
};
// Mock ISrsAppConfig for testing SrsCircuitBreaker
class MockAppConfigForCircuitBreaker : public MockAppConfig
{
public:
bool circuit_breaker_enabled_;
int high_threshold_;
int high_pulse_;
int critical_threshold_;
int critical_pulse_;
int dying_threshold_;
int dying_pulse_;
public:
MockAppConfigForCircuitBreaker();
virtual ~MockAppConfigForCircuitBreaker();
public:
virtual bool get_circuit_breaker();
virtual int get_high_threshold();
virtual int get_high_pulse();
virtual int get_critical_threshold();
virtual int get_critical_pulse();
virtual int get_dying_threshold();
virtual int get_dying_pulse();
};
// Mock ISrsFastTimer for testing SrsCircuitBreaker
class MockFastTimerForCircuitBreaker : public ISrsFastTimer
{
public:
bool subscribe_called_;
ISrsFastTimerHandler *subscribed_handler_;
public:
MockFastTimerForCircuitBreaker();
virtual ~MockFastTimerForCircuitBreaker();
public:
virtual srs_error_t start();
virtual void subscribe(ISrsFastTimerHandler *handler);
virtual void unsubscribe(ISrsFastTimerHandler *handler);
void reset();
};
// Mock ISrsSharedTimer for testing SrsCircuitBreaker
class MockSharedTimerForCircuitBreaker : public ISrsSharedTimer
{
public:
MockFastTimerForCircuitBreaker *timer1s_;
public:
MockSharedTimerForCircuitBreaker();
virtual ~MockSharedTimerForCircuitBreaker();
public:
virtual ISrsFastTimer *timer20ms();
virtual ISrsFastTimer *timer100ms();
virtual ISrsFastTimer *timer1s();
virtual ISrsFastTimer *timer5s();
};
// Mock ISrsHost for testing SrsCircuitBreaker
class MockHostForCircuitBreaker : public ISrsHost
{
public:
SrsProcSelfStat *proc_stat_;
public:
MockHostForCircuitBreaker();
virtual ~MockHostForCircuitBreaker();
public:
virtual SrsProcSelfStat *self_proc_stat();
};
#endif

View File

@ -403,6 +403,17 @@ public:
virtual int get_stats_network() { return 0; }
virtual bool get_heartbeat_enabled() { return false; }
virtual srs_utime_t get_heartbeat_interval() { return 0; }
virtual std::string get_heartbeat_url() { return ""; }
virtual std::string get_heartbeat_device_id() { return ""; }
virtual bool get_heartbeat_summaries() { return false; }
virtual bool get_heartbeat_ports() { return false; }
virtual bool get_circuit_breaker() { return false; }
virtual int get_high_threshold() { return 0; }
virtual int get_high_pulse() { return 0; }
virtual int get_critical_threshold() { return 0; }
virtual int get_critical_pulse() { return 0; }
virtual int get_dying_threshold() { return 0; }
virtual int get_dying_pulse() { return 0; }
virtual std::string get_rtmps_ssl_cert() { return ""; }
virtual std::string get_rtmps_ssl_key() { return ""; }
virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) { return NULL; }