diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index 72459cc42..1a0fd1682 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -243,7 +243,6 @@ Experts: --clean=on|off Whether do 'make clean' when configure. Default: $(value2switch $SRS_CLEAN) --simulator=on|off RTC: Whether enable network simulator. Default: $(value2switch $SRS_SIMULATOR) --generate-objs=on|off RTC: Whether generate objs and quit. Default: $(value2switch $SRS_GENERATE_OBJS) - --single-thread=on|off Whether force single thread mode. Default: $(value2switch $SRS_SINGLE_THREAD) --signal-api=on|off Whether support sending signal by HTTP API. Default: $(value2switch $SRS_SIGNAL_API) --build-tag= Set the build object directory suffix. --debug=on|off Whether enable the debug code, may hurt performance. Default: $(value2switch $SRS_DEBUG) @@ -258,6 +257,7 @@ Experts: --generic-linux=on|off Whether run as generic linux, if not CentOS or Ubuntu. Default: $(value2switch $SRS_GENERIC_LINUX) Deprecated: + --single-thread=on|off Whether force single thread mode. Default: $(value2switch $SRS_SINGLE_THREAD) --cross-build Enable cross-build, please set bellow Toolchain also. Default: $(value2switch $SRS_CROSS_BUILD) --hds=on|off Whether build the hds streaming, mux RTMP to F4M/F4V files. Default: $(value2switch $SRS_HDS) --osx Enable build for OSX/Darwin AppleOS. Deprecated for automatically detecting the OS. @@ -595,10 +595,10 @@ function apply_auto_options() { echo "Disable SRT for cygwin64" SRS_SRT=NO fi - # TODO: FIXME: Cygwin: ST stuck when working in multiple threads mode. - # See https://github.com/ossrs/srs/issues/3253 - if [[ $SRS_CYGWIN64 == YES && $SRS_SINGLE_THREAD != YES ]]; then - echo "Force single thread for cygwin64" + + # Force single thread mode always - multi-threading support has been removed + if [[ $SRS_SINGLE_THREAD != YES ]]; then + echo "Warning: Multi-threading support has been removed. Forcing single thread mode." SRS_SINGLE_THREAD=YES fi diff --git a/trunk/configure b/trunk/configure index 5c22f8916..ba1f988ea 100755 --- a/trunk/configure +++ b/trunk/configure @@ -334,7 +334,7 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_sourc "srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call" "srs_app_caster_flv" "srs_app_latest_version" "srs_app_uuid" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" - "srs_app_coworkers" "srs_app_hybrid" "srs_app_threads") + "srs_app_coworkers" "srs_app_hybrid" "srs_app_circuit_breaker") if [[ $SRS_SRT == YES ]]; then MODULE_FILES+=("srs_app_srt_server" "srs_app_srt_listener" "srs_app_srt_conn" "srs_app_srt_utility" "srs_app_srt_source") fi diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 2d824e7ae..a430cbde3 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-08-20, Merge [#4445](https://github.com/ossrs/srs/pull/4445): AI: Remove multi-threading support and change to single-thread architecture. v7.0.59 (#4445) * v7.0, 2025-08-19, Merge [#4444](https://github.com/ossrs/srs/pull/4444): AI: Refine hooks from static to instance functions. v7.0.58 (#4444) * v7.0, 2025-08-19, Merge [#3126](https://github.com/ossrs/srs/pull/3126): HLS: restore HLS information when republish stream.(#3088). v7.0.57 (#3126) * v7.0, 2025-08-18, Merge [#4443](https://github.com/ossrs/srs/pull/4443): Support RTMPS server. v7.0.56 (#4443) diff --git a/trunk/src/app/srs_app_circuit_breaker.cpp b/trunk/src/app/srs_app_circuit_breaker.cpp new file mode 100644 index 000000000..471c80deb --- /dev/null +++ b/trunk/src/app/srs_app_circuit_breaker.cpp @@ -0,0 +1,148 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#include + +#include +#include +#include +#include +#include + +#ifdef SRS_RTC +#include +extern SrsPps *_srs_pps_snack2; +extern SrsPps *_srs_pps_snack3; +extern SrsPps *_srs_pps_snack4; +#endif + +using namespace std; + +ISrsCircuitBreaker::ISrsCircuitBreaker() +{ +} + +ISrsCircuitBreaker::~ISrsCircuitBreaker() +{ +} + +SrsCircuitBreaker::SrsCircuitBreaker() +{ + enabled_ = false; + high_threshold_ = 0; + high_pulse_ = 0; + critical_threshold_ = 0; + critical_pulse_ = 0; + dying_threshold_ = 0; + dying_pulse_ = 0; + + hybrid_high_water_level_ = 0; + hybrid_critical_water_level_ = 0; + hybrid_dying_water_level_ = 0; +} + +SrsCircuitBreaker::~SrsCircuitBreaker() +{ +} + +srs_error_t SrsCircuitBreaker::initialize() +{ + srs_error_t err = srs_success; + + enabled_ = _srs_config->get_circuit_breaker(); + high_threshold_ = _srs_config->get_high_threshold(); + high_pulse_ = _srs_config->get_high_pulse(); + critical_threshold_ = _srs_config->get_critical_threshold(); + critical_pulse_ = _srs_config->get_critical_pulse(); + dying_threshold_ = _srs_config->get_dying_threshold(); + dying_pulse_ = _srs_config->get_dying_pulse(); + + // Update the water level for circuit breaker. + // @see SrsCircuitBreaker::on_timer() + _srs_hybrid->timer1s()->subscribe(this); + + srs_trace("CircuitBreaker: enabled=%d, high=%dx%d, critical=%dx%d, dying=%dx%d", enabled_, + high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, + dying_pulse_, dying_threshold_); + + return err; +} + +bool SrsCircuitBreaker::hybrid_high_water_level() +{ + return enabled_ && (hybrid_critical_water_level() || hybrid_high_water_level_); +} + +bool SrsCircuitBreaker::hybrid_critical_water_level() +{ + return enabled_ && (hybrid_dying_water_level() || hybrid_critical_water_level_); +} + +bool SrsCircuitBreaker::hybrid_dying_water_level() +{ + return enabled_ && dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_; +} + +srs_error_t SrsCircuitBreaker::on_timer(srs_utime_t interval) +{ + srs_error_t err = srs_success; + + // Update the CPU usage. + srs_update_proc_stat(); + SrsProcSelfStat *stat = srs_get_self_proc_stat(); + + // Reset the high water-level when CPU is low for N times. + if (stat->percent * 100 > high_threshold_) { + hybrid_high_water_level_ = high_pulse_; + } else if (hybrid_high_water_level_ > 0) { + hybrid_high_water_level_--; + } + + // Reset the critical water-level when CPU is low for N times. + if (stat->percent * 100 > critical_threshold_) { + hybrid_critical_water_level_ = critical_pulse_; + } else if (hybrid_critical_water_level_ > 0) { + hybrid_critical_water_level_--; + } + + // Reset the dying water-level when CPU is low for N times. + if (stat->percent * 100 > dying_threshold_) { + hybrid_dying_water_level_ = srs_min(dying_pulse_ + 1, hybrid_dying_water_level_ + 1); + } else if (hybrid_dying_water_level_ > 0) { + hybrid_dying_water_level_ = 0; + } + + // Show statistics for RTC server. + SrsProcSelfStat *u = srs_get_self_proc_stat(); + // Resident Set Size: number of pages the process has in real memory. + int memory = (int)(u->rss * 4 / 1024); + + // The hybrid thread cpu and memory. + float thread_percent = stat->percent * 100; + + string snk_desc; +#ifdef SRS_RTC + static char buf[128]; + if (_srs_pps_snack2->r10s()) { + snprintf(buf, sizeof(buf), ", snk=%d,%d,%d", + _srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), _srs_pps_snack4->r10s() // NACK packet,seqs sent. + ); + snk_desc = buf; + } +#endif + + if (enabled_ && (hybrid_high_water_level() || hybrid_critical_water_level())) { + srs_trace("CircuitBreaker: cpu=%.2f%%,%dMB, break=%d,%d,%d, cond=%.2f%%%s", + u->percent * 100, memory, + hybrid_high_water_level(), hybrid_critical_water_level(), hybrid_dying_water_level(), // Whether Circuit-Break is enable. + thread_percent, // The conditions to enable Circuit-Breaker. + snk_desc.c_str()); + } + + return err; +} + +ISrsCircuitBreaker *_srs_circuit_breaker = NULL; diff --git a/trunk/src/app/srs_app_circuit_breaker.hpp b/trunk/src/app/srs_app_circuit_breaker.hpp new file mode 100644 index 000000000..a7aa6634a --- /dev/null +++ b/trunk/src/app/srs_app_circuit_breaker.hpp @@ -0,0 +1,83 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#ifndef SRS_APP_CIRCUIT_BREAKER_HPP +#define SRS_APP_CIRCUIT_BREAKER_HPP + +#include + +#include + +// Interface for circuit breaker functionality to protect server in high load conditions. +// The circuit breaker monitors CPU usage and enables different levels of protection: +// - High water level: Disables some unnecessary features to reduce CPU load +// - Critical water level: Disables more features to high-level protections +// - Dying water level: Disables even normal features should be disabled to prevent server crash +class ISrsCircuitBreaker +{ +public: + ISrsCircuitBreaker(); + virtual ~ISrsCircuitBreaker(); + +public: + // Initialize the circuit breaker with configuration settings. + // @return srs_success on success, error code otherwise. + virtual srs_error_t initialize() = 0; + +public: + // Check if server is in high water level state. + // When true, some unnecessary features should be disabled to reduce CPU load. + // @return true if high water level is active, false otherwise. + virtual bool hybrid_high_water_level() = 0; + + // Check if server is in critical water level state. + // When true, more features should be disabled to reduce CPU load. + // This includes all protections from high water level. + // @return true if critical water level is active, false otherwise. + virtual bool hybrid_critical_water_level() = 0; + + // Check if server is in dying water level state. + // When true, even normal features should be disabled to prevent server crash. + // This is the most severe protection level. + // @return true if dying water level is active, false otherwise. + virtual bool hybrid_dying_water_level() = 0; +}; + +class SrsCircuitBreaker : public ISrsCircuitBreaker, public ISrsFastTimer +{ +private: + bool enabled_; + int high_threshold_; + int high_pulse_; + int critical_threshold_; + int critical_pulse_; + int dying_threshold_; + int dying_pulse_; + +private: + int hybrid_high_water_level_; + int hybrid_critical_water_level_; + int hybrid_dying_water_level_; + +public: + SrsCircuitBreaker(); + virtual ~SrsCircuitBreaker(); + +public: + srs_error_t initialize(); + +public: + bool hybrid_high_water_level(); + bool hybrid_critical_water_level(); + bool hybrid_dying_water_level(); + +private: + srs_error_t on_timer(srs_utime_t interval); +}; + +extern ISrsCircuitBreaker *_srs_circuit_breaker; + +#endif diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index c83afe2c8..4d8b56003 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -6,16 +6,48 @@ #include +#include +#include #include +#include #include +#include +#include +#include +#include +#include #include +#include #include #include #include +#include #include +#ifdef SRS_RTC +#include +#include +#endif +#ifdef SRS_SRT +#include +#endif +#ifdef SRS_GB28181 +#include +#endif +#ifdef SRS_RTSP +#include +#endif +#include + +#include +#include +#include +#include +#include using namespace std; +SrsAsyncCallWorker *_srs_dvr_async = NULL; + extern SrsPps *_srs_pps_cids_get; extern SrsPps *_srs_pps_cids_set; @@ -115,6 +147,128 @@ extern SrsPps *_srs_pps_objs_rbuf; extern SrsPps *_srs_pps_objs_msgs; extern SrsPps *_srs_pps_objs_rothers; +extern ISrsLog *_srs_log; +extern ISrsContext *_srs_context; +extern SrsConfig *_srs_config; + +extern SrsStageManager *_srs_stages; + +#ifdef SRS_RTC +extern SrsRtcBlackhole *_srs_blackhole; +extern SrsResourceManager *_srs_rtc_manager; + +extern SrsDtlsCertificate *_srs_rtc_dtls_certificate; +#endif + +SrsPps *_srs_pps_aloss2 = NULL; + +extern SrsPps *_srs_pps_ids; +extern SrsPps *_srs_pps_fids; +extern SrsPps *_srs_pps_fids_level0; +extern SrsPps *_srs_pps_dispose; + +extern SrsPps *_srs_pps_timer; + +extern SrsPps *_srs_pps_snack; +extern SrsPps *_srs_pps_snack2; +extern SrsPps *_srs_pps_snack3; +extern SrsPps *_srs_pps_snack4; +extern SrsPps *_srs_pps_sanack; +extern SrsPps *_srs_pps_svnack; + +extern SrsPps *_srs_pps_rnack; +extern SrsPps *_srs_pps_rnack2; +extern SrsPps *_srs_pps_rhnack; +extern SrsPps *_srs_pps_rmnack; + +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) +extern SrsPps *_srs_pps_recvfrom; +extern SrsPps *_srs_pps_recvfrom_eagain; +extern SrsPps *_srs_pps_sendto; +extern SrsPps *_srs_pps_sendto_eagain; + +extern SrsPps *_srs_pps_read; +extern SrsPps *_srs_pps_read_eagain; +extern SrsPps *_srs_pps_readv; +extern SrsPps *_srs_pps_readv_eagain; +extern SrsPps *_srs_pps_writev; +extern SrsPps *_srs_pps_writev_eagain; + +extern SrsPps *_srs_pps_recvmsg; +extern SrsPps *_srs_pps_recvmsg_eagain; +extern SrsPps *_srs_pps_sendmsg; +extern SrsPps *_srs_pps_sendmsg_eagain; + +extern SrsPps *_srs_pps_epoll; +extern SrsPps *_srs_pps_epoll_zero; +extern SrsPps *_srs_pps_epoll_shake; +extern SrsPps *_srs_pps_epoll_spin; + +extern SrsPps *_srs_pps_sched_15ms; +extern SrsPps *_srs_pps_sched_20ms; +extern SrsPps *_srs_pps_sched_25ms; +extern SrsPps *_srs_pps_sched_30ms; +extern SrsPps *_srs_pps_sched_35ms; +extern SrsPps *_srs_pps_sched_40ms; +extern SrsPps *_srs_pps_sched_80ms; +extern SrsPps *_srs_pps_sched_160ms; +extern SrsPps *_srs_pps_sched_s; +#endif + +extern SrsPps *_srs_pps_clock_15ms; +extern SrsPps *_srs_pps_clock_20ms; +extern SrsPps *_srs_pps_clock_25ms; +extern SrsPps *_srs_pps_clock_30ms; +extern SrsPps *_srs_pps_clock_35ms; +extern SrsPps *_srs_pps_clock_40ms; +extern SrsPps *_srs_pps_clock_80ms; +extern SrsPps *_srs_pps_clock_160ms; +extern SrsPps *_srs_pps_timer_s; + +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) +extern SrsPps *_srs_pps_thread_run; +extern SrsPps *_srs_pps_thread_idle; +extern SrsPps *_srs_pps_thread_yield; +extern SrsPps *_srs_pps_thread_yield2; +#endif + +extern SrsPps *_srs_pps_rpkts; +extern SrsPps *_srs_pps_addrs; +extern SrsPps *_srs_pps_fast_addrs; + +extern SrsPps *_srs_pps_spkts; + +extern SrsPps *_srs_pps_sstuns; +extern SrsPps *_srs_pps_srtcps; +extern SrsPps *_srs_pps_srtps; + +extern SrsPps *_srs_pps_pli; +extern SrsPps *_srs_pps_twcc; +extern SrsPps *_srs_pps_rr; +extern SrsPps *_srs_pps_pub; +extern SrsPps *_srs_pps_conn; + +extern SrsPps *_srs_pps_rstuns; +extern SrsPps *_srs_pps_rrtps; +extern SrsPps *_srs_pps_rrtcps; + +extern SrsPps *_srs_pps_aloss2; + +extern SrsPps *_srs_pps_cids_get; +extern SrsPps *_srs_pps_cids_set; + +extern SrsPps *_srs_pps_objs_msgs; + +extern SrsPps *_srs_pps_objs_rtps; +extern SrsPps *_srs_pps_objs_rraw; +extern SrsPps *_srs_pps_objs_rfua; +extern SrsPps *_srs_pps_objs_rbuf; +extern SrsPps *_srs_pps_objs_rothers; + +extern srs_error_t _srs_reload_err; +extern SrsReloadState _srs_reload_state; +extern std::string _srs_reload_id; + ISrsHybridServer::ISrsHybridServer() { } @@ -132,6 +286,8 @@ SrsHybridServer::SrsHybridServer() timer5s_ = new SrsFastTimer("hybrid", 5 * SRS_UTIME_SECONDS); clock_monitor_ = new SrsClockWallMonitor(); + + pid_fd = -1; } SrsHybridServer::~SrsHybridServer() @@ -150,6 +306,11 @@ SrsHybridServer::~SrsHybridServer() srs_freep(timer100ms_); srs_freep(timer1s_); srs_freep(timer5s_); + + if (pid_fd > 0) { + ::close(pid_fd); + pid_fd = -1; + } } void SrsHybridServer::register_server(ISrsHybridServer *svr) @@ -161,6 +322,12 @@ srs_error_t SrsHybridServer::initialize() { srs_error_t err = srs_success; + if ((err = acquire_pid_file()) != srs_success) { + return srs_error_wrap(err, "acquire pid file"); + } + + srs_trace("Hybrid server initialized in single thread mode"); + // Start the timer first. if ((err = timer20ms_->start()) != srs_success) { return srs_error_wrap(err, "start timer"); @@ -440,4 +607,229 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval) return err; } +srs_error_t SrsHybridServer::acquire_pid_file() +{ + std::string pid_file = _srs_config->get_pid_file(); + + // -rw-r--r-- + // 644 + int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + + int fd; + // open pid file + if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) == -1) { + return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file.c_str()); + } + + // require write lock + struct flock lock; + + lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK + lock.l_start = 0; // type offset, relative to l_whence + lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END + lock.l_len = 0; + + if (fcntl(fd, F_SETLK, &lock) == -1) { + if (errno == EACCES || errno == EAGAIN) { + ::close(fd); + srs_error("srs is already running!"); + return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running"); + } + return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file.c_str()); + } + + // truncate file + if (ftruncate(fd, 0) != 0) { + return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file.c_str()); + } + + // write the pid + string pid = srs_int2str(getpid()); + if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) { + return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str()); + } + + // auto close when fork child process. + int val; + if ((val = fcntl(fd, F_GETFD, 0)) < 0) { + return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd); + } + val |= FD_CLOEXEC; + if (fcntl(fd, F_SETFD, val) < 0) { + return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file.c_str(), fd); + } + + srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str()); + pid_fd = fd; + + return srs_success; +} + +srs_error_t srs_global_initialize() +{ + srs_error_t err = srs_success; + + // Root global objects. + _srs_log = new SrsFileLog(); + _srs_context = new SrsThreadContext(); + _srs_config = new SrsConfig(); + + // The clock wall object. + _srs_clock = new SrsWallClock(); + + // The pps cids depends by st init. + _srs_pps_cids_get = new SrsPps(); + _srs_pps_cids_set = new SrsPps(); + + // Initialize ST, which depends on pps cids. + if ((err = srs_st_init()) != srs_success) { + return srs_error_wrap(err, "initialize st failed"); + } + + // The global objects which depends on ST. + _srs_hybrid = new SrsHybridServer(); + _srs_sources = new SrsLiveSourceManager(); + _srs_stages = new SrsStageManager(); + _srs_circuit_breaker = new SrsCircuitBreaker(); + _srs_hooks = new SrsHttpHooks(); + +#ifdef SRS_SRT + _srs_srt_sources = new SrsSrtSourceManager(); +#endif + +#ifdef SRS_RTC + _srs_rtc_sources = new SrsRtcSourceManager(); + _srs_blackhole = new SrsRtcBlackhole(); + + _srs_rtc_manager = new SrsResourceManager("RTC", true); + _srs_rtc_dtls_certificate = new SrsDtlsCertificate(); +#endif +#ifdef SRS_RTSP + _srs_rtsp_sources = new SrsRtspSourceManager(); + _srs_rtsp_manager = new SrsResourceManager("RTSP", true); +#endif +#ifdef SRS_GB28181 + _srs_gb_manager = new SrsResourceManager("GB", true); +#endif + + // Initialize global pps, which depends on _srs_clock + _srs_pps_ids = new SrsPps(); + _srs_pps_fids = new SrsPps(); + _srs_pps_fids_level0 = new SrsPps(); + _srs_pps_dispose = new SrsPps(); + + _srs_pps_timer = new SrsPps(); + _srs_pps_conn = new SrsPps(); + _srs_pps_pub = new SrsPps(); + +#ifdef SRS_RTC + _srs_pps_snack = new SrsPps(); + _srs_pps_snack2 = new SrsPps(); + _srs_pps_snack3 = new SrsPps(); + _srs_pps_snack4 = new SrsPps(); + _srs_pps_sanack = new SrsPps(); + _srs_pps_svnack = new SrsPps(); + + _srs_pps_rnack = new SrsPps(); + _srs_pps_rnack2 = new SrsPps(); + _srs_pps_rhnack = new SrsPps(); + _srs_pps_rmnack = new SrsPps(); +#endif + +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_recvfrom = new SrsPps(); + _srs_pps_recvfrom_eagain = new SrsPps(); + _srs_pps_sendto = new SrsPps(); + _srs_pps_sendto_eagain = new SrsPps(); + + _srs_pps_read = new SrsPps(); + _srs_pps_read_eagain = new SrsPps(); + _srs_pps_readv = new SrsPps(); + _srs_pps_readv_eagain = new SrsPps(); + _srs_pps_writev = new SrsPps(); + _srs_pps_writev_eagain = new SrsPps(); + + _srs_pps_recvmsg = new SrsPps(); + _srs_pps_recvmsg_eagain = new SrsPps(); + _srs_pps_sendmsg = new SrsPps(); + _srs_pps_sendmsg_eagain = new SrsPps(); + + _srs_pps_epoll = new SrsPps(); + _srs_pps_epoll_zero = new SrsPps(); + _srs_pps_epoll_shake = new SrsPps(); + _srs_pps_epoll_spin = new SrsPps(); + + _srs_pps_sched_15ms = new SrsPps(); + _srs_pps_sched_20ms = new SrsPps(); + _srs_pps_sched_25ms = new SrsPps(); + _srs_pps_sched_30ms = new SrsPps(); + _srs_pps_sched_35ms = new SrsPps(); + _srs_pps_sched_40ms = new SrsPps(); + _srs_pps_sched_80ms = new SrsPps(); + _srs_pps_sched_160ms = new SrsPps(); + _srs_pps_sched_s = new SrsPps(); +#endif + + _srs_pps_clock_15ms = new SrsPps(); + _srs_pps_clock_20ms = new SrsPps(); + _srs_pps_clock_25ms = new SrsPps(); + _srs_pps_clock_30ms = new SrsPps(); + _srs_pps_clock_35ms = new SrsPps(); + _srs_pps_clock_40ms = new SrsPps(); + _srs_pps_clock_80ms = new SrsPps(); + _srs_pps_clock_160ms = new SrsPps(); + _srs_pps_timer_s = new SrsPps(); + +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_thread_run = new SrsPps(); + _srs_pps_thread_idle = new SrsPps(); + _srs_pps_thread_yield = new SrsPps(); + _srs_pps_thread_yield2 = new SrsPps(); +#endif + + _srs_pps_rpkts = new SrsPps(); + _srs_pps_addrs = new SrsPps(); + _srs_pps_fast_addrs = new SrsPps(); + + _srs_pps_spkts = new SrsPps(); + _srs_pps_objs_msgs = new SrsPps(); + +#ifdef SRS_RTC + _srs_pps_sstuns = new SrsPps(); + _srs_pps_srtcps = new SrsPps(); + _srs_pps_srtps = new SrsPps(); + + _srs_pps_rstuns = new SrsPps(); + _srs_pps_rrtps = new SrsPps(); + _srs_pps_rrtcps = new SrsPps(); + + _srs_pps_aloss2 = new SrsPps(); + + _srs_pps_pli = new SrsPps(); + _srs_pps_twcc = new SrsPps(); + _srs_pps_rr = new SrsPps(); + + _srs_pps_objs_rtps = new SrsPps(); + _srs_pps_objs_rraw = new SrsPps(); + _srs_pps_objs_rfua = new SrsPps(); + _srs_pps_objs_rbuf = new SrsPps(); + _srs_pps_objs_rothers = new SrsPps(); +#endif + + // Create global async worker for DVR. + _srs_dvr_async = new SrsAsyncCallWorker(); + +#ifdef SRS_APM + // Initialize global TencentCloud CLS object. + _srs_cls = new SrsClsClient(); + _srs_apm = new SrsApmClient(); +#endif + + _srs_reload_err = srs_success; + _srs_reload_state = SrsReloadStateInit; + _srs_reload_id = srs_random_str(7); + + return err; +} + SrsHybridServer *_srs_hybrid = NULL; diff --git a/trunk/src/app/srs_app_hybrid.hpp b/trunk/src/app/srs_app_hybrid.hpp index b04011243..7ba8b0b95 100644 --- a/trunk/src/app/srs_app_hybrid.hpp +++ b/trunk/src/app/srs_app_hybrid.hpp @@ -17,6 +17,9 @@ class SrsServer; class SrsServerAdapter; class SrsWaitGroup; +// Initialize global shared variables cross all threads. +extern srs_error_t srs_global_initialize(); + // The hibrid server interfaces, we could register many servers. class ISrsHybridServer { @@ -44,6 +47,13 @@ private: SrsFastTimer *timer5s_; SrsClockWallMonitor *clock_monitor_; +private: + // The pid file fd, lock the file write when server is running. + // @remark the init.d script should cleanup the pid file, when stop service, + // for the server never delete the file; when system startup, the pid in pid file + // maybe valid but the process is not SRS, the init.d script will never start server. + int pid_fd; + public: SrsHybridServer(); virtual ~SrsHybridServer(); @@ -65,6 +75,10 @@ public: // interface ISrsFastTimer private: srs_error_t on_timer(srs_utime_t interval); + +private: + // Require the PID file for the whole process. + virtual srs_error_t acquire_pid_file(); }; extern SrsHybridServer *_srs_hybrid; diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index dedd37907..9c768c2ad 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -15,7 +15,6 @@ #include #include -#include #include #include #include @@ -36,8 +35,6 @@ SrsFileLog::SrsFileLog() fd = -1; log_to_file_tank = false; utc = false; - - mutex_ = new SrsThreadMutex(); } SrsFileLog::~SrsFileLog() @@ -52,8 +49,6 @@ SrsFileLog::~SrsFileLog() if (_srs_config) { _srs_config->unsubscribe(this); } - - srs_freep(mutex_); } srs_error_t SrsFileLog::initialize() @@ -91,8 +86,6 @@ void SrsFileLog::log(SrsLogLevel level, const char *tag, const SrsContextId &con return; } - SrsThreadLocker(mutex_); - int size = 0; bool header_ok = srs_log_header( log_data, LOG_MAX_SIZE, utc, level >= SrsLogLevelWarn, tag, context_id, srs_log_level_strings[level], &size); diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index e08b69545..14b4ea824 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -15,8 +15,6 @@ #include #include -class SrsThreadMutex; - // For log TAGs. #define TAG_MAIN "MAIN" #define TAG_MAYBE "MAYBE" @@ -42,9 +40,6 @@ private: bool log_to_file_tank; // Whether use utc time. bool utc; - // TODO: FIXME: use macro define like SRS_MULTI_THREAD_LOG to switch enable log mutex or not. - // Mutex for multithread log. - SrsThreadMutex *mutex_; public: SrsFileLog(); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index a53fcef66..6703e136d 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -19,6 +19,7 @@ using namespace std; #include #include +#include #include #include #include @@ -32,7 +33,6 @@ using namespace std; #include #include #include -#include #include #include #include diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index 1d2d1311a..7f543e96a 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -12,7 +12,7 @@ using namespace std; -#include +#include #include #include #include diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index dcd2c6690..4b7ddb9c0 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -742,17 +742,17 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval) return err; } -RtcServerAdapter::RtcServerAdapter() +SrsRtcServerAdapter::SrsRtcServerAdapter() { rtc = new SrsRtcServer(); } -RtcServerAdapter::~RtcServerAdapter() +SrsRtcServerAdapter::~SrsRtcServerAdapter() { srs_freep(rtc); } -srs_error_t RtcServerAdapter::initialize() +srs_error_t SrsRtcServerAdapter::initialize() { srs_error_t err = srs_success; @@ -767,7 +767,7 @@ srs_error_t RtcServerAdapter::initialize() return err; } -srs_error_t RtcServerAdapter::run(SrsWaitGroup *wg) +srs_error_t SrsRtcServerAdapter::run(SrsWaitGroup *wg) { srs_error_t err = srs_success; @@ -786,7 +786,7 @@ srs_error_t RtcServerAdapter::run(SrsWaitGroup *wg) return err; } -void RtcServerAdapter::stop() +void SrsRtcServerAdapter::stop() { } diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index ebbf4cbe4..1eda54e14 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -124,14 +124,14 @@ private: }; // The RTC server adapter. -class RtcServerAdapter : public ISrsHybridServer +class SrsRtcServerAdapter : public ISrsHybridServer { private: SrsRtcServer *rtc; public: - RtcServerAdapter(); - virtual ~RtcServerAdapter(); + SrsRtcServerAdapter(); + virtual ~SrsRtcServerAdapter(); public: virtual srs_error_t initialize(); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 0ad474d5c..60cf9335b 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -17,7 +18,6 @@ #include #include #include -#include #include #include #include diff --git a/trunk/src/app/srs_app_rtsp_conn.cpp b/trunk/src/app/srs_app_rtsp_conn.cpp index 35309b1c1..b91c30c25 100644 --- a/trunk/src/app/srs_app_rtsp_conn.cpp +++ b/trunk/src/app/srs_app_rtsp_conn.cpp @@ -18,7 +18,6 @@ using namespace std; #include #include #include -#include #include #include #include diff --git a/trunk/src/app/srs_app_rtsp_source.cpp b/trunk/src/app/srs_app_rtsp_source.cpp index 8fa4f8b81..b1e16319a 100644 --- a/trunk/src/app/srs_app_rtsp_source.cpp +++ b/trunk/src/app/srs_app_rtsp_source.cpp @@ -6,13 +6,13 @@ #include +#include #include #include #include #include #include #include -#include #include #include #include diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp deleted file mode 100644 index 32c53cd5f..000000000 --- a/trunk/src/app/srs_app_threads.cpp +++ /dev/null @@ -1,815 +0,0 @@ -// -// Copyright (c) 2013-2025 The SRS Authors -// -// SPDX-License-Identifier: MIT -// - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#ifdef SRS_RTC -#include -#include -#endif -#ifdef SRS_SRT -#include -#endif -#ifdef SRS_GB28181 -#include -#endif -#ifdef SRS_RTSP -#include -#endif - -#include -#include -using namespace std; - -#include -#include - -#if defined(SRS_OSX) || defined(SRS_CYGWIN64) -pid_t gettid() -{ - return 0; -} -#else -#if __GLIBC__ == 2 && __GLIBC_MINOR__ < 30 -#include -#define gettid() syscall(SYS_gettid) -#endif -#endif - -// These functions first appeared in glibc in version 2.12. -// See https://man7.org/linux/man-pages/man3/pthread_setname_np.3.html -#if defined(SRS_CYGWIN64) || (defined(SRS_CROSSBUILD) && defined(__GLIBC__) && ((__GLIBC__ < 2) || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 12))) -void pthread_setname_np(pthread_t trd, const char *name) -{ -} -#endif - -extern ISrsLog *_srs_log; -extern ISrsContext *_srs_context; -extern SrsConfig *_srs_config; - -extern SrsStageManager *_srs_stages; - -#ifdef SRS_RTC -extern SrsRtcBlackhole *_srs_blackhole; -extern SrsResourceManager *_srs_rtc_manager; - -extern SrsDtlsCertificate *_srs_rtc_dtls_certificate; -#endif - -#include - -SrsPps *_srs_pps_aloss2 = NULL; - -extern SrsPps *_srs_pps_ids; -extern SrsPps *_srs_pps_fids; -extern SrsPps *_srs_pps_fids_level0; -extern SrsPps *_srs_pps_dispose; - -extern SrsPps *_srs_pps_timer; - -extern SrsPps *_srs_pps_snack; -extern SrsPps *_srs_pps_snack2; -extern SrsPps *_srs_pps_snack3; -extern SrsPps *_srs_pps_snack4; -extern SrsPps *_srs_pps_sanack; -extern SrsPps *_srs_pps_svnack; - -extern SrsPps *_srs_pps_rnack; -extern SrsPps *_srs_pps_rnack2; -extern SrsPps *_srs_pps_rhnack; -extern SrsPps *_srs_pps_rmnack; - -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) -extern SrsPps *_srs_pps_recvfrom; -extern SrsPps *_srs_pps_recvfrom_eagain; -extern SrsPps *_srs_pps_sendto; -extern SrsPps *_srs_pps_sendto_eagain; - -extern SrsPps *_srs_pps_read; -extern SrsPps *_srs_pps_read_eagain; -extern SrsPps *_srs_pps_readv; -extern SrsPps *_srs_pps_readv_eagain; -extern SrsPps *_srs_pps_writev; -extern SrsPps *_srs_pps_writev_eagain; - -extern SrsPps *_srs_pps_recvmsg; -extern SrsPps *_srs_pps_recvmsg_eagain; -extern SrsPps *_srs_pps_sendmsg; -extern SrsPps *_srs_pps_sendmsg_eagain; - -extern SrsPps *_srs_pps_epoll; -extern SrsPps *_srs_pps_epoll_zero; -extern SrsPps *_srs_pps_epoll_shake; -extern SrsPps *_srs_pps_epoll_spin; - -extern SrsPps *_srs_pps_sched_15ms; -extern SrsPps *_srs_pps_sched_20ms; -extern SrsPps *_srs_pps_sched_25ms; -extern SrsPps *_srs_pps_sched_30ms; -extern SrsPps *_srs_pps_sched_35ms; -extern SrsPps *_srs_pps_sched_40ms; -extern SrsPps *_srs_pps_sched_80ms; -extern SrsPps *_srs_pps_sched_160ms; -extern SrsPps *_srs_pps_sched_s; -#endif - -extern SrsPps *_srs_pps_clock_15ms; -extern SrsPps *_srs_pps_clock_20ms; -extern SrsPps *_srs_pps_clock_25ms; -extern SrsPps *_srs_pps_clock_30ms; -extern SrsPps *_srs_pps_clock_35ms; -extern SrsPps *_srs_pps_clock_40ms; -extern SrsPps *_srs_pps_clock_80ms; -extern SrsPps *_srs_pps_clock_160ms; -extern SrsPps *_srs_pps_timer_s; - -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) -extern SrsPps *_srs_pps_thread_run; -extern SrsPps *_srs_pps_thread_idle; -extern SrsPps *_srs_pps_thread_yield; -extern SrsPps *_srs_pps_thread_yield2; -#endif - -extern SrsPps *_srs_pps_rpkts; -extern SrsPps *_srs_pps_addrs; -extern SrsPps *_srs_pps_fast_addrs; - -extern SrsPps *_srs_pps_spkts; - -extern SrsPps *_srs_pps_sstuns; -extern SrsPps *_srs_pps_srtcps; -extern SrsPps *_srs_pps_srtps; - -extern SrsPps *_srs_pps_pli; -extern SrsPps *_srs_pps_twcc; -extern SrsPps *_srs_pps_rr; -extern SrsPps *_srs_pps_pub; -extern SrsPps *_srs_pps_conn; - -extern SrsPps *_srs_pps_rstuns; -extern SrsPps *_srs_pps_rrtps; -extern SrsPps *_srs_pps_rrtcps; - -extern SrsPps *_srs_pps_aloss2; - -extern SrsPps *_srs_pps_cids_get; -extern SrsPps *_srs_pps_cids_set; - -extern SrsPps *_srs_pps_objs_msgs; - -extern SrsPps *_srs_pps_objs_rtps; -extern SrsPps *_srs_pps_objs_rraw; -extern SrsPps *_srs_pps_objs_rfua; -extern SrsPps *_srs_pps_objs_rbuf; -extern SrsPps *_srs_pps_objs_rothers; - -SrsCircuitBreaker::SrsCircuitBreaker() -{ - enabled_ = false; - high_threshold_ = 0; - high_pulse_ = 0; - critical_threshold_ = 0; - critical_pulse_ = 0; - dying_threshold_ = 0; - dying_pulse_ = 0; - - hybrid_high_water_level_ = 0; - hybrid_critical_water_level_ = 0; - hybrid_dying_water_level_ = 0; -} - -SrsCircuitBreaker::~SrsCircuitBreaker() -{ -} - -srs_error_t SrsCircuitBreaker::initialize() -{ - srs_error_t err = srs_success; - - enabled_ = _srs_config->get_circuit_breaker(); - high_threshold_ = _srs_config->get_high_threshold(); - high_pulse_ = _srs_config->get_high_pulse(); - critical_threshold_ = _srs_config->get_critical_threshold(); - critical_pulse_ = _srs_config->get_critical_pulse(); - dying_threshold_ = _srs_config->get_dying_threshold(); - dying_pulse_ = _srs_config->get_dying_pulse(); - - // Update the water level for circuit breaker. - // @see SrsCircuitBreaker::on_timer() - _srs_hybrid->timer1s()->subscribe(this); - - srs_trace("CircuitBreaker: enabled=%d, high=%dx%d, critical=%dx%d, dying=%dx%d", enabled_, - high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, - dying_pulse_, dying_threshold_); - - return err; -} - -bool SrsCircuitBreaker::hybrid_high_water_level() -{ - return enabled_ && (hybrid_critical_water_level() || hybrid_high_water_level_); -} - -bool SrsCircuitBreaker::hybrid_critical_water_level() -{ - return enabled_ && (hybrid_dying_water_level() || hybrid_critical_water_level_); -} - -bool SrsCircuitBreaker::hybrid_dying_water_level() -{ - return enabled_ && dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_; -} - -srs_error_t SrsCircuitBreaker::on_timer(srs_utime_t interval) -{ - srs_error_t err = srs_success; - - // Update the CPU usage. - srs_update_proc_stat(); - SrsProcSelfStat *stat = srs_get_self_proc_stat(); - - // Reset the high water-level when CPU is low for N times. - if (stat->percent * 100 > high_threshold_) { - hybrid_high_water_level_ = high_pulse_; - } else if (hybrid_high_water_level_ > 0) { - hybrid_high_water_level_--; - } - - // Reset the critical water-level when CPU is low for N times. - if (stat->percent * 100 > critical_threshold_) { - hybrid_critical_water_level_ = critical_pulse_; - } else if (hybrid_critical_water_level_ > 0) { - hybrid_critical_water_level_--; - } - - // Reset the dying water-level when CPU is low for N times. - if (stat->percent * 100 > dying_threshold_) { - hybrid_dying_water_level_ = srs_min(dying_pulse_ + 1, hybrid_dying_water_level_ + 1); - } else if (hybrid_dying_water_level_ > 0) { - hybrid_dying_water_level_ = 0; - } - - // Show statistics for RTC server. - SrsProcSelfStat *u = srs_get_self_proc_stat(); - // Resident Set Size: number of pages the process has in real memory. - int memory = (int)(u->rss * 4 / 1024); - - // The hybrid thread cpu and memory. - float thread_percent = stat->percent * 100; - - string snk_desc; -#ifdef SRS_RTC - static char buf[128]; - if (_srs_pps_snack2->r10s()) { - snprintf(buf, sizeof(buf), ", snk=%d,%d,%d", - _srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), _srs_pps_snack4->r10s() // NACK packet,seqs sent. - ); - snk_desc = buf; - } -#endif - - if (enabled_ && (hybrid_high_water_level() || hybrid_critical_water_level())) { - srs_trace("CircuitBreaker: cpu=%.2f%%,%dMB, break=%d,%d,%d, cond=%.2f%%%s", - u->percent * 100, memory, - hybrid_high_water_level(), hybrid_critical_water_level(), hybrid_dying_water_level(), // Whether Circuit-Break is enable. - thread_percent, // The conditions to enable Circuit-Breaker. - snk_desc.c_str()); - } - - return err; -} - -SrsCircuitBreaker *_srs_circuit_breaker = NULL; -SrsAsyncCallWorker *_srs_dvr_async = NULL; - -extern srs_error_t _srs_reload_err; -extern SrsReloadState _srs_reload_state; -extern std::string _srs_reload_id; - -srs_error_t srs_global_initialize() -{ - srs_error_t err = srs_success; - - // Root global objects. - _srs_log = new SrsFileLog(); - _srs_context = new SrsThreadContext(); - _srs_config = new SrsConfig(); - - // The clock wall object. - _srs_clock = new SrsWallClock(); - - // The pps cids depends by st init. - _srs_pps_cids_get = new SrsPps(); - _srs_pps_cids_set = new SrsPps(); - - // The global objects which depends on ST. - _srs_hybrid = new SrsHybridServer(); - _srs_sources = new SrsLiveSourceManager(); - _srs_stages = new SrsStageManager(); - _srs_circuit_breaker = new SrsCircuitBreaker(); - _srs_hooks = new SrsHttpHooks(); - -#ifdef SRS_SRT - _srs_srt_sources = new SrsSrtSourceManager(); -#endif - -#ifdef SRS_RTC - _srs_rtc_sources = new SrsRtcSourceManager(); - _srs_blackhole = new SrsRtcBlackhole(); - - _srs_rtc_manager = new SrsResourceManager("RTC", true); - _srs_rtc_dtls_certificate = new SrsDtlsCertificate(); -#endif -#ifdef SRS_RTSP - _srs_rtsp_sources = new SrsRtspSourceManager(); - _srs_rtsp_manager = new SrsResourceManager("RTSP", true); -#endif -#ifdef SRS_GB28181 - _srs_gb_manager = new SrsResourceManager("GB", true); -#endif - - // Initialize global pps, which depends on _srs_clock - _srs_pps_ids = new SrsPps(); - _srs_pps_fids = new SrsPps(); - _srs_pps_fids_level0 = new SrsPps(); - _srs_pps_dispose = new SrsPps(); - - _srs_pps_timer = new SrsPps(); - _srs_pps_conn = new SrsPps(); - _srs_pps_pub = new SrsPps(); - -#ifdef SRS_RTC - _srs_pps_snack = new SrsPps(); - _srs_pps_snack2 = new SrsPps(); - _srs_pps_snack3 = new SrsPps(); - _srs_pps_snack4 = new SrsPps(); - _srs_pps_sanack = new SrsPps(); - _srs_pps_svnack = new SrsPps(); - - _srs_pps_rnack = new SrsPps(); - _srs_pps_rnack2 = new SrsPps(); - _srs_pps_rhnack = new SrsPps(); - _srs_pps_rmnack = new SrsPps(); -#endif - -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_recvfrom = new SrsPps(); - _srs_pps_recvfrom_eagain = new SrsPps(); - _srs_pps_sendto = new SrsPps(); - _srs_pps_sendto_eagain = new SrsPps(); - - _srs_pps_read = new SrsPps(); - _srs_pps_read_eagain = new SrsPps(); - _srs_pps_readv = new SrsPps(); - _srs_pps_readv_eagain = new SrsPps(); - _srs_pps_writev = new SrsPps(); - _srs_pps_writev_eagain = new SrsPps(); - - _srs_pps_recvmsg = new SrsPps(); - _srs_pps_recvmsg_eagain = new SrsPps(); - _srs_pps_sendmsg = new SrsPps(); - _srs_pps_sendmsg_eagain = new SrsPps(); - - _srs_pps_epoll = new SrsPps(); - _srs_pps_epoll_zero = new SrsPps(); - _srs_pps_epoll_shake = new SrsPps(); - _srs_pps_epoll_spin = new SrsPps(); - - _srs_pps_sched_15ms = new SrsPps(); - _srs_pps_sched_20ms = new SrsPps(); - _srs_pps_sched_25ms = new SrsPps(); - _srs_pps_sched_30ms = new SrsPps(); - _srs_pps_sched_35ms = new SrsPps(); - _srs_pps_sched_40ms = new SrsPps(); - _srs_pps_sched_80ms = new SrsPps(); - _srs_pps_sched_160ms = new SrsPps(); - _srs_pps_sched_s = new SrsPps(); -#endif - - _srs_pps_clock_15ms = new SrsPps(); - _srs_pps_clock_20ms = new SrsPps(); - _srs_pps_clock_25ms = new SrsPps(); - _srs_pps_clock_30ms = new SrsPps(); - _srs_pps_clock_35ms = new SrsPps(); - _srs_pps_clock_40ms = new SrsPps(); - _srs_pps_clock_80ms = new SrsPps(); - _srs_pps_clock_160ms = new SrsPps(); - _srs_pps_timer_s = new SrsPps(); - -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_thread_run = new SrsPps(); - _srs_pps_thread_idle = new SrsPps(); - _srs_pps_thread_yield = new SrsPps(); - _srs_pps_thread_yield2 = new SrsPps(); -#endif - - _srs_pps_rpkts = new SrsPps(); - _srs_pps_addrs = new SrsPps(); - _srs_pps_fast_addrs = new SrsPps(); - - _srs_pps_spkts = new SrsPps(); - _srs_pps_objs_msgs = new SrsPps(); - -#ifdef SRS_RTC - _srs_pps_sstuns = new SrsPps(); - _srs_pps_srtcps = new SrsPps(); - _srs_pps_srtps = new SrsPps(); - - _srs_pps_rstuns = new SrsPps(); - _srs_pps_rrtps = new SrsPps(); - _srs_pps_rrtcps = new SrsPps(); - - _srs_pps_aloss2 = new SrsPps(); - - _srs_pps_pli = new SrsPps(); - _srs_pps_twcc = new SrsPps(); - _srs_pps_rr = new SrsPps(); - - _srs_pps_objs_rtps = new SrsPps(); - _srs_pps_objs_rraw = new SrsPps(); - _srs_pps_objs_rfua = new SrsPps(); - _srs_pps_objs_rbuf = new SrsPps(); - _srs_pps_objs_rothers = new SrsPps(); -#endif - - // Create global async worker for DVR. - _srs_dvr_async = new SrsAsyncCallWorker(); - -#ifdef SRS_APM - // Initialize global TencentCloud CLS object. - _srs_cls = new SrsClsClient(); - _srs_apm = new SrsApmClient(); -#endif - - _srs_reload_err = srs_success; - _srs_reload_state = SrsReloadStateInit; - _srs_reload_id = srs_random_str(7); - - return err; -} - -SrsThreadMutex::SrsThreadMutex() -{ - // https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html - int r0 = pthread_mutexattr_init(&attr_); - srs_assert(!r0); - - // https://man7.org/linux/man-pages/man3/pthread_mutexattr_gettype.3p.html - r0 = pthread_mutexattr_settype(&attr_, PTHREAD_MUTEX_ERRORCHECK); - srs_assert(!r0); - - // https://michaelkerrisk.com/linux/man-pages/man3/pthread_mutex_init.3p.html - r0 = pthread_mutex_init(&lock_, &attr_); - srs_assert(!r0); -} - -SrsThreadMutex::~SrsThreadMutex() -{ - int r0 = pthread_mutex_destroy(&lock_); - srs_assert(!r0); - - r0 = pthread_mutexattr_destroy(&attr_); - srs_assert(!r0); -} - -void SrsThreadMutex::lock() -{ - // https://man7.org/linux/man-pages/man3/pthread_mutex_lock.3p.html - // EDEADLK - // The mutex type is PTHREAD_MUTEX_ERRORCHECK and the current - // thread already owns the mutex. - int r0 = pthread_mutex_lock(&lock_); - srs_assert(!r0); -} - -void SrsThreadMutex::unlock() -{ - int r0 = pthread_mutex_unlock(&lock_); - srs_assert(!r0); -} - -SrsThreadEntry::SrsThreadEntry() -{ - pool = NULL; - start = NULL; - arg = NULL; - num = 0; - tid = 0; - - err = srs_success; -} - -SrsThreadEntry::~SrsThreadEntry() -{ - srs_freep(err); - - // TODO: FIXME: Should dispose trd. -} - -SrsThreadPool::SrsThreadPool() -{ - entry_ = NULL; - lock_ = new SrsThreadMutex(); - hybrid_ = NULL; - - // Add primordial thread, current thread itself. - SrsThreadEntry *entry = new SrsThreadEntry(); - threads_.push_back(entry); - entry_ = entry; - - entry->pool = this; - entry->label = "primordial"; - entry->start = NULL; - entry->arg = NULL; - entry->num = 1; - entry->trd = pthread_self(); - entry->tid = gettid(); - - char buf[256]; - snprintf(buf, sizeof(buf), "srs-master-%d", entry->num); - entry->name = buf; - - pid_fd = -1; -} - -// TODO: FIMXE: If free the pool, we should stop all threads. -SrsThreadPool::~SrsThreadPool() -{ - srs_freep(lock_); - - if (pid_fd > 0) { - ::close(pid_fd); - pid_fd = -1; - } -} - -// Setup the thread-local variables, MUST call when each thread starting. -srs_error_t SrsThreadPool::setup_thread_locals() -{ - srs_error_t err = srs_success; - - // Initialize ST, which depends on pps cids. - if ((err = srs_st_init()) != srs_success) { - return srs_error_wrap(err, "initialize st failed"); - } - - return err; -} - -srs_error_t SrsThreadPool::initialize() -{ - srs_error_t err = srs_success; - - if ((err = acquire_pid_file()) != srs_success) { - return srs_error_wrap(err, "acquire pid file"); - } - - // Initialize the master primordial thread. - SrsThreadEntry *entry = (SrsThreadEntry *)entry_; - - interval_ = _srs_config->get_threads_interval(); - - srs_trace("Thread #%d(%s): init name=%s, interval=%dms", entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_)); - - return err; -} - -srs_error_t SrsThreadPool::acquire_pid_file() -{ - std::string pid_file = _srs_config->get_pid_file(); - - // -rw-r--r-- - // 644 - int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; - - int fd; - // open pid file - if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) == -1) { - return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file.c_str()); - } - - // require write lock - struct flock lock; - - lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK - lock.l_start = 0; // type offset, relative to l_whence - lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END - lock.l_len = 0; - - if (fcntl(fd, F_SETLK, &lock) == -1) { - if (errno == EACCES || errno == EAGAIN) { - ::close(fd); - srs_error("srs is already running!"); - return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running"); - } - return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file.c_str()); - } - - // truncate file - if (ftruncate(fd, 0) != 0) { - return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file.c_str()); - } - - // write the pid - string pid = srs_int2str(getpid()); - if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) { - return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str()); - } - - // auto close when fork child process. - int val; - if ((val = fcntl(fd, F_GETFD, 0)) < 0) { - return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd); - } - val |= FD_CLOEXEC; - if (fcntl(fd, F_SETFD, val) < 0) { - return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file.c_str(), fd); - } - - srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str()); - pid_fd = fd; - - return srs_success; -} - -srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void *arg), void *arg) -{ - srs_error_t err = srs_success; - - SrsThreadEntry *entry = new SrsThreadEntry(); - - // Update the hybrid thread entry for circuit breaker. - if (label == "hybrid") { - hybrid_ = entry; - hybrids_.push_back(entry); - } - - // To protect the threads_ for executing thread-safe. - if (true) { - SrsThreadLocker(lock_); - threads_.push_back(entry); - } - - entry->pool = this; - entry->label = label; - entry->start = start; - entry->arg = arg; - - // The id of thread, should equal to the debugger thread id. - // For gdb, it's: info threads - // For lldb, it's: thread list - static int num = entry_->num + 1; - entry->num = num++; - - char buf[256]; - snprintf(buf, sizeof(buf), "srs-%s-%d", entry->label.c_str(), entry->num); - entry->name = buf; - - // https://man7.org/linux/man-pages/man3/pthread_create.3.html - pthread_t trd; - int r0 = pthread_create(&trd, NULL, SrsThreadPool::start, entry); - if (r0 != 0) { - entry->err = srs_error_new(ERROR_THREAD_CREATE, "create thread %s, r0=%d", label.c_str(), r0); - return srs_error_copy(entry->err); - } - - entry->trd = trd; - - return err; -} - -srs_error_t SrsThreadPool::run() -{ - srs_error_t err = srs_success; - - while (true) { - vector threads; - if (true) { - SrsThreadLocker(lock_); - threads = threads_; - } - - // Check the threads status fastly. - int loops = (int)(interval_ / SRS_UTIME_SECONDS); - for (int i = 0; i < loops; i++) { - for (int j = 0; j < (int)threads.size(); j++) { - SrsThreadEntry *entry = threads.at(j); - if (entry->err != srs_success) { - // Quit with success. - if (srs_error_code(entry->err) == ERROR_THREAD_FINISHED) { - srs_trace("quit for thread #%d(%s) finished", entry->num, entry->label.c_str()); - srs_freep(err); - return srs_success; - } - - // Quit with specified error. - err = srs_error_copy(entry->err); - err = srs_error_wrap(err, "thread #%d(%s)", entry->num, entry->label.c_str()); - return err; - } - } - - srs_usleep(1 * SRS_UTIME_SECONDS); - } - - // Show statistics for RTC server. - SrsProcSelfStat *u = srs_get_self_proc_stat(); - // Resident Set Size: number of pages the process has in real memory. - int memory = (int)(u->rss * 4 / 1024); - - srs_trace("Process: cpu=%.2f%%,%dMB, threads=%d", u->percent * 100, memory, (int)threads_.size()); - } - - return err; -} - -void SrsThreadPool::stop() -{ - // TODO: FIXME: Should notify other threads to do cleanup and quit. -} - -SrsThreadEntry *SrsThreadPool::self() -{ - std::vector threads; - - if (true) { - SrsThreadLocker(lock_); - threads = threads_; - } - - for (int i = 0; i < (int)threads.size(); i++) { - SrsThreadEntry *entry = threads.at(i); - if (entry->trd == pthread_self()) { - return entry; - } - } - - return NULL; -} - -SrsThreadEntry *SrsThreadPool::hybrid() -{ - return hybrid_; -} - -vector SrsThreadPool::hybrids() -{ - return hybrids_; -} - -void *SrsThreadPool::start(void *arg) -{ - srs_error_t err = srs_success; - - SrsThreadEntry *entry = (SrsThreadEntry *)arg; - - // Initialize thread-local variables. - if ((err = SrsThreadPool::setup_thread_locals()) != srs_success) { - entry->err = err; - return NULL; - } - - // Set the thread local fields. - entry->tid = gettid(); - -#ifndef SRS_OSX - // https://man7.org/linux/man-pages/man3/pthread_setname_np.3.html - pthread_setname_np(pthread_self(), entry->name.c_str()); -#else - pthread_setname_np(entry->name.c_str()); -#endif - - srs_trace("Thread #%d: run with tid=%d, entry=%p, label=%s, name=%s", entry->num, (int)entry->tid, entry, entry->label.c_str(), entry->name.c_str()); - - if ((err = entry->start(entry->arg)) != srs_success) { - entry->err = err; - } - - // We use a special error to indicates the normally done. - if (entry->err == srs_success) { - entry->err = srs_error_new(ERROR_THREAD_FINISHED, "finished normally"); - } - - // We do not use the return value, the err has been set to entry->err. - return NULL; -} - -// It MUST be thread-safe, global and shared object. -SrsThreadPool *_srs_thread_pool = new SrsThreadPool(); diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp deleted file mode 100644 index 34f7ade88..000000000 --- a/trunk/src/app/srs_app_threads.hpp +++ /dev/null @@ -1,181 +0,0 @@ -// -// Copyright (c) 2013-2025 The SRS Authors -// -// SPDX-License-Identifier: MIT -// - -#ifndef SRS_APP_THREADS_HPP -#define SRS_APP_THREADS_HPP - -#include - -#include - -#include - -class SrsThreadPool; -class SrsProcSelfStat; - -// Protect server in high load. -class SrsCircuitBreaker : public ISrsFastTimer -{ -private: - // The config for high/critical water level. - bool enabled_; - int high_threshold_; - int high_pulse_; - int critical_threshold_; - int critical_pulse_; - int dying_threshold_; - int dying_pulse_; - -private: - // Reset the water-level when CPU is low for N times. - // @note To avoid the CPU change rapidly. - int hybrid_high_water_level_; - int hybrid_critical_water_level_; - int hybrid_dying_water_level_; - -public: - SrsCircuitBreaker(); - virtual ~SrsCircuitBreaker(); - -public: - srs_error_t initialize(); - -public: - // Whether hybrid server water-level is high. - bool hybrid_high_water_level(); - bool hybrid_critical_water_level(); - bool hybrid_dying_water_level(); - // interface ISrsFastTimer -private: - srs_error_t on_timer(srs_utime_t interval); -}; - -extern SrsCircuitBreaker *_srs_circuit_breaker; - -// Initialize global shared variables cross all threads. -extern srs_error_t srs_global_initialize(); - -// The thread mutex wrapper, without error. -class SrsThreadMutex -{ -private: - pthread_mutex_t lock_; - pthread_mutexattr_t attr_; - -public: - SrsThreadMutex(); - virtual ~SrsThreadMutex(); - -public: - void lock(); - void unlock(); -}; - -// The thread mutex locker. -// TODO: FIXME: Rename _SRS to _srs -#define SrsThreadLocker(instance) \ - impl__SrsThreadLocker _SRS_free_##instance(instance) - -class impl__SrsThreadLocker -{ -private: - SrsThreadMutex *lock; - -public: - impl__SrsThreadLocker(SrsThreadMutex *l) - { - lock = l; - lock->lock(); - } - virtual ~impl__SrsThreadLocker() - { - lock->unlock(); - } -}; - -// The information for a thread. -class SrsThreadEntry -{ -public: - SrsThreadPool *pool; - std::string label; - std::string name; - srs_error_t (*start)(void *arg); - void *arg; - int num; - // @see https://man7.org/linux/man-pages/man2/gettid.2.html - pid_t tid; - -public: - // The thread object. - pthread_t trd; - // The exit error of thread. - srs_error_t err; - -public: - SrsThreadEntry(); - virtual ~SrsThreadEntry(); -}; - -// Allocate a(or almost) fixed thread poll to execute tasks, -// so that we can take the advantage of multiple CPUs. -class SrsThreadPool -{ -private: - SrsThreadEntry *entry_; - srs_utime_t interval_; - -private: - SrsThreadMutex *lock_; - std::vector threads_; - -private: - // The hybrid server entry, the cpu percent used for circuit breaker. - SrsThreadEntry *hybrid_; - std::vector hybrids_; - -private: - // The pid file fd, lock the file write when server is running. - // @remark the init.d script should cleanup the pid file, when stop service, - // for the server never delete the file; when system startup, the pid in pid file - // maybe valid but the process is not SRS, the init.d script will never start server. - int pid_fd; - -public: - SrsThreadPool(); - virtual ~SrsThreadPool(); - -public: - // Setup the thread-local variables. - static srs_error_t setup_thread_locals(); - // Initialize the thread pool. - srs_error_t initialize(); - -private: - // Require the PID file for the whole process. - virtual srs_error_t acquire_pid_file(); - -public: - // Execute start function with label in thread. - srs_error_t execute(std::string label, srs_error_t (*start)(void *arg), void *arg); - // Run in the primordial thread, util stop or quit. - srs_error_t run(); - // Stop the thread pool and quit the primordial thread. - void stop(); - -public: - SrsThreadEntry *self(); - SrsThreadEntry *hybrid(); - std::vector hybrids(); - -private: - static void *start(void *arg); -}; - -// It MUST be thread-safe, global and shared object. -extern SrsThreadPool *_srs_thread_pool; - -#endif diff --git a/trunk/src/core/srs_core_version7.hpp b/trunk/src/core/srs_core_version7.hpp index e2b8024b2..52c2ba9d2 100644 --- a/trunk/src/core/srs_core_version7.hpp +++ b/trunk/src/core/srs_core_version7.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 7 #define VERSION_MINOR 0 -#define VERSION_REVISION 58 +#define VERSION_REVISION 59 #endif \ No newline at end of file diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp index b89e8dd4b..bdc1237e6 100644 --- a/trunk/src/main/srs_main_ingest_hls.cpp +++ b/trunk/src/main/srs_main_ingest_hls.cpp @@ -13,7 +13,6 @@ using namespace std; #include -#include #include #include #include diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 217577771..4f42f4467 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -31,11 +31,11 @@ using namespace std; #include using namespace std; +#include #include #include #include #include -#include #include #include #include @@ -55,7 +55,7 @@ using namespace std; // pre-declare srs_error_t run_directly_or_daemon(); -srs_error_t run_in_thread_pool(); +srs_error_t run_hybrid_server(); void show_macro_features(); // @global log and context. @@ -109,10 +109,6 @@ srs_error_t do_main(int argc, char **argv, char **envp) return srs_error_wrap(err, "global init"); } - if ((err = SrsThreadPool::setup_thread_locals()) != srs_success) { - return srs_error_wrap(err, "thread init"); - } - // For background context id. _srs_context->set_id(_srs_context->generate_id()); @@ -415,8 +411,8 @@ srs_error_t run_directly_or_daemon() // If not daemon, directly run hybrid server. if (!run_as_daemon) { - if ((err = run_in_thread_pool()) != srs_success) { - return srs_error_wrap(err, "run thread pool"); + if ((err = run_hybrid_server()) != srs_success) { + return srs_error_wrap(err, "run server"); } return srs_success; } @@ -452,40 +448,15 @@ srs_error_t run_directly_or_daemon() // son srs_trace("son(daemon) process running."); - if ((err = run_in_thread_pool()) != srs_success) { - return srs_error_wrap(err, "daemon run thread pool"); + if ((err = run_hybrid_server()) != srs_success) { + return srs_error_wrap(err, "daemon run server"); } return err; } -srs_error_t run_hybrid_server(void *arg); -srs_error_t run_in_thread_pool() -{ - srs_error_t err = srs_success; - - // Initialize the thread pool, even if we run in single thread mode. - if ((err = _srs_thread_pool->initialize()) != srs_success) { - return srs_error_wrap(err, "init thread pool"); - } - -#ifdef SRS_SINGLE_THREAD - srs_trace("Run in single thread mode"); - return run_hybrid_server(NULL); -#else - // Start the hybrid service worker thread, for RTMP and RTC server, etc. - if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, (void *)NULL)) != srs_success) { - return srs_error_wrap(err, "start hybrid server thread"); - } - - srs_trace("Pool: Start threads primordial=1, hybrids=1 ok"); - - return _srs_thread_pool->run(); -#endif -} - #include -srs_error_t run_hybrid_server(void * /*arg*/) +srs_error_t run_hybrid_server() { srs_error_t err = srs_success; @@ -497,7 +468,7 @@ srs_error_t run_hybrid_server(void * /*arg*/) #endif #ifdef SRS_RTC - _srs_hybrid->register_server(new RtcServerAdapter()); + _srs_hybrid->register_server(new SrsRtcServerAdapter()); #endif // Do some system initialize. diff --git a/trunk/src/utest/srs_utest.cpp b/trunk/src/utest/srs_utest.cpp index 03442a906..f84ad16b1 100644 --- a/trunk/src/utest/srs_utest.cpp +++ b/trunk/src/utest/srs_utest.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include @@ -63,10 +62,6 @@ srs_error_t prepare_main() return srs_error_wrap(err, "init global"); } - if ((err = SrsThreadPool::setup_thread_locals()) != srs_success) { - return srs_error_wrap(err, "init thread"); - } - srs_freep(_srs_log); _srs_log = new MockEmptyLog(SrsLogLevelError);