AI: Merge SRT and RTC servers into unified SrsServer. v7.0.68 (#4459)

This PR consolidates the SRT and RTC server functionality into the main
SrsServer class, eliminating the separate `SrsSrtServer` and
`SrsRtcServer` classes and their corresponding adapter classes. This
architectural change simplifies the codebase by removing the hybrid
server pattern and integrating all protocol handling directly into
`SrsServer`.

As unified connection manager (`_srs_conn_manager`) for all protocol
connections, all incoming connections are checked against the same
connection limit in `on_before_connection()`. This enables consistent
connection limits: `max_connections` now protects against resource
exhaustion from any protocol, not just RTMP.

Remove modules because it's not used now, so only keep the server
application module and main entry point. Remove the wait group to run
server, instead, directly run server and invoke the cycle method.

After this PR, the startup workflow and servers architecture should be
much easier to maintain.

---------

Co-authored-by: OSSRS-AI <winlinam@gmail.com>
This commit is contained in:
Winlin 2025-08-31 08:58:37 -04:00 committed by GitHub
parent 084bb6f7fc
commit 32dfed43ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 1743 additions and 3844 deletions

82
trunk/configure vendored
View File

@ -55,27 +55,6 @@ SRS_BUILD_SUMMARY="_srs_build_summary.sh"
SrsUtestMakeEntry="@echo -e \"ignore utest for it's disabled\""
if [[ $SRS_UTEST == YES ]]; then SrsUtestMakeEntry="\$(MAKE)\$(JOBS) -C ${SRS_OBJS}/${SRS_PLATFORM}/utest"; fi
#####################################################################################
# finger out modules to install.
# where srs module is a dir which contains a config file.
SRS_MODULES=()
__mfiles=$(find $SRS_WORKDIR/modules -name "config") && for __mfile in $__mfiles; do
SRS_MODULES+=("`dirname $__mfile`")
done
# variables for makefile for all modules.
__mphonys="" && __mdefaults="" && __mcleanups=""
# add each modules for application
for SRS_MODULE in ${SRS_MODULES[*]}; do
echo "install module at: $SRS_MODULE"
. $SRS_WORKDIR/auto/reset_module.sh && . $SRS_MODULE/config
if [[ 0 -ne ${#SRS_MODULE_MAIN[@]} ]]; then
__mphonys="$__mphonys $SRS_MODULE_NAME"
__mdefaults="$__mdefaults $SRS_MODULE_NAME"
__mcleanups="$__mcleanups $SRS_MODULE_NAME"
fi
done
# generate extra phony for each modules.
cat << END > ${SRS_OBJS}/Makefile
@ -319,7 +298,7 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_sourc
"srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call"
"srs_app_caster_flv" "srs_app_latest_version" "srs_app_uuid" "srs_app_process" "srs_app_ng_exec"
"srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr"
"srs_app_coworkers" "srs_app_hybrid" "srs_app_circuit_breaker"
"srs_app_coworkers" "srs_app_circuit_breaker"
"srs_app_stream_token")
if [[ $SRS_SRT == YES ]]; then
MODULE_FILES+=("srs_app_srt_server" "srs_app_srt_listener" "srs_app_srt_conn" "srs_app_srt_utility" "srs_app_srt_source")
@ -339,11 +318,6 @@ fi
DEFINES=""
# add each modules for app
for SRS_MODULE in ${SRS_MODULES[*]}; do
. $SRS_WORKDIR/auto/reset_module.sh && . $SRS_MODULE/config
MODULE_FILES+=("${SRS_MODULE_APP[*]}")
DEFINES="${DEFINES} ${SRS_MODULE_DEFINES}"
done
APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . $SRS_WORKDIR/auto/modules.sh
APP_OBJS="${MODULE_OBJS[@]}"
#
@ -357,24 +331,6 @@ fi
MODULE_FILES=("srs_main_server")
SERVER_INCS="src/main"; MODULE_DIR=${SERVER_INCS} . $SRS_WORKDIR/auto/modules.sh
SERVER_OBJS="${MODULE_OBJS[@]}"
#
#Main Module, for app from modules.
MODULE_ID="MAIN"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "APP")
ModuleLibIncs=(${SRS_OBJS} ${LibGperfRoot} ${LibSSLRoot} ${LibSrtpRoot})
if [[ $SRS_FFMPEG_FIT == YES ]]; then
ModuleLibIncs+=("${LibFfmpegRoot[*]}")
fi
MODULE_FILES=()
DEFINES=""
# add each modules for main
for SRS_MODULE in ${SRS_MODULES[*]}; do
. $SRS_WORKDIR/auto/reset_module.sh && . $SRS_MODULE/config
MODULE_FILES+=("${SRS_MODULE_MAIN[*]}")
DEFINES="${DEFINES} ${SRS_MODULE_DEFINES}"
done
MAIN_INCS="src/main"; MODULE_DIR=${MAIN_INCS} . $SRS_WORKDIR/auto/modules.sh
MAIN_OBJS="${MODULE_OBJS[@]}"
#####################################################################################
# Binaries, main entrances, link the module and its depends modules,
@ -382,10 +338,6 @@ MAIN_OBJS="${MODULE_OBJS[@]}"
#
# all main entrances
MAIN_ENTRANCES=("srs_main_server")
for SRS_MODULE in ${SRS_MODULES[*]}; do
. $SRS_WORKDIR/auto/reset_module.sh && . $SRS_MODULE/config
MAIN_ENTRANCES+=("${SRS_MODULE_MAIN[*]}")
done
#
# all depends libraries
ModuleLibFiles=(${LibSTfile} ${LibSSLfile} ${LibGperfFile} ${LibSrtpFile})
@ -419,13 +371,6 @@ fi
if [[ $SRS_SRT == YES ]]; then
ModuleLibFiles+=("${LibSRTfile[*]}")
fi
#
for SRS_MODULE in ${SRS_MODULES[*]}; do
. $SRS_WORKDIR/auto/reset_module.sh && . $SRS_MODULE/config
# no SRS_MODULE_MAIN
if [[ 0 -eq ${#SRS_MODULE_MAIN[@]} ]]; then continue; fi
BUILD_KEY="$SRS_MODULE_NAME" APP_MAIN="${SRS_MODULE_MAIN[0]}" APP_NAME="$SRS_MODULE_NAME" . $SRS_WORKDIR/auto/apps.sh
done
# For utest on mac.
# @see https://github.com/protocolbuffers/protobuf/issues/51#issuecomment-111044468
if [[ $SRS_OSX == YES ]]; then
@ -480,7 +425,7 @@ mv ${SRS_MAKEFILE} ${SRS_MAKEFILE}.bk
# generate phony header
cat << END > ${SRS_MAKEFILE}
.PHONY: default all _default install help clean destroy server utest _prepare_dir $__mphonys
.PHONY: clean_srs clean_modules clean_openssl clean_srtp2 clean_opus clean_ffmpeg clean_st
.PHONY: clean_srs clean_openssl clean_srtp2 clean_opus clean_ffmpeg clean_st
.PHONY: st ffmpeg
CC = ${SRS_TOOL_CC}
@ -512,7 +457,7 @@ default: server
all: _default
_default: server utest $__mdefaults
_default: server utest
help:
@echo "Usage: make <help>|<clean>|<destroy>|<server>|<utest>|<install>|<uninstall>"
@ -541,7 +486,7 @@ doclean:
(cd ${SRS_OBJS} && rm -rf src/* include lib)
(mkdir -p ${SRS_OBJS}/utest && cd ${SRS_OBJS}/utest && rm -rf *.o *.a)
clean: clean_srs clean_modules
clean: clean_srs
destroy:
(cd ${SRS_OBJS} && rm -rf ${SRS_PLATFORM})
@ -549,9 +494,6 @@ destroy:
clean_srs:
@(cd ${SRS_OBJS} && rm -rf srs srs_utest src/* utest/*)
clean_modules:
@(cd ${SRS_OBJS} && rm -rf $__mdefaults)
clean_openssl:
@rm -rf ${SRS_OBJS}/${SRS_PLATFORM}/3rdparty/openssl
@echo "Please rebuild openssl by: ./configure"
@ -595,17 +537,6 @@ server: _prepare_dir
END
# generate all modules entry
for SRS_MODULE in ${SRS_MODULES[*]}; do
. $SRS_WORKDIR/auto/reset_module.sh && . $SRS_MODULE/config
cat << END >> ${SRS_MAKEFILE}
$SRS_MODULE_NAME: _prepare_dir server
@echo "Build the $SRS_MODULE_NAME over SRS"
\$(MAKE)\$(JOBS) -f ${SRS_OBJS}/Makefile $SRS_MODULE_NAME
END
done
# install entry
cat << END >> ${SRS_MAKEFILE}
uninstall:
@ -817,11 +748,6 @@ else
echo -e "${GREEN}Note: The sanitizer is disabled.${BLACK}"
fi
# add each modules for application
for SRS_MODULE in ${SRS_MODULES[*]}; do
echo -e "${GREEN}Enable module: $SRS_MODULE${BLACK}"
done
#####################################################################################
# Do cleanup when configure done.
#####################################################################################

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-08-31, Merge [#4459](https://github.com/ossrs/srs/pull/4459): AI: Merge SRT and RTC servers into unified SrsServer. v7.0.68 (#4459)
* v7.0, 2025-08-29, Merge [#4457](https://github.com/ossrs/srs/pull/4457): Support IPv6 for all protocols: RTMP, HTTP/HTTPS, WebRTC, SRT, RTSP. v7.0.67 (#4457)
* v7.0, 2025-08-28, Merge [#4456](https://github.com/ossrs/srs/pull/4456): AI: Remove cloud CLS and APM. v7.0.66 (#4456)
* v7.0, 2025-08-27, Merge [#4455](https://github.com/ossrs/srs/pull/4455): Gather utility functions to kernel or protocol. v7.0.65 (#4455)

View File

@ -1,6 +0,0 @@
# The module to ingest hls to replace ffmpeg with better behavior.
SRS_MODULE_NAME=("srs_hls_ingester")
SRS_MODULE_MAIN=("srs_main_ingest_hls")
SRS_MODULE_APP=()
SRS_MODULE_DEFINES=""

View File

@ -1,6 +0,0 @@
# The module to parse mp4 file.
SRS_MODULE_NAME=("srs_mp4_parser")
SRS_MODULE_MAIN=("srs_main_mp4_parser")
SRS_MODULE_APP=()
SRS_MODULE_DEFINES=""

View File

@ -1,20 +0,0 @@
SRS Application Module Rules(SRS应用模块规则)
1. Each module in its isolate home directory(一个模块一个目录).
2. There is a config file in home(目录下放一个config文件).
3. All variables in configure are available(所有的configure中的变量模块中可以使用).
The Variables in config(模块中需要定义变量,例如):
1. SRS_MODULE_NAMEThe application binary name, optional. (模块名称用来做Makefile的phony以及执行binary文件名。模块的二进制输出。为空时没有独立的二进制。)
2. SRS_MODULE_MAINThe source file in src/main directory, optional. (模块的main函数所在的cpp文件在src/main目录。模块在main的文件。可以为空。)
3. SRS_MODULE_APPThe source file in src/app directory, optional. (模块在src/app目录的源文件列表。模块在app的文件。可以为空。)
4. SRS_MODULE_DEFINES: The extra defined macros, optional. (模块编译时的额外宏定义。在app和main模块加入。可以为空。)
For example(下面是一个RTMFP服务器实例):
SRS_MODULE_NAME=("srs_rtmfpd")
SRS_MODULE_MAIN=("srs_main_rtmfpd")
SRS_MODULE_APP=("srs_app_rtfmpd")
SRS_MODULE_DEFINES="-DRTMFPD"
Winlin, 2015.3

View File

@ -7,7 +7,7 @@
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_config.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_server.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_kbps.hpp>
#include <srs_kernel_utility.hpp>
@ -17,6 +17,8 @@ extern SrsPps *_srs_pps_snack2;
extern SrsPps *_srs_pps_snack3;
extern SrsPps *_srs_pps_snack4;
extern SrsServer *_srs_server;
using namespace std;
ISrsCircuitBreaker::ISrsCircuitBreaker()
@ -60,7 +62,7 @@ srs_error_t SrsCircuitBreaker::initialize()
// Update the water level for circuit breaker.
// @see SrsCircuitBreaker::on_timer()
_srs_hybrid->timer1s()->subscribe(this);
_srs_server->timer1s()->subscribe(this);
srs_trace("CircuitBreaker: enabled=%d, high=%dx%d, critical=%dx%d, dying=%dx%d", enabled_,
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_,

View File

@ -408,11 +408,12 @@ std::string SrsGbSession::desc()
return "GBS";
}
SrsGbListener::SrsGbListener()
SrsGbListener::SrsGbListener(ISrsHttpServeMux *http_api_mux)
{
conf_ = NULL;
sip_listener_ = new SrsTcpListener(this);
media_listener_ = new SrsTcpListener(this);
http_api_mux_ = http_api_mux;
}
SrsGbListener::~SrsGbListener()
@ -469,10 +470,7 @@ srs_error_t SrsGbListener::listen_api()
{
srs_error_t err = srs_success;
// TODO: FIXME: Fetch api from hybrid manager, not from SRS.
ISrsHttpServeMux *http_api_mux = _srs_hybrid->srs()->instance()->api_server();
if ((err = http_api_mux->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) {
if ((err = http_api_mux_->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}

View File

@ -40,6 +40,7 @@ class SrsRawHEVCStream;
class SrsSharedPtrMessage;
class SrsPithyPrint;
class SrsRawAacStream;
class ISrsHttpServeMux;
// The state machine for GB session.
// init:
@ -218,9 +219,10 @@ private:
SrsConfDirective *conf_;
SrsTcpListener *media_listener_;
SrsTcpListener *sip_listener_;
ISrsHttpServeMux *http_api_mux_;
public:
SrsGbListener();
SrsGbListener(ISrsHttpServeMux *http_api_mux);
virtual ~SrsGbListener();
public:

View File

@ -1225,7 +1225,7 @@ srs_error_t SrsGoApiSignal::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessag
signo = SRS_SIGNAL_ASSERT_ABORT;
}
_srs_hybrid->srs()->instance()->on_signal(signo);
_srs_server->on_signal(signo);
// By default, response the json style response.
SrsUniquePtr<SrsJsonObject> obj(SrsJsonAny::object());

View File

@ -16,7 +16,6 @@ using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_server.hpp>
#include <srs_app_source.hpp>
@ -61,13 +60,13 @@ void SrsHlsVirtualConn::expire()
SrsHlsStream::SrsHlsStream()
{
_srs_hybrid->timer5s()->subscribe(this);
_srs_server->timer5s()->subscribe(this);
security_ = new SrsSecurity();
}
SrsHlsStream::~SrsHlsStream()
{
_srs_hybrid->timer5s()->unsubscribe(this);
_srs_server->timer5s()->unsubscribe(this);
std::map<std::string, SrsHlsVirtualConn *>::iterator it;
for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) {

View File

@ -1,797 +0,0 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#include <srs_app_hybrid.hpp>
#include <srs_app_async_call.hpp>
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_config.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_dvr.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_log.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <srs_app_rtc_server.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_server.hpp>
#include <srs_app_source.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_st.hpp>
#ifdef SRS_SRT
#include <srs_app_srt_source.hpp>
#endif
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
#endif
#ifdef SRS_RTSP
#include <srs_app_rtsp_source.hpp>
#endif
#include <srs_protocol_kbps.hpp>
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
using namespace std;
SrsAsyncCallWorker *_srs_dvr_async = NULL;
extern SrsPps *_srs_pps_cids_get;
extern SrsPps *_srs_pps_cids_set;
extern SrsPps *_srs_pps_timer;
extern SrsPps *_srs_pps_pub;
extern SrsPps *_srs_pps_conn;
extern SrsPps *_srs_pps_dispose;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
extern __thread unsigned long long _st_stat_recvfrom;
extern __thread unsigned long long _st_stat_recvfrom_eagain;
extern __thread unsigned long long _st_stat_sendto;
extern __thread unsigned long long _st_stat_sendto_eagain;
SrsPps *_srs_pps_recvfrom = NULL;
SrsPps *_srs_pps_recvfrom_eagain = NULL;
SrsPps *_srs_pps_sendto = NULL;
SrsPps *_srs_pps_sendto_eagain = NULL;
extern __thread unsigned long long _st_stat_read;
extern __thread unsigned long long _st_stat_read_eagain;
extern __thread unsigned long long _st_stat_readv;
extern __thread unsigned long long _st_stat_readv_eagain;
extern __thread unsigned long long _st_stat_writev;
extern __thread unsigned long long _st_stat_writev_eagain;
SrsPps *_srs_pps_read = NULL;
SrsPps *_srs_pps_read_eagain = NULL;
SrsPps *_srs_pps_readv = NULL;
SrsPps *_srs_pps_readv_eagain = NULL;
SrsPps *_srs_pps_writev = NULL;
SrsPps *_srs_pps_writev_eagain = NULL;
extern __thread unsigned long long _st_stat_recvmsg;
extern __thread unsigned long long _st_stat_recvmsg_eagain;
extern __thread unsigned long long _st_stat_sendmsg;
extern __thread unsigned long long _st_stat_sendmsg_eagain;
SrsPps *_srs_pps_recvmsg = NULL;
SrsPps *_srs_pps_recvmsg_eagain = NULL;
SrsPps *_srs_pps_sendmsg = NULL;
SrsPps *_srs_pps_sendmsg_eagain = NULL;
extern __thread unsigned long long _st_stat_epoll;
extern __thread unsigned long long _st_stat_epoll_zero;
extern __thread unsigned long long _st_stat_epoll_shake;
extern __thread unsigned long long _st_stat_epoll_spin;
SrsPps *_srs_pps_epoll = NULL;
SrsPps *_srs_pps_epoll_zero = NULL;
SrsPps *_srs_pps_epoll_shake = NULL;
SrsPps *_srs_pps_epoll_spin = NULL;
extern __thread unsigned long long _st_stat_sched_15ms;
extern __thread unsigned long long _st_stat_sched_20ms;
extern __thread unsigned long long _st_stat_sched_25ms;
extern __thread unsigned long long _st_stat_sched_30ms;
extern __thread unsigned long long _st_stat_sched_35ms;
extern __thread unsigned long long _st_stat_sched_40ms;
extern __thread unsigned long long _st_stat_sched_80ms;
extern __thread unsigned long long _st_stat_sched_160ms;
extern __thread unsigned long long _st_stat_sched_s;
SrsPps *_srs_pps_sched_15ms = NULL;
SrsPps *_srs_pps_sched_20ms = NULL;
SrsPps *_srs_pps_sched_25ms = NULL;
SrsPps *_srs_pps_sched_30ms = NULL;
SrsPps *_srs_pps_sched_35ms = NULL;
SrsPps *_srs_pps_sched_40ms = NULL;
SrsPps *_srs_pps_sched_80ms = NULL;
SrsPps *_srs_pps_sched_160ms = NULL;
SrsPps *_srs_pps_sched_s = NULL;
#endif
SrsPps *_srs_pps_clock_15ms = NULL;
SrsPps *_srs_pps_clock_20ms = NULL;
SrsPps *_srs_pps_clock_25ms = NULL;
SrsPps *_srs_pps_clock_30ms = NULL;
SrsPps *_srs_pps_clock_35ms = NULL;
SrsPps *_srs_pps_clock_40ms = NULL;
SrsPps *_srs_pps_clock_80ms = NULL;
SrsPps *_srs_pps_clock_160ms = NULL;
SrsPps *_srs_pps_timer_s = NULL;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
extern __thread int _st_active_count;
extern __thread int _st_num_free_stacks;
extern __thread unsigned long long _st_stat_thread_run;
extern __thread unsigned long long _st_stat_thread_idle;
extern __thread unsigned long long _st_stat_thread_yield;
extern __thread unsigned long long _st_stat_thread_yield2;
SrsPps *_srs_pps_thread_run = NULL;
SrsPps *_srs_pps_thread_idle = NULL;
SrsPps *_srs_pps_thread_yield = NULL;
SrsPps *_srs_pps_thread_yield2 = NULL;
#endif
extern SrsPps *_srs_pps_objs_rtps;
extern SrsPps *_srs_pps_objs_rraw;
extern SrsPps *_srs_pps_objs_rfua;
extern SrsPps *_srs_pps_objs_rbuf;
extern SrsPps *_srs_pps_objs_msgs;
extern SrsPps *_srs_pps_objs_rothers;
extern ISrsLog *_srs_log;
extern ISrsContext *_srs_context;
extern SrsConfig *_srs_config;
extern SrsStageManager *_srs_stages;
extern SrsRtcBlackhole *_srs_blackhole;
extern SrsResourceManager *_srs_rtc_manager;
extern SrsDtlsCertificate *_srs_rtc_dtls_certificate;
SrsPps *_srs_pps_aloss2 = NULL;
extern SrsPps *_srs_pps_ids;
extern SrsPps *_srs_pps_fids;
extern SrsPps *_srs_pps_fids_level0;
extern SrsPps *_srs_pps_dispose;
extern SrsPps *_srs_pps_timer;
extern SrsPps *_srs_pps_snack;
extern SrsPps *_srs_pps_snack2;
extern SrsPps *_srs_pps_snack3;
extern SrsPps *_srs_pps_snack4;
extern SrsPps *_srs_pps_sanack;
extern SrsPps *_srs_pps_svnack;
extern SrsPps *_srs_pps_rnack;
extern SrsPps *_srs_pps_rnack2;
extern SrsPps *_srs_pps_rhnack;
extern SrsPps *_srs_pps_rmnack;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
extern SrsPps *_srs_pps_recvfrom;
extern SrsPps *_srs_pps_recvfrom_eagain;
extern SrsPps *_srs_pps_sendto;
extern SrsPps *_srs_pps_sendto_eagain;
extern SrsPps *_srs_pps_read;
extern SrsPps *_srs_pps_read_eagain;
extern SrsPps *_srs_pps_readv;
extern SrsPps *_srs_pps_readv_eagain;
extern SrsPps *_srs_pps_writev;
extern SrsPps *_srs_pps_writev_eagain;
extern SrsPps *_srs_pps_recvmsg;
extern SrsPps *_srs_pps_recvmsg_eagain;
extern SrsPps *_srs_pps_sendmsg;
extern SrsPps *_srs_pps_sendmsg_eagain;
extern SrsPps *_srs_pps_epoll;
extern SrsPps *_srs_pps_epoll_zero;
extern SrsPps *_srs_pps_epoll_shake;
extern SrsPps *_srs_pps_epoll_spin;
extern SrsPps *_srs_pps_sched_15ms;
extern SrsPps *_srs_pps_sched_20ms;
extern SrsPps *_srs_pps_sched_25ms;
extern SrsPps *_srs_pps_sched_30ms;
extern SrsPps *_srs_pps_sched_35ms;
extern SrsPps *_srs_pps_sched_40ms;
extern SrsPps *_srs_pps_sched_80ms;
extern SrsPps *_srs_pps_sched_160ms;
extern SrsPps *_srs_pps_sched_s;
#endif
extern SrsPps *_srs_pps_clock_15ms;
extern SrsPps *_srs_pps_clock_20ms;
extern SrsPps *_srs_pps_clock_25ms;
extern SrsPps *_srs_pps_clock_30ms;
extern SrsPps *_srs_pps_clock_35ms;
extern SrsPps *_srs_pps_clock_40ms;
extern SrsPps *_srs_pps_clock_80ms;
extern SrsPps *_srs_pps_clock_160ms;
extern SrsPps *_srs_pps_timer_s;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
extern SrsPps *_srs_pps_thread_run;
extern SrsPps *_srs_pps_thread_idle;
extern SrsPps *_srs_pps_thread_yield;
extern SrsPps *_srs_pps_thread_yield2;
#endif
extern SrsPps *_srs_pps_rpkts;
extern SrsPps *_srs_pps_addrs;
extern SrsPps *_srs_pps_fast_addrs;
extern SrsPps *_srs_pps_spkts;
extern SrsPps *_srs_pps_sstuns;
extern SrsPps *_srs_pps_srtcps;
extern SrsPps *_srs_pps_srtps;
extern SrsPps *_srs_pps_pli;
extern SrsPps *_srs_pps_twcc;
extern SrsPps *_srs_pps_rr;
extern SrsPps *_srs_pps_pub;
extern SrsPps *_srs_pps_conn;
extern SrsPps *_srs_pps_rstuns;
extern SrsPps *_srs_pps_rrtps;
extern SrsPps *_srs_pps_rrtcps;
extern SrsPps *_srs_pps_aloss2;
extern SrsPps *_srs_pps_cids_get;
extern SrsPps *_srs_pps_cids_set;
extern SrsPps *_srs_pps_objs_msgs;
extern SrsPps *_srs_pps_objs_rtps;
extern SrsPps *_srs_pps_objs_rraw;
extern SrsPps *_srs_pps_objs_rfua;
extern SrsPps *_srs_pps_objs_rbuf;
extern SrsPps *_srs_pps_objs_rothers;
extern srs_error_t _srs_reload_err;
extern SrsReloadState _srs_reload_state;
extern std::string _srs_reload_id;
ISrsHybridServer::ISrsHybridServer()
{
}
ISrsHybridServer::~ISrsHybridServer()
{
}
SrsHybridServer::SrsHybridServer()
{
// Create global shared timer.
timer20ms_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS);
timer100ms_ = new SrsFastTimer("hybrid", 100 * SRS_UTIME_MILLISECONDS);
timer1s_ = new SrsFastTimer("hybrid", 1 * SRS_UTIME_SECONDS);
timer5s_ = new SrsFastTimer("hybrid", 5 * SRS_UTIME_SECONDS);
clock_monitor_ = new SrsClockWallMonitor();
pid_fd = -1;
}
SrsHybridServer::~SrsHybridServer()
{
// We must free servers first, because it may depend on the timers of hybrid server.
vector<ISrsHybridServer *>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer *server = *it;
srs_freep(server);
}
servers.clear();
srs_freep(clock_monitor_);
srs_freep(timer20ms_);
srs_freep(timer100ms_);
srs_freep(timer1s_);
srs_freep(timer5s_);
if (pid_fd > 0) {
::close(pid_fd);
pid_fd = -1;
}
}
void SrsHybridServer::register_server(ISrsHybridServer *svr)
{
servers.push_back(svr);
}
srs_error_t SrsHybridServer::initialize()
{
srs_error_t err = srs_success;
if ((err = acquire_pid_file()) != srs_success) {
return srs_error_wrap(err, "acquire pid file");
}
srs_trace("Hybrid server initialized in single thread mode");
// Start the timer first.
if ((err = timer20ms_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
if ((err = timer100ms_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
if ((err = timer1s_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
if ((err = timer5s_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
// Start the DVR async call.
if ((err = _srs_dvr_async->start()) != srs_success) {
return srs_error_wrap(err, "dvr async");
}
// Register some timers.
timer20ms_->subscribe(clock_monitor_);
timer5s_->subscribe(this);
// Initialize all hybrid servers.
vector<ISrsHybridServer *>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer *server = *it;
if ((err = server->initialize()) != srs_success) {
return srs_error_wrap(err, "init server");
}
}
return err;
}
srs_error_t SrsHybridServer::run()
{
srs_error_t err = srs_success;
// Wait for all servers which need to do cleanup.
SrsWaitGroup wg;
vector<ISrsHybridServer *>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer *server = *it;
if ((err = server->run(&wg)) != srs_success) {
return srs_error_wrap(err, "run server");
}
}
// Wait for all server to quit.
wg.wait();
return err;
}
void SrsHybridServer::stop()
{
vector<ISrsHybridServer *>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer *server = *it;
server->stop();
}
}
SrsServerAdapter *SrsHybridServer::srs()
{
for (vector<ISrsHybridServer *>::iterator it = servers.begin(); it != servers.end(); ++it) {
if (dynamic_cast<SrsServerAdapter *>(*it)) {
return dynamic_cast<SrsServerAdapter *>(*it);
}
}
return NULL;
}
SrsFastTimer *SrsHybridServer::timer20ms()
{
return timer20ms_;
}
SrsFastTimer *SrsHybridServer::timer100ms()
{
return timer100ms_;
}
SrsFastTimer *SrsHybridServer::timer1s()
{
return timer1s_;
}
SrsFastTimer *SrsHybridServer::timer5s()
{
return timer5s_;
}
srs_error_t SrsHybridServer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
// Show statistics for RTC server.
SrsProcSelfStat *u = srs_get_self_proc_stat();
// Resident Set Size: number of pages the process has in real memory.
int memory = (int)(u->rss * 4 / 1024);
static char buf[128];
string cid_desc;
_srs_pps_cids_get->update();
_srs_pps_cids_set->update();
if (_srs_pps_cids_get->r10s() || _srs_pps_cids_set->r10s()) {
snprintf(buf, sizeof(buf), ", cid=%d,%d", _srs_pps_cids_get->r10s(), _srs_pps_cids_set->r10s());
cid_desc = buf;
}
string timer_desc;
_srs_pps_timer->update();
_srs_pps_pub->update();
_srs_pps_conn->update();
if (_srs_pps_timer->r10s() || _srs_pps_pub->r10s() || _srs_pps_conn->r10s()) {
snprintf(buf, sizeof(buf), ", timer=%d,%d,%d", _srs_pps_timer->r10s(), _srs_pps_pub->r10s(), _srs_pps_conn->r10s());
timer_desc = buf;
}
string free_desc;
_srs_pps_dispose->update();
if (_srs_pps_dispose->r10s()) {
snprintf(buf, sizeof(buf), ", free=%d", _srs_pps_dispose->r10s());
free_desc = buf;
}
string recvfrom_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_recvfrom->update(_st_stat_recvfrom);
_srs_pps_recvfrom_eagain->update(_st_stat_recvfrom_eagain);
_srs_pps_sendto->update(_st_stat_sendto);
_srs_pps_sendto_eagain->update(_st_stat_sendto_eagain);
if (_srs_pps_recvfrom->r10s() || _srs_pps_recvfrom_eagain->r10s() || _srs_pps_sendto->r10s() || _srs_pps_sendto_eagain->r10s()) {
snprintf(buf, sizeof(buf), ", udp=%d,%d,%d,%d", _srs_pps_recvfrom->r10s(), _srs_pps_recvfrom_eagain->r10s(), _srs_pps_sendto->r10s(), _srs_pps_sendto_eagain->r10s());
recvfrom_desc = buf;
}
#endif
string io_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_read->update(_st_stat_read);
_srs_pps_read_eagain->update(_st_stat_read_eagain);
_srs_pps_readv->update(_st_stat_readv);
_srs_pps_readv_eagain->update(_st_stat_readv_eagain);
_srs_pps_writev->update(_st_stat_writev);
_srs_pps_writev_eagain->update(_st_stat_writev_eagain);
if (_srs_pps_read->r10s() || _srs_pps_read_eagain->r10s() || _srs_pps_readv->r10s() || _srs_pps_readv_eagain->r10s() || _srs_pps_writev->r10s() || _srs_pps_writev_eagain->r10s()) {
snprintf(buf, sizeof(buf), ", io=%d,%d,%d,%d,%d,%d", _srs_pps_read->r10s(), _srs_pps_read_eagain->r10s(), _srs_pps_readv->r10s(), _srs_pps_readv_eagain->r10s(), _srs_pps_writev->r10s(), _srs_pps_writev_eagain->r10s());
io_desc = buf;
}
#endif
string msg_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_recvmsg->update(_st_stat_recvmsg);
_srs_pps_recvmsg_eagain->update(_st_stat_recvmsg_eagain);
_srs_pps_sendmsg->update(_st_stat_sendmsg);
_srs_pps_sendmsg_eagain->update(_st_stat_sendmsg_eagain);
if (_srs_pps_recvmsg->r10s() || _srs_pps_recvmsg_eagain->r10s() || _srs_pps_sendmsg->r10s() || _srs_pps_sendmsg_eagain->r10s()) {
snprintf(buf, sizeof(buf), ", msg=%d,%d,%d,%d", _srs_pps_recvmsg->r10s(), _srs_pps_recvmsg_eagain->r10s(), _srs_pps_sendmsg->r10s(), _srs_pps_sendmsg_eagain->r10s());
msg_desc = buf;
}
#endif
string epoll_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_epoll->update(_st_stat_epoll);
_srs_pps_epoll_zero->update(_st_stat_epoll_zero);
_srs_pps_epoll_shake->update(_st_stat_epoll_shake);
_srs_pps_epoll_spin->update(_st_stat_epoll_spin);
if (_srs_pps_epoll->r10s() || _srs_pps_epoll_zero->r10s() || _srs_pps_epoll_shake->r10s() || _srs_pps_epoll_spin->r10s()) {
snprintf(buf, sizeof(buf), ", epoll=%d,%d,%d,%d", _srs_pps_epoll->r10s(), _srs_pps_epoll_zero->r10s(), _srs_pps_epoll_shake->r10s(), _srs_pps_epoll_spin->r10s());
epoll_desc = buf;
}
#endif
string sched_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_sched_160ms->update(_st_stat_sched_160ms);
_srs_pps_sched_s->update(_st_stat_sched_s);
_srs_pps_sched_15ms->update(_st_stat_sched_15ms);
_srs_pps_sched_20ms->update(_st_stat_sched_20ms);
_srs_pps_sched_25ms->update(_st_stat_sched_25ms);
_srs_pps_sched_30ms->update(_st_stat_sched_30ms);
_srs_pps_sched_35ms->update(_st_stat_sched_35ms);
_srs_pps_sched_40ms->update(_st_stat_sched_40ms);
_srs_pps_sched_80ms->update(_st_stat_sched_80ms);
if (_srs_pps_sched_160ms->r10s() || _srs_pps_sched_s->r10s() || _srs_pps_sched_15ms->r10s() || _srs_pps_sched_20ms->r10s() || _srs_pps_sched_25ms->r10s() || _srs_pps_sched_30ms->r10s() || _srs_pps_sched_35ms->r10s() || _srs_pps_sched_40ms->r10s() || _srs_pps_sched_80ms->r10s()) {
snprintf(buf, sizeof(buf), ", sched=%d,%d,%d,%d,%d,%d,%d,%d,%d", _srs_pps_sched_15ms->r10s(), _srs_pps_sched_20ms->r10s(), _srs_pps_sched_25ms->r10s(), _srs_pps_sched_30ms->r10s(), _srs_pps_sched_35ms->r10s(), _srs_pps_sched_40ms->r10s(), _srs_pps_sched_80ms->r10s(), _srs_pps_sched_160ms->r10s(), _srs_pps_sched_s->r10s());
sched_desc = buf;
}
#endif
string clock_desc;
_srs_pps_clock_15ms->update();
_srs_pps_clock_20ms->update();
_srs_pps_clock_25ms->update();
_srs_pps_clock_30ms->update();
_srs_pps_clock_35ms->update();
_srs_pps_clock_40ms->update();
_srs_pps_clock_80ms->update();
_srs_pps_clock_160ms->update();
_srs_pps_timer_s->update();
if (_srs_pps_clock_15ms->r10s() || _srs_pps_timer_s->r10s() || _srs_pps_clock_20ms->r10s() || _srs_pps_clock_25ms->r10s() || _srs_pps_clock_30ms->r10s() || _srs_pps_clock_35ms->r10s() || _srs_pps_clock_40ms->r10s() || _srs_pps_clock_80ms->r10s() || _srs_pps_clock_160ms->r10s()) {
snprintf(buf, sizeof(buf), ", clock=%d,%d,%d,%d,%d,%d,%d,%d,%d", _srs_pps_clock_15ms->r10s(), _srs_pps_clock_20ms->r10s(), _srs_pps_clock_25ms->r10s(), _srs_pps_clock_30ms->r10s(), _srs_pps_clock_35ms->r10s(), _srs_pps_clock_40ms->r10s(), _srs_pps_clock_80ms->r10s(), _srs_pps_clock_160ms->r10s(), _srs_pps_timer_s->r10s());
clock_desc = buf;
}
string thread_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_thread_run->update(_st_stat_thread_run);
_srs_pps_thread_idle->update(_st_stat_thread_idle);
_srs_pps_thread_yield->update(_st_stat_thread_yield);
_srs_pps_thread_yield2->update(_st_stat_thread_yield2);
if (_st_active_count > 0 || _st_num_free_stacks > 0 || _srs_pps_thread_run->r10s() || _srs_pps_thread_idle->r10s() || _srs_pps_thread_yield->r10s() || _srs_pps_thread_yield2->r10s()) {
snprintf(buf, sizeof(buf), ", co=%d,%d,%d, stk=%d, yield=%d,%d", _st_active_count, _srs_pps_thread_run->r10s(), _srs_pps_thread_idle->r10s(), _st_num_free_stacks, _srs_pps_thread_yield->r10s(), _srs_pps_thread_yield2->r10s());
thread_desc = buf;
}
#endif
string objs_desc;
_srs_pps_objs_rtps->update();
_srs_pps_objs_rraw->update();
_srs_pps_objs_rfua->update();
_srs_pps_objs_rbuf->update();
_srs_pps_objs_msgs->update();
_srs_pps_objs_rothers->update();
if (_srs_pps_objs_rtps->r10s() || _srs_pps_objs_rraw->r10s() || _srs_pps_objs_rfua->r10s() || _srs_pps_objs_rbuf->r10s() || _srs_pps_objs_msgs->r10s() || _srs_pps_objs_rothers->r10s()) {
snprintf(buf, sizeof(buf), ", objs=(pkt:%d,raw:%d,fua:%d,msg:%d,oth:%d,buf:%d)",
_srs_pps_objs_rtps->r10s(), _srs_pps_objs_rraw->r10s(), _srs_pps_objs_rfua->r10s(),
_srs_pps_objs_msgs->r10s(), _srs_pps_objs_rothers->r10s(), _srs_pps_objs_rbuf->r10s());
objs_desc = buf;
}
srs_trace("Hybrid cpu=%.2f%%,%dMB%s%s%s%s%s%s%s%s%s%s%s",
u->percent * 100, memory,
cid_desc.c_str(), timer_desc.c_str(),
recvfrom_desc.c_str(), io_desc.c_str(), msg_desc.c_str(),
epoll_desc.c_str(), sched_desc.c_str(), clock_desc.c_str(),
thread_desc.c_str(), free_desc.c_str(), objs_desc.c_str());
return err;
}
srs_error_t SrsHybridServer::acquire_pid_file()
{
std::string pid_file = _srs_config->get_pid_file();
// -rw-r--r--
// 644
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
int fd;
// open pid file
if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) == -1) {
return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file.c_str());
}
// require write lock
struct flock lock;
lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK
lock.l_start = 0; // type offset, relative to l_whence
lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END
lock.l_len = 0;
if (fcntl(fd, F_SETLK, &lock) == -1) {
if (errno == EACCES || errno == EAGAIN) {
::close(fd);
srs_error("srs is already running!");
return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running");
}
return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file.c_str());
}
// truncate file
if (ftruncate(fd, 0) != 0) {
return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file.c_str());
}
// write the pid
string pid = srs_strconv_format_int(getpid());
if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) {
return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str());
}
// auto close when fork child process.
int val;
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd);
}
val |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, val) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file.c_str(), fd);
}
srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str());
pid_fd = fd;
return srs_success;
}
srs_error_t srs_global_initialize()
{
srs_error_t err = srs_success;
// Root global objects.
_srs_log = new SrsFileLog();
_srs_context = new SrsThreadContext();
_srs_config = new SrsConfig();
// The clock wall object.
_srs_clock = new SrsWallClock();
// The pps cids depends by st init.
_srs_pps_cids_get = new SrsPps();
_srs_pps_cids_set = new SrsPps();
// Initialize ST, which depends on pps cids.
if ((err = srs_st_init()) != srs_success) {
return srs_error_wrap(err, "initialize st failed");
}
// The global objects which depends on ST.
_srs_hybrid = new SrsHybridServer();
_srs_sources = new SrsLiveSourceManager();
_srs_stages = new SrsStageManager();
_srs_circuit_breaker = new SrsCircuitBreaker();
_srs_hooks = new SrsHttpHooks();
#ifdef SRS_SRT
_srs_srt_sources = new SrsSrtSourceManager();
#endif
_srs_rtc_sources = new SrsRtcSourceManager();
_srs_blackhole = new SrsRtcBlackhole();
// Initialize stream publish token manager
_srs_stream_publish_tokens = new SrsStreamPublishTokenManager();
_srs_rtc_manager = new SrsResourceManager("RTC", true);
_srs_rtc_dtls_certificate = new SrsDtlsCertificate();
#ifdef SRS_RTSP
_srs_rtsp_sources = new SrsRtspSourceManager();
_srs_rtsp_manager = new SrsResourceManager("RTSP", true);
#endif
#ifdef SRS_GB28181
_srs_gb_manager = new SrsResourceManager("GB", true);
#endif
// Initialize global pps, which depends on _srs_clock
_srs_pps_ids = new SrsPps();
_srs_pps_fids = new SrsPps();
_srs_pps_fids_level0 = new SrsPps();
_srs_pps_dispose = new SrsPps();
_srs_pps_timer = new SrsPps();
_srs_pps_conn = new SrsPps();
_srs_pps_pub = new SrsPps();
_srs_pps_snack = new SrsPps();
_srs_pps_snack2 = new SrsPps();
_srs_pps_snack3 = new SrsPps();
_srs_pps_snack4 = new SrsPps();
_srs_pps_sanack = new SrsPps();
_srs_pps_svnack = new SrsPps();
_srs_pps_rnack = new SrsPps();
_srs_pps_rnack2 = new SrsPps();
_srs_pps_rhnack = new SrsPps();
_srs_pps_rmnack = new SrsPps();
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_recvfrom = new SrsPps();
_srs_pps_recvfrom_eagain = new SrsPps();
_srs_pps_sendto = new SrsPps();
_srs_pps_sendto_eagain = new SrsPps();
_srs_pps_read = new SrsPps();
_srs_pps_read_eagain = new SrsPps();
_srs_pps_readv = new SrsPps();
_srs_pps_readv_eagain = new SrsPps();
_srs_pps_writev = new SrsPps();
_srs_pps_writev_eagain = new SrsPps();
_srs_pps_recvmsg = new SrsPps();
_srs_pps_recvmsg_eagain = new SrsPps();
_srs_pps_sendmsg = new SrsPps();
_srs_pps_sendmsg_eagain = new SrsPps();
_srs_pps_epoll = new SrsPps();
_srs_pps_epoll_zero = new SrsPps();
_srs_pps_epoll_shake = new SrsPps();
_srs_pps_epoll_spin = new SrsPps();
_srs_pps_sched_15ms = new SrsPps();
_srs_pps_sched_20ms = new SrsPps();
_srs_pps_sched_25ms = new SrsPps();
_srs_pps_sched_30ms = new SrsPps();
_srs_pps_sched_35ms = new SrsPps();
_srs_pps_sched_40ms = new SrsPps();
_srs_pps_sched_80ms = new SrsPps();
_srs_pps_sched_160ms = new SrsPps();
_srs_pps_sched_s = new SrsPps();
#endif
_srs_pps_clock_15ms = new SrsPps();
_srs_pps_clock_20ms = new SrsPps();
_srs_pps_clock_25ms = new SrsPps();
_srs_pps_clock_30ms = new SrsPps();
_srs_pps_clock_35ms = new SrsPps();
_srs_pps_clock_40ms = new SrsPps();
_srs_pps_clock_80ms = new SrsPps();
_srs_pps_clock_160ms = new SrsPps();
_srs_pps_timer_s = new SrsPps();
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_thread_run = new SrsPps();
_srs_pps_thread_idle = new SrsPps();
_srs_pps_thread_yield = new SrsPps();
_srs_pps_thread_yield2 = new SrsPps();
#endif
_srs_pps_rpkts = new SrsPps();
_srs_pps_addrs = new SrsPps();
_srs_pps_fast_addrs = new SrsPps();
_srs_pps_spkts = new SrsPps();
_srs_pps_objs_msgs = new SrsPps();
_srs_pps_sstuns = new SrsPps();
_srs_pps_srtcps = new SrsPps();
_srs_pps_srtps = new SrsPps();
_srs_pps_rstuns = new SrsPps();
_srs_pps_rrtps = new SrsPps();
_srs_pps_rrtcps = new SrsPps();
_srs_pps_aloss2 = new SrsPps();
_srs_pps_pli = new SrsPps();
_srs_pps_twcc = new SrsPps();
_srs_pps_rr = new SrsPps();
_srs_pps_objs_rtps = new SrsPps();
_srs_pps_objs_rraw = new SrsPps();
_srs_pps_objs_rfua = new SrsPps();
_srs_pps_objs_rbuf = new SrsPps();
_srs_pps_objs_rothers = new SrsPps();
// Create global async worker for DVR.
_srs_dvr_async = new SrsAsyncCallWorker();
_srs_reload_err = srs_success;
_srs_reload_state = SrsReloadStateInit;
_srs_reload_id = srs_rand_gen_str(7);
return err;
}
SrsHybridServer *_srs_hybrid = NULL;

View File

@ -1,86 +0,0 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_APP_HYBRID_HPP
#define SRS_APP_HYBRID_HPP
#include <srs_core.hpp>
#include <vector>
#include <srs_app_hourglass.hpp>
class SrsServer;
class SrsServerAdapter;
class SrsWaitGroup;
// Initialize global shared variables cross all threads.
extern srs_error_t srs_global_initialize();
// The hibrid server interfaces, we could register many servers.
class ISrsHybridServer
{
public:
ISrsHybridServer();
virtual ~ISrsHybridServer();
public:
// Only ST initialized before each server, we could fork processes as such.
virtual srs_error_t initialize() = 0;
// Run each server, should never block except the SRS master server.
virtual srs_error_t run(SrsWaitGroup *wg) = 0;
// Stop each server, should do cleanup, for example, kill processes forked by server.
virtual void stop() = 0;
};
// The hybrid server manager.
class SrsHybridServer : public ISrsFastTimer
{
private:
std::vector<ISrsHybridServer *> servers;
SrsFastTimer *timer20ms_;
SrsFastTimer *timer100ms_;
SrsFastTimer *timer1s_;
SrsFastTimer *timer5s_;
SrsClockWallMonitor *clock_monitor_;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
// for the server never delete the file; when system startup, the pid in pid file
// maybe valid but the process is not SRS, the init.d script will never start server.
int pid_fd;
public:
SrsHybridServer();
virtual ~SrsHybridServer();
public:
virtual void register_server(ISrsHybridServer *svr);
public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual void stop();
public:
virtual SrsServerAdapter *srs();
SrsFastTimer *timer20ms();
SrsFastTimer *timer100ms();
SrsFastTimer *timer1s();
SrsFastTimer *timer5s();
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
private:
// Require the PID file for the whole process.
virtual srs_error_t acquire_pid_file();
};
extern SrsHybridServer *_srs_hybrid;
#endif

View File

@ -197,22 +197,9 @@ srs_error_t SrsUdpListener::cycle()
return srs_error_wrap(err, "udp listener");
}
int nread = 0;
sockaddr_storage from;
int nb_from = sizeof(from);
if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr *)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
}
// Drop UDP health check packet of Aliyun SLB.
// Healthcheck udp check
// @see https://help.aliyun.com/document_detail/27595.html
if (nread == 21 && buf[0] == 0x48 && buf[1] == 0x65 && buf[2] == 0x61 && buf[3] == 0x6c && buf[19] == 0x63 && buf[20] == 0x6b) {
continue;
}
if ((err = handler->on_udp_packet((const sockaddr *)&from, nb_from, buf, nread)) != srs_success) {
return srs_error_wrap(err, "handle packet %d bytes", nread);
if ((err = do_cycle()) != srs_success) {
srs_warn("%s listener: Ignore error, %s", label_.c_str(), srs_error_desc(err).c_str());
srs_freep(err);
}
if (SrsUdpPacketRecvCycleInterval > 0) {
@ -223,6 +210,31 @@ srs_error_t SrsUdpListener::cycle()
return err;
}
srs_error_t SrsUdpListener::do_cycle()
{
srs_error_t err = srs_success;
int nread = 0;
sockaddr_storage from;
int nb_from = sizeof(from);
if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr *)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
}
// Drop UDP health check packet of Aliyun SLB.
// Healthcheck udp check
// @see https://help.aliyun.com/document_detail/27595.html
if (nread == 21 && buf[0] == 0x48 && buf[1] == 0x65 && buf[2] == 0x61 && buf[3] == 0x6c && buf[19] == 0x63 && buf[20] == 0x6b) {
return err;
}
if ((err = handler->on_udp_packet((const sockaddr *)&from, nb_from, buf, nread)) != srs_success) {
return srs_error_wrap(err, "handle packet %d bytes", nread);
}
return err;
}
SrsTcpListener::SrsTcpListener(ISrsTcpHandler *h)
{
handler = h;
@ -304,18 +316,30 @@ srs_error_t SrsTcpListener::cycle()
return srs_error_wrap(err, "tcp listener");
}
srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if (fd == NULL) {
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
if ((err = do_cycle()) != srs_success) {
srs_warn("%s listener: Ignore error, %s", label_.c_str(), srs_error_desc(err).c_str());
srs_freep(err);
}
}
if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
return err;
}
if ((err = handler->on_tcp_client(this, fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}
srs_error_t SrsTcpListener::do_cycle()
{
srs_error_t err = srs_success;
srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if (fd == NULL) {
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}
if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
if ((err = handler->on_tcp_client(this, fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}
return err;

View File

@ -114,6 +114,9 @@ public:
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
private:
srs_error_t do_cycle();
};
// Bind and listen tcp port, use handler to process the client.
@ -145,6 +148,9 @@ public:
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
private:
srs_error_t do_cycle();
};
// Bind and listen tcp port, use handler to process the client.

View File

@ -12,6 +12,7 @@
#include <srs_app_http_hooks.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_rtc_server.hpp>
#include <srs_app_server.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
@ -28,7 +29,7 @@ using namespace std;
// To limit user to use too long password, to cause unknown issue.
#define SRS_ICE_PWD_MAX 32
SrsGoApiRtcPlay::SrsGoApiRtcPlay(SrsRtcServer *server)
SrsGoApiRtcPlay::SrsGoApiRtcPlay(SrsServer *server)
{
server_ = server;
security_ = new SrsSecurity();
@ -238,7 +239,7 @@ srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessa
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcConnection *session = NULL;
if ((err = server_->create_session(ruc, local_sdp, &session)) != srs_success) {
if ((err = server_->create_rtc_session(ruc, local_sdp, &session)) != srs_success) {
return srs_error_wrap(err, "create session, dtls=%u, srtp=%u, eip=%s", ruc->dtls_, ruc->srtp_, ruc->eip_.c_str());
}
@ -325,7 +326,7 @@ srs_error_t SrsGoApiRtcPlay::http_hooks_on_play(ISrsRequest *req)
return err;
}
SrsGoApiRtcPublish::SrsGoApiRtcPublish(SrsRtcServer *server)
SrsGoApiRtcPublish::SrsGoApiRtcPublish(SrsServer *server)
{
server_ = server;
security_ = new SrsSecurity();
@ -504,7 +505,7 @@ srs_error_t SrsGoApiRtcPublish::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMe
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
// We must do stat the client before hooks, because hooks depends on it.
SrsRtcConnection *session = NULL;
if ((err = server_->create_session(ruc, local_sdp, &session)) != srs_success) {
if ((err = server_->create_rtc_session(ruc, local_sdp, &session)) != srs_success) {
return srs_error_wrap(err, "create session");
}
@ -598,7 +599,7 @@ srs_error_t SrsGoApiRtcPublish::http_hooks_on_publish(ISrsRequest *req)
return err;
}
SrsGoApiRtcWhip::SrsGoApiRtcWhip(SrsRtcServer *server)
SrsGoApiRtcWhip::SrsGoApiRtcWhip(SrsServer *server)
{
server_ = server;
publish_ = new SrsGoApiRtcPublish(server);
@ -627,7 +628,7 @@ srs_error_t SrsGoApiRtcWhip::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessa
return srs_error_new(ERROR_RTC_INVALID_SESSION, "token empty");
}
SrsRtcConnection *session = server_->find_session_by_username(username);
SrsRtcConnection *session = server_->find_rtc_session_by_username(username);
if (session && token != session->token()) {
return srs_error_new(ERROR_RTC_INVALID_SESSION, "token %s not match", token.c_str());
}
@ -759,7 +760,7 @@ srs_error_t SrsGoApiRtcWhip::do_serve_http(ISrsHttpResponseWriter *w, ISrsHttpMe
return err;
}
SrsGoApiRtcNACK::SrsGoApiRtcNACK(SrsRtcServer *server)
SrsGoApiRtcNACK::SrsGoApiRtcNACK(SrsServer *server)
{
server_ = server;
}
@ -802,7 +803,7 @@ srs_error_t SrsGoApiRtcNACK::do_serve_http(ISrsHttpResponseWriter *w, ISrsHttpMe
return srs_error_new(ERROR_RTC_INVALID_PARAMS, "invalid drop=%s/%d", dropv.c_str(), drop);
}
SrsRtcConnection *session = server_->find_session_by_username(username);
SrsRtcConnection *session = server_->find_rtc_session_by_username(username);
if (!session) {
return srs_error_new(ERROR_RTC_NO_SESSION, "no session username=%s", username.c_str());
}

View File

@ -11,7 +11,7 @@
#include <srs_core.hpp>
#include <srs_protocol_http_stack.hpp>
class SrsRtcServer;
class SrsServer;
class ISrsRequest;
class SrsSdp;
class SrsRtcUserConfig;
@ -19,11 +19,11 @@ class SrsRtcUserConfig;
class SrsGoApiRtcPlay : public ISrsHttpHandler
{
private:
SrsRtcServer *server_;
SrsServer *server_;
SrsSecurity *security_;
public:
SrsGoApiRtcPlay(SrsRtcServer *server);
SrsGoApiRtcPlay(SrsServer *server);
virtual ~SrsGoApiRtcPlay();
public:
@ -45,11 +45,11 @@ private:
class SrsGoApiRtcPublish : public ISrsHttpHandler
{
private:
SrsRtcServer *server_;
SrsServer *server_;
SrsSecurity *security_;
public:
SrsGoApiRtcPublish(SrsRtcServer *server);
SrsGoApiRtcPublish(SrsServer *server);
virtual ~SrsGoApiRtcPublish();
public:
@ -72,12 +72,12 @@ private:
class SrsGoApiRtcWhip : public ISrsHttpHandler
{
private:
SrsRtcServer *server_;
SrsServer *server_;
SrsGoApiRtcPublish *publish_;
SrsGoApiRtcPlay *play_;
public:
SrsGoApiRtcWhip(SrsRtcServer *server);
SrsGoApiRtcWhip(SrsServer *server);
virtual ~SrsGoApiRtcWhip();
public:
@ -90,10 +90,10 @@ private:
class SrsGoApiRtcNACK : public ISrsHttpHandler
{
private:
SrsRtcServer *server_;
SrsServer *server_;
public:
SrsGoApiRtcNACK(SrsRtcServer *server);
SrsGoApiRtcNACK(SrsServer *server);
virtual ~SrsGoApiRtcNACK();
public:

View File

@ -442,7 +442,7 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection *s, const SrsContextId &cid)
SrsRtcPlayStream::~SrsRtcPlayStream()
{
if (req_) {
session_->server_->exec_async_work(new SrsRtcAsyncCallOnStop(cid_, req_));
session_->server_->exec_rtc_async_work(new SrsRtcAsyncCallOnStop(cid_, req_));
}
_srs_config->unsubscribe(this);
@ -927,12 +927,12 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream *p) : p_(p)
{
_srs_hybrid->timer1s()->subscribe(this);
_srs_server->timer1s()->subscribe(this);
}
SrsRtcPublishRtcpTimer::~SrsRtcPublishRtcpTimer()
{
_srs_hybrid->timer1s()->unsubscribe(this);
_srs_server->timer1s()->unsubscribe(this);
}
srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval)
@ -963,12 +963,12 @@ srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval)
SrsRtcPublishTwccTimer::SrsRtcPublishTwccTimer(SrsRtcPublishStream *p) : p_(p)
{
_srs_hybrid->timer100ms()->subscribe(this);
_srs_server->timer100ms()->subscribe(this);
}
SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer()
{
_srs_hybrid->timer100ms()->unsubscribe(this);
_srs_server->timer100ms()->unsubscribe(this);
}
srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval)
@ -1084,7 +1084,7 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection *session, const SrsCon
SrsRtcPublishStream::~SrsRtcPublishStream()
{
if (req_) {
session_->server_->exec_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_));
session_->server_->exec_rtc_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_));
}
srs_freep(timer_rtcp_);
@ -1176,16 +1176,6 @@ srs_error_t SrsRtcPublishStream::initialize(ISrsRequest *r, SrsRtcSourceDescript
track->set_nack_no_copy(nack_no_copy_);
}
// Acquire stream publish token to prevent race conditions across all protocols.
SrsStreamPublishToken *publish_token_raw = NULL;
if ((err = _srs_stream_publish_tokens->acquire_token(req_, publish_token_raw)) != srs_success) {
return srs_error_wrap(err, "acquire stream publish token");
}
SrsUniquePtr<SrsStreamPublishToken> publish_token(publish_token_raw);
if (publish_token.get()) {
srs_trace("stream publish token acquired, type=rtc, url=%s", req_->get_stream_url().c_str());
}
// Setup the publish stream in source to enable PLI as such.
if ((err = _srs_rtc_sources->fetch_or_create(req_, source_)) != srs_success) {
return srs_error_wrap(err, "create source");
@ -1734,12 +1724,12 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp &n
SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection *p) : p_(p)
{
_srs_hybrid->timer20ms()->subscribe(this);
_srs_server->timer20ms()->subscribe(this);
}
SrsRtcConnectionNackTimer::~SrsRtcConnectionNackTimer()
{
_srs_hybrid->timer20ms()->unsubscribe(this);
_srs_server->timer20ms()->unsubscribe(this);
}
srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
@ -1771,7 +1761,7 @@ srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
return err;
}
SrsRtcConnection::SrsRtcConnection(SrsRtcServer *s, const SrsContextId &cid)
SrsRtcConnection::SrsRtcConnection(SrsServer *s, const SrsContextId &cid)
{
req_ = NULL;
cid_ = cid;
@ -1795,12 +1785,12 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer *s, const SrsContextId &cid)
nack_enabled_ = false;
timer_nack_ = new SrsRtcConnectionNackTimer(this);
_srs_rtc_manager->subscribe(this);
_srs_conn_manager->subscribe(this);
}
SrsRtcConnection::~SrsRtcConnection()
{
_srs_rtc_manager->unsubscribe(this);
_srs_conn_manager->unsubscribe(this);
srs_freep(timer_nack_);
@ -1912,7 +1902,7 @@ std::string SrsRtcConnection::desc()
void SrsRtcConnection::expire()
{
// TODO: FIXME: Should set session to expired and remove it by heartbeat checking. Should not remove it directly.
_srs_rtc_manager->remove(this);
_srs_conn_manager->remove(this);
}
void SrsRtcConnection::switch_to_context()
@ -2255,7 +2245,7 @@ srs_error_t SrsRtcConnection::on_dtls_alert(std::string type, std::string desc)
switch_to_context();
srs_trace("RTC: session destroy by DTLS alert(%s %s), username=%s", type.c_str(), desc.c_str(), username_.c_str());
_srs_rtc_manager->remove(this);
_srs_conn_manager->remove(this);
}
return err;

View File

@ -10,7 +10,6 @@
#include <srs_app_async_call.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_reload.hpp>
#include <srs_app_rtc_dtls.hpp>
@ -34,7 +33,7 @@
class SrsUdpMuxSocket;
class SrsLiveConsumer;
class SrsStunPacket;
class SrsRtcServer;
class SrsServer;
class SrsRtcConnection;
class SrsSharedPtrMessage;
class SrsRtcSource;
@ -270,6 +269,7 @@ public:
// Interface ISrsRtcSourceChangeCallback
public:
void on_stream_change(SrsRtcSourceDescription *desc);
public:
virtual const SrsContextId &context_id();
@ -479,7 +479,7 @@ public:
bool disposing_;
private:
SrsRtcServer *server_;
SrsServer *server_;
private:
iovec *cache_iov_;
@ -529,7 +529,7 @@ private:
bool nack_enabled_;
public:
SrsRtcConnection(SrsRtcServer *s, const SrsContextId &cid);
SrsRtcConnection(SrsServer *s, const SrsContextId &cid);
virtual ~SrsRtcConnection();
// interface ISrsDisposingHandler
public:

View File

@ -363,11 +363,11 @@ void SrsRtcUdpNetwork::update_sendonly_socket(SrsUdpMuxSocket *skt)
// If no cache, build cache and setup the relations in connection.
if (!addr_cache) {
peer_addresses_[peer_id] = addr_cache = skt->copy_sendonly();
_srs_rtc_manager->add_with_id(peer_id, conn_);
_srs_conn_manager->add_with_id(peer_id, conn_);
uint64_t fast_id = skt->fast_id();
if (fast_id) {
_srs_rtc_manager->add_with_fast_id(fast_id, conn_);
_srs_conn_manager->add_with_fast_id(fast_id, conn_);
}
}
@ -888,7 +888,7 @@ srs_error_t SrsRtcTcpConn::handshake()
}
srs_assert(!session_);
SrsRtcConnection *session = dynamic_cast<SrsRtcConnection *>(_srs_rtc_manager->find_by_name(ping.get_username()));
SrsRtcConnection *session = dynamic_cast<SrsRtcConnection *>(_srs_conn_manager->find_by_name(ping.get_username()));
// TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.
if (!session) {
return srs_error_new(ERROR_RTC_TCP_STUN, "no session, stun username=%s", ping.get_username().c_str());

View File

@ -32,9 +32,9 @@ using namespace std;
#include <srs_protocol_utility.hpp>
extern SrsPps *_srs_pps_rpkts;
SrsPps *_srs_pps_rstuns = NULL;
SrsPps *_srs_pps_rrtps = NULL;
SrsPps *_srs_pps_rrtcps = NULL;
extern SrsPps *_srs_pps_rstuns;
extern SrsPps *_srs_pps_rrtps;
extern SrsPps *_srs_pps_rrtcps;
extern SrsPps *_srs_pps_addrs;
extern SrsPps *_srs_pps_fast_addrs;
@ -216,7 +216,7 @@ srs_error_t api_server_as_candidates(string api, set<string> &candidate_ips)
return err;
}
static set<string> discover_candidates(SrsRtcUserConfig *ruc)
set<string> discover_candidates(SrsRtcUserConfig *ruc)
{
srs_error_t err = srs_success;
@ -306,540 +306,3 @@ SrsRtcUserConfig::~SrsRtcUserConfig()
{
srs_freep(req_);
}
SrsRtcServer::SrsRtcServer()
{
async = new SrsAsyncCallWorker();
_srs_config->subscribe(this);
}
SrsRtcServer::~SrsRtcServer()
{
_srs_config->unsubscribe(this);
if (true) {
vector<SrsUdpMuxListener *>::iterator it;
for (it = listeners.begin(); it != listeners.end(); ++it) {
SrsUdpMuxListener *listener = *it;
srs_freep(listener);
}
}
async->stop();
srs_freep(async);
}
srs_error_t SrsRtcServer::initialize()
{
srs_error_t err = srs_success;
// The RTC server start a timer, do routines of RTC server.
// @see SrsRtcServer::on_timer()
_srs_hybrid->timer5s()->subscribe(this);
// Initialize the black hole.
if ((err = _srs_blackhole->initialize()) != srs_success) {
return srs_error_wrap(err, "black hole");
}
async->start();
return err;
}
srs_error_t SrsRtcServer::exec_async_work(ISrsAsyncCallTask *t)
{
return async->execute(t);
}
srs_error_t SrsRtcServer::listen_udp()
{
srs_error_t err = srs_success;
if (!_srs_config->get_rtc_server_enabled()) {
return err;
}
// Check protocol setting - only create UDP listeners if protocol allows UDP
string protocol = _srs_config->get_rtc_server_protocol();
if (protocol == "tcp") {
// When protocol is "tcp", don't create UDP listeners
return err;
}
vector<string> rtc_listens = _srs_config->get_rtc_server_listens();
if (rtc_listens.empty()) {
return srs_error_new(ERROR_RTC_PORT, "empty rtc listen");
}
// There should be no listeners before listening.
srs_assert(listeners.empty());
// For each listen address, create multiple listeners based on reuseport setting
int nn_listeners = _srs_config->get_rtc_server_reuseport();
for (int j = 0; j < (int)rtc_listens.size(); j++) {
string ip;
int port;
srs_net_split_for_listener(rtc_listens[j], ip, port);
if (port <= 0) {
return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port);
}
for (int i = 0; i < nn_listeners; i++) {
SrsUdpMuxListener *listener = new SrsUdpMuxListener(this, ip, port);
if ((err = listener->listen()) != srs_success) {
srs_freep(listener);
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
}
srs_trace("WebRTC listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd());
listeners.push_back(listener);
}
}
return err;
}
srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket *skt)
{
srs_error_t err = srs_success;
SrsRtcConnection *session = NULL;
char *data = skt->data();
int size = skt->size();
bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t *)data, size);
bool is_rtcp = srs_is_rtcp((uint8_t *)data, size);
uint64_t fast_id = skt->fast_id();
// Try fast id first, if not found, search by long peer id.
if (fast_id) {
session = (SrsRtcConnection *)_srs_rtc_manager->find_by_fast_id(fast_id);
}
if (!session) {
string peer_id = skt->peer_id();
session = (SrsRtcConnection *)_srs_rtc_manager->find_by_id(peer_id);
}
if (session) {
// When got any packet, the session is alive now.
session->alive();
}
// For STUN, the peer address may change.
if (!is_rtp_or_rtcp && srs_is_stun((uint8_t *)data, size)) {
++_srs_pps_rstuns->sugar;
string peer_id = skt->peer_id();
// TODO: FIXME: Should support ICE renomination, to switch network between candidates.
SrsStunPacket ping;
if ((err = ping.decode(data, size)) != srs_success) {
return srs_error_wrap(err, "decode stun packet failed");
}
if (!session) {
session = find_session_by_username(ping.get_username());
}
if (session) {
session->switch_to_context();
}
srs_info("recv stun packet from %s, fast=%" PRId64 ", use-candidate=%d, ice-controlled=%d, ice-controlling=%d",
peer_id.c_str(), fast_id, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());
// TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.
if (!session) {
return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s, fast=%" PRId64,
ping.get_username().c_str(), peer_id.c_str(), fast_id);
}
// For each binding request, update the UDP socket.
if (ping.is_binding_request()) {
session->udp()->update_sendonly_socket(skt);
}
return session->udp()->on_stun(&ping, data, size);
}
// For DTLS, RTCP or RTP, which does not support peer address changing.
if (!session) {
string peer_id = skt->peer_id();
return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s, fast=%" PRId64, peer_id.c_str(), fast_id);
}
// Note that we don't(except error) switch to the context of session, for performance issue.
if (is_rtp_or_rtcp && !is_rtcp) {
++_srs_pps_rrtps->sugar;
err = session->udp()->on_rtp(data, size);
if (err != srs_success) {
session->switch_to_context();
}
return err;
}
session->switch_to_context();
if (is_rtp_or_rtcp && is_rtcp) {
++_srs_pps_rrtcps->sugar;
return session->udp()->on_rtcp(data, size);
}
if (srs_is_dtls((uint8_t *)data, size)) {
++_srs_pps_rstuns->sugar;
return session->udp()->on_dtls(data, size);
}
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}
srs_error_t SrsRtcServer::listen_api()
{
srs_error_t err = srs_success;
// TODO: FIXME: Fetch api from hybrid manager, not from SRS.
ISrsHttpServeMux *http_api_mux = _srs_hybrid->srs()->instance()->api_server();
if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
return srs_error_wrap(err, "handle play");
}
if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}
// Generally, WHIP is a publishing protocol, but it can be also used as playing.
// See https://datatracker.ietf.org/doc/draft-ietf-wish-whep/
if ((err = http_api_mux->handle("/rtc/v1/whip/", new SrsGoApiRtcWhip(this))) != srs_success) {
return srs_error_wrap(err, "handle whip");
}
// We create another mount, to support play with the same query string as publish.
// See https://datatracker.ietf.org/doc/draft-murillo-whep/
if ((err = http_api_mux->handle("/rtc/v1/whip-play/", new SrsGoApiRtcWhip(this))) != srs_success) {
return srs_error_wrap(err, "handle whep play");
}
if ((err = http_api_mux->handle("/rtc/v1/whep/", new SrsGoApiRtcWhip(this))) != srs_success) {
return srs_error_wrap(err, "handle whep play");
}
#ifdef SRS_SIMULATOR
if ((err = http_api_mux->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) {
return srs_error_wrap(err, "handle nack");
}
#endif
return err;
}
srs_error_t SrsRtcServer::create_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession)
{
srs_error_t err = srs_success;
SrsContextId cid = _srs_context->get_id();
ISrsRequest *req = ruc->req_;
SrsSharedPtr<SrsRtcSource> source;
if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
if (ruc->publish_ && !source->can_publish()) {
return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str());
}
// TODO: FIXME: add do_create_session to error process.
SrsRtcConnection *session = new SrsRtcConnection(this, cid);
if ((err = do_create_session(ruc, local_sdp, session)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "create session");
}
*psession = session;
return err;
}
srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session)
{
srs_error_t err = srs_success;
ISrsRequest *req = ruc->req_;
// first add publisher/player for negotiate sdp media info
if (ruc->publish_) {
if ((err = session->add_publisher(ruc, local_sdp)) != srs_success) {
return srs_error_wrap(err, "add publisher");
}
} else {
if ((err = session->add_player(ruc, local_sdp)) != srs_success) {
return srs_error_wrap(err, "add player");
}
}
// All tracks default as inactive, so we must enable them.
session->set_all_tracks_status(req->get_stream_url(), ruc->publish_, true);
std::string local_pwd = ruc->req_->ice_pwd_.empty() ? srs_rand_gen_str(32) : ruc->req_->ice_pwd_;
std::string local_ufrag = ruc->req_->ice_ufrag_.empty() ? srs_rand_gen_str(8) : ruc->req_->ice_ufrag_;
// TODO: FIXME: Rename for a better name, it's not an username.
std::string username = "";
while (true) {
username = local_ufrag + ":" + ruc->remote_sdp_.get_ice_ufrag();
if (!_srs_rtc_manager->find_by_name(username)) {
break;
}
// Username conflict, regenerate a new one.
local_ufrag = srs_rand_gen_str(8);
}
local_sdp.set_ice_ufrag(local_ufrag);
local_sdp.set_ice_pwd(local_pwd);
local_sdp.set_fingerprint_algo("sha-256");
local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint());
// We allows to mock the eip of server.
if (true) {
// TODO: Support multiple listen ports.
int udp_port = 0;
if (true) {
string udp_host;
string udp_hostport = _srs_config->get_rtc_server_listens().at(0);
srs_net_split_for_listener(udp_hostport, udp_host, udp_port);
}
int tcp_port = 0;
if (true) {
string tcp_host;
string tcp_hostport = _srs_config->get_rtc_server_tcp_listens().at(0);
srs_net_split_for_listener(tcp_hostport, tcp_host, tcp_port);
}
string protocol = _srs_config->get_rtc_server_protocol();
set<string> candidates = discover_candidates(ruc);
for (set<string>::iterator it = candidates.begin(); it != candidates.end(); ++it) {
string hostname;
int uport = udp_port;
srs_net_split_hostport(*it, hostname, uport);
int tport = tcp_port;
srs_net_split_hostport(*it, hostname, tport);
if (protocol == "udp") {
local_sdp.add_candidate("udp", hostname, uport, "host");
} else if (protocol == "tcp") {
local_sdp.add_candidate("tcp", hostname, tport, "host");
} else {
local_sdp.add_candidate("udp", hostname, uport, "host");
local_sdp.add_candidate("tcp", hostname, tport, "host");
}
}
vector<string> v = vector<string>(candidates.begin(), candidates.end());
srs_trace("RTC: Use candidates %s, protocol=%s, tcp_port=%d, udp_port=%d",
srs_strings_join(v, ", ").c_str(), protocol.c_str(), tcp_port, udp_port);
}
// Setup the negotiate DTLS by config.
local_sdp.session_negotiate_ = local_sdp.session_config_;
// Setup the negotiate DTLS role.
if (ruc->remote_sdp_.get_dtls_role() == "active") {
local_sdp.session_negotiate_.dtls_role = "passive";
} else if (ruc->remote_sdp_.get_dtls_role() == "passive") {
local_sdp.session_negotiate_.dtls_role = "active";
} else if (ruc->remote_sdp_.get_dtls_role() == "actpass") {
local_sdp.session_negotiate_.dtls_role = local_sdp.session_config_.dtls_role;
} else {
// @see: https://tools.ietf.org/html/rfc4145#section-4.1
// The default value of the setup attribute in an offer/answer exchange
// is 'active' in the offer and 'passive' in the answer.
local_sdp.session_negotiate_.dtls_role = "passive";
}
local_sdp.set_dtls_role(local_sdp.session_negotiate_.dtls_role);
session->set_remote_sdp(ruc->remote_sdp_);
// We must setup the local SDP, then initialize the session object.
session->set_local_sdp(local_sdp);
session->set_state_as_waiting_stun();
// Before session initialize, we must setup the local SDP.
if ((err = session->initialize(req, ruc->dtls_, ruc->srtp_, username)) != srs_success) {
return srs_error_wrap(err, "init");
}
// We allows username is optional, but it never empty here.
_srs_rtc_manager->add_with_name(username, session);
return err;
}
SrsRtcConnection *SrsRtcServer::find_session_by_username(const std::string &username)
{
ISrsResource *conn = _srs_rtc_manager->find_by_name(username);
return dynamic_cast<SrsRtcConnection *>(conn);
}
srs_error_t SrsRtcServer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
// Alive RTC sessions, for stat.
int nn_rtc_conns = 0;
// Check all sessions and dispose the dead sessions.
for (int i = 0; i < (int)_srs_rtc_manager->size(); i++) {
SrsRtcConnection *session = dynamic_cast<SrsRtcConnection *>(_srs_rtc_manager->at(i));
// Ignore not session, or already disposing.
if (!session || session->disposing_) {
continue;
}
// Update stat if session is alive.
if (session->is_alive()) {
nn_rtc_conns++;
SrsStatistic::instance()->kbps_add_delta(session->get_id().c_str(), session->delta());
continue;
}
SrsContextRestore(_srs_context->get_id());
session->switch_to_context();
string username = session->username();
srs_trace("RTC: session destroy by timeout, username=%s", username.c_str());
// Use manager to free session and notify other objects.
_srs_rtc_manager->remove(session);
}
// Ignore stats if no RTC connections.
if (!nn_rtc_conns) {
return err;
}
static char buf[128];
string rpkts_desc;
_srs_pps_rpkts->update();
_srs_pps_rrtps->update();
_srs_pps_rstuns->update();
_srs_pps_rrtcps->update();
if (_srs_pps_rpkts->r10s() || _srs_pps_rrtps->r10s() || _srs_pps_rstuns->r10s() || _srs_pps_rrtcps->r10s()) {
snprintf(buf, sizeof(buf), ", rpkts=(%d,rtp:%d,stun:%d,rtcp:%d)", _srs_pps_rpkts->r10s(), _srs_pps_rrtps->r10s(), _srs_pps_rstuns->r10s(), _srs_pps_rrtcps->r10s());
rpkts_desc = buf;
}
string spkts_desc;
_srs_pps_spkts->update();
_srs_pps_srtps->update();
_srs_pps_sstuns->update();
_srs_pps_srtcps->update();
if (_srs_pps_spkts->r10s() || _srs_pps_srtps->r10s() || _srs_pps_sstuns->r10s() || _srs_pps_srtcps->r10s()) {
snprintf(buf, sizeof(buf), ", spkts=(%d,rtp:%d,stun:%d,rtcp:%d)", _srs_pps_spkts->r10s(), _srs_pps_srtps->r10s(), _srs_pps_sstuns->r10s(), _srs_pps_srtcps->r10s());
spkts_desc = buf;
}
string rtcp_desc;
_srs_pps_pli->update();
_srs_pps_twcc->update();
_srs_pps_rr->update();
if (_srs_pps_pli->r10s() || _srs_pps_twcc->r10s() || _srs_pps_rr->r10s()) {
snprintf(buf, sizeof(buf), ", rtcp=(pli:%d,twcc:%d,rr:%d)", _srs_pps_pli->r10s(), _srs_pps_twcc->r10s(), _srs_pps_rr->r10s());
rtcp_desc = buf;
}
string snk_desc;
_srs_pps_snack->update();
_srs_pps_snack2->update();
_srs_pps_sanack->update();
_srs_pps_svnack->update();
if (_srs_pps_snack->r10s() || _srs_pps_sanack->r10s() || _srs_pps_svnack->r10s() || _srs_pps_snack2->r10s()) {
snprintf(buf, sizeof(buf), ", snk=(%d,a:%d,v:%d,h:%d)", _srs_pps_snack->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), _srs_pps_snack2->r10s());
snk_desc = buf;
}
string rnk_desc;
_srs_pps_rnack->update();
_srs_pps_rnack2->update();
_srs_pps_rhnack->update();
_srs_pps_rmnack->update();
if (_srs_pps_rnack->r10s() || _srs_pps_rnack2->r10s() || _srs_pps_rhnack->r10s() || _srs_pps_rmnack->r10s()) {
snprintf(buf, sizeof(buf), ", rnk=(%d,%d,h:%d,m:%d)", _srs_pps_rnack->r10s(), _srs_pps_rnack2->r10s(), _srs_pps_rhnack->r10s(), _srs_pps_rmnack->r10s());
rnk_desc = buf;
}
string loss_desc;
SrsSnmpUdpStat *s = srs_get_udp_snmp_stat();
if (s->rcv_buf_errors_delta || s->snd_buf_errors_delta) {
snprintf(buf, sizeof(buf), ", loss=(r:%d,s:%d)", s->rcv_buf_errors_delta, s->snd_buf_errors_delta);
loss_desc = buf;
}
string fid_desc;
_srs_pps_ids->update();
_srs_pps_fids->update();
_srs_pps_fids_level0->update();
_srs_pps_addrs->update();
_srs_pps_fast_addrs->update();
if (_srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s()) {
snprintf(buf, sizeof(buf), ", fid=(id:%d,fid:%d,ffid:%d,addr:%d,faddr:%d)", _srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s());
fid_desc = buf;
}
srs_trace("RTC: Server conns=%u%s%s%s%s%s%s%s",
nn_rtc_conns,
rpkts_desc.c_str(), spkts_desc.c_str(), rtcp_desc.c_str(), snk_desc.c_str(), rnk_desc.c_str(), loss_desc.c_str(), fid_desc.c_str());
return err;
}
SrsRtcServerAdapter::SrsRtcServerAdapter()
{
rtc = new SrsRtcServer();
}
SrsRtcServerAdapter::~SrsRtcServerAdapter()
{
srs_freep(rtc);
}
srs_error_t SrsRtcServerAdapter::initialize()
{
srs_error_t err = srs_success;
if ((err = _srs_rtc_dtls_certificate->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc dtls certificate initialize");
}
if ((err = rtc->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc server initialize");
}
return err;
}
srs_error_t SrsRtcServerAdapter::run(SrsWaitGroup *wg)
{
srs_error_t err = srs_success;
if ((err = rtc->listen_udp()) != srs_success) {
return srs_error_wrap(err, "listen udp");
}
if ((err = rtc->listen_api()) != srs_success) {
return srs_error_wrap(err, "listen api");
}
if ((err = _srs_rtc_manager->start()) != srs_success) {
return srs_error_wrap(err, "start manager");
}
return err;
}
void SrsRtcServerAdapter::stop()
{
}
SrsResourceManager *_srs_rtc_manager = NULL;

View File

@ -11,12 +11,12 @@
#include <srs_app_async_call.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_reload.hpp>
#include <srs_app_rtc_sdp.hpp>
#include <srs_app_st.hpp>
#include <set>
#include <string>
class SrsRtcServer;
@ -26,7 +26,6 @@ class ISrsRequest;
class SrsSdp;
class SrsRtcSource;
class SrsResourceManager;
class SrsWaitGroup;
// The UDP black hole, for developer to use wireshark to catch plaintext packets.
// For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole,
@ -82,62 +81,11 @@ public:
virtual ~SrsRtcUserConfig();
};
// The RTC server instance, listen UDP port, handle UDP packet, manage RTC connections.
class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrsReloadHandler
{
private:
std::vector<SrsUdpMuxListener *> listeners;
SrsAsyncCallWorker *async;
public:
SrsRtcServer();
virtual ~SrsRtcServer();
public:
virtual srs_error_t initialize();
public:
srs_error_t exec_async_work(ISrsAsyncCallTask *t);
public:
// TODO: FIXME: Support gracefully quit.
// TODO: FIXME: Support reload.
srs_error_t listen_udp();
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt);
srs_error_t listen_api();
public:
// Peer start offering, we answer it.
srs_error_t create_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession);
private:
srs_error_t do_create_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session);
public:
SrsRtcConnection *find_session_by_username(const std::string &ufrag);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
};
// The RTC server adapter.
class SrsRtcServerAdapter : public ISrsHybridServer
{
private:
SrsRtcServer *rtc;
public:
SrsRtcServerAdapter();
virtual ~SrsRtcServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run(SrsWaitGroup *wg);
virtual void stop();
};
// Discover the candidates for RTC server.
extern std::set<std::string> discover_candidates(SrsRtcUserConfig *ruc);
// Manager for RTC connections.
extern SrsResourceManager *_srs_rtc_manager;
extern SrsResourceManager *_srs_conn_manager;
// The dns resolve utility, return the resolved ip address.
extern std::string srs_dns_resolve(std::string host, int &family);

View File

@ -16,6 +16,7 @@
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_rtc_queue.hpp>
#include <srs_app_server.hpp>
#include <srs_app_source.hpp>
#include <srs_app_statistic.hpp>
#include <srs_core_autofree.hpp>
@ -53,6 +54,7 @@ SrsPps *_srs_pps_rhnack = NULL;
SrsPps *_srs_pps_rmnack = NULL;
extern SrsPps *_srs_pps_aloss2;
extern SrsServer *_srs_server;
static const int kAudioChannel = 2;
static const int kAudioSamplerate = 48000;
@ -689,7 +691,7 @@ srs_error_t SrsRtcSource::on_publish()
pli_for_rtmp_ = _srs_config->get_rtc_pli_for_rtmp(req->vhost);
// @see SrsRtcSource::on_timer()
_srs_hybrid->timer100ms()->subscribe(this);
_srs_server->timer100ms()->subscribe(this);
}
SrsStatistic *stat = SrsStatistic::instance();
@ -723,7 +725,7 @@ void SrsRtcSource::on_unpublish()
// free bridge resource
if (bridge_) {
// For SrsRtcSource::on_timer()
_srs_hybrid->timer100ms()->unsubscribe(this);
_srs_server->timer100ms()->unsubscribe(this);
#ifdef SRS_FFMPEG_FIT
frame_builder_->on_unpublish();

View File

@ -184,7 +184,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip
server = svr;
transport_ = transport;
manager = svr;
manager = _srs_conn_manager;
ip = cip;
port = cport;
create_time = srsu2ms(srs_time_now_cached());

View File

@ -8,7 +8,6 @@
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_config.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_rtsp_conn.hpp>
#include <srs_app_statistic.hpp>

File diff suppressed because it is too large Load Diff

View File

@ -15,13 +15,25 @@
#include <srs_app_conn.hpp>
#include <srs_app_hls.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_reload.hpp>
#include <srs_app_source.hpp>
#include <srs_app_st.hpp>
#include <srs_protocol_st.hpp>
#ifdef SRS_SRT
#include <srs_app_srt_listener.hpp>
#include <srs_protocol_srt.hpp>
#endif
class SrsAsyncCallWorker;
class SrsUdpMuxListener;
class SrsUdpMuxSocket;
class SrsRtcUserConfig;
class SrsSdp;
class SrsRtcConnection;
class ISrsAsyncCallTask;
class SrsServer;
class ISrsHttpServeMux;
class SrsHttpServer;
@ -36,13 +48,28 @@ class SrsTcpListener;
class SrsAppCasterFlv;
class SrsResourceManager;
class SrsLatestVersion;
class SrsWaitGroup;
class SrsMultipleTcpListeners;
class SrsHttpFlvListener;
class SrsUdpCasterListener;
class SrsGbListener;
class SrsRtmpTransport;
class SrsRtmpsTransport;
class SrsSrtAcceptor;
class SrsSrtEventLoop;
// Initialize global shared variables cross all threads.
extern srs_error_t srs_global_initialize();
// Interface for SRT client acceptance
class ISrsSrtClientHandler
{
public:
ISrsSrtClientHandler();
virtual ~ISrsSrtClientHandler();
public:
virtual srs_error_t accept_srt_client(srs_srt_t srt_fd);
};
// Convert signal to io,
// @see: st-1.9/docs/notes.html
@ -97,29 +124,39 @@ public:
virtual srs_error_t cycle();
};
// TODO: FIXME: Rename to SrsLiveServer.
// SRS RTMP server, initialize and listen, start connection service thread, destroy client.
class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler, public ISrsTcpHandler, public ISrsResourceManager, public ISrsCoroutineHandler, public ISrsHourGlass
class SrsServer : public ISrsReloadHandler, // Reload framework for permormance optimization.
public ISrsLiveSourceHandler,
public ISrsTcpHandler,
public ISrsHourGlass,
public ISrsSrtClientHandler,
public ISrsUdpMuxHandler,
public ISrsFastTimer
{
private:
// TODO: FIXME: Extract an HttpApiServer.
ISrsHttpServeMux *http_api_mux;
SrsHttpServer *http_server;
ISrsHttpServeMux *http_api_mux_;
SrsHttpServer *http_server_;
private:
SrsHttpHeartbeat *http_heartbeat;
SrsIngester *ingester;
SrsResourceManager *conn_manager;
SrsCoroutine *trd_;
SrsHttpHeartbeat *http_heartbeat_;
SrsIngester *ingester_;
SrsHourGlass *timer_;
SrsWaitGroup *wg_;
private:
// Global shared timers moved from SrsHybridServer
SrsFastTimer *timer20ms_;
SrsFastTimer *timer100ms_;
SrsFastTimer *timer1s_;
SrsFastTimer *timer5s_;
SrsClockWallMonitor *clock_monitor_;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
// for the server never delete the file; when system startup, the pid in pid file
// maybe valid but the process is not SRS, the init.d script will never start server.
int pid_fd;
int pid_fd_;
private:
// If reusing, HTTP API use the same port of HTTP server.
@ -157,19 +194,28 @@ private:
// Stream Caster for GB28181.
SrsGbListener *stream_caster_gb28181_;
#endif
#ifdef SRS_SRT
// SRT acceptors for MPEG-TS over SRT.
std::vector<SrsSrtAcceptor *> srt_acceptors_;
#endif
// WebRTC UDP listeners for RTC server functionality.
std::vector<SrsUdpMuxListener *> rtc_listeners_;
// WebRTC async call worker for non-blocking operations.
SrsAsyncCallWorker *rtc_async_;
private:
// Signal manager which convert gignal to io message.
SrsSignalManager *signal_manager;
SrsSignalManager *signal_manager_;
// To query the latest available version of SRS.
SrsLatestVersion *latest_version_;
// User send the signal, convert to variable.
bool signal_reload;
bool signal_persistence_config;
bool signal_gmc_stop;
bool signal_fast_quit;
bool signal_gracefully_quit;
bool signal_reload_;
bool signal_persistence_config_;
bool signal_gmc_stop_;
bool signal_fast_quit_;
bool signal_gracefully_quit_;
// Parent pid for asprocess.
int ppid;
int ppid_;
public:
SrsServer();
@ -186,11 +232,21 @@ private:
// Close listener to stop accepting new connections,
// then wait and quit when all connections finished.
virtual void gracefully_dispose();
// server startup workflow, @see run_master()
public:
// Initialize server with callback handler ch.
// @remark user must free the handler.
virtual srs_error_t initialize();
private:
// Require the PID file for the whole process.
virtual srs_error_t acquire_pid_file();
public:
srs_error_t run();
private:
virtual srs_error_t initialize_st();
virtual srs_error_t initialize_signal();
virtual srs_error_t listen();
@ -199,11 +255,12 @@ public:
virtual srs_error_t ingest();
public:
virtual srs_error_t start(SrsWaitGroup *wg);
void stop();
// interface ISrsCoroutineHandler
public:
private:
virtual srs_error_t cycle();
// server utilities.
public:
// The callback for signal manager got a signal.
@ -226,6 +283,7 @@ private:
// update the global static data, for instance, the current time,
// the cpu/mem/network statistic.
virtual srs_error_t do_cycle();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
@ -234,48 +292,65 @@ private:
private:
// Resample the server kbs.
virtual void resample_kbps();
// For internal only
#ifdef SRS_SRT
// SRT-related methods
virtual srs_error_t listen_srt_mpegts();
virtual void close_srt_listeners();
virtual srs_error_t accept_srt_client(srs_srt_t srt_fd);
virtual srs_error_t srt_fd_to_resource(srs_srt_t srt_fd, ISrsResource **pr);
#endif
// WebRTC-related methods
virtual srs_error_t listen_rtc_udp();
// Interface ISrsUdpMuxHandler
public:
// TODO: FIXME: Fetch from hybrid server manager.
virtual ISrsHttpServeMux *api_server();
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt);
private:
virtual srs_error_t listen_rtc_api();
public:
virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t);
virtual SrsRtcConnection *find_rtc_session_by_username(const std::string &ufrag);
virtual srs_error_t create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession);
private:
virtual srs_error_t do_create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session);
private:
virtual srs_error_t srs_update_rtc_sessions();
// Interface ISrsTcpHandler
public:
virtual srs_error_t on_tcp_client(ISrsListener *listener, srs_netfd_t stfd);
private:
virtual srs_error_t do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stfd);
virtual srs_error_t on_before_connection(srs_netfd_t &stfd, const std::string &ip, int port);
// Interface ISrsResourceManager
public:
// A callback for connection to remove itself.
// When connection thread cycle terminated, callback this to delete connection.
// @see SrsTcpConnection.on_thread_stop().
virtual void remove(ISrsResource *c);
// Interface ISrsReloadHandler.
public:
virtual srs_error_t on_before_connection(const char *label, int fd, const std::string &ip, int port);
// Interface ISrsLiveSourceHandler
public:
virtual srs_error_t on_publish(ISrsRequest *r);
virtual void on_unpublish(ISrsRequest *r);
};
// The SRS server adapter, the master server.
class SrsServerAdapter : public ISrsHybridServer
{
public:
// Access to global shared timers
SrsFastTimer *timer20ms();
SrsFastTimer *timer100ms();
SrsFastTimer *timer1s();
SrsFastTimer *timer5s();
// interface ISrsFastTimer for statistics reporting
private:
SrsServer *srs;
public:
SrsServerAdapter();
virtual ~SrsServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run(SrsWaitGroup *wg);
virtual void stop();
public:
virtual SrsServer *instance();
virtual srs_error_t on_timer(srs_utime_t interval);
};
// @global main SRS server, for debugging
extern SrsServer *_srs_server;
// Manager for RTC connections.
extern SrsResourceManager *_srs_conn_manager;
#endif

View File

@ -19,7 +19,6 @@ using namespace std;
#include <srs_app_hds.hpp>
#include <srs_app_hls.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_ng_exec.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_server.hpp>
@ -2332,7 +2331,7 @@ srs_error_t SrsLiveSource::on_publish()
}
// notify the handler.
ISrsLiveSourceHandler *handler = _srs_hybrid->srs()->instance();
ISrsLiveSourceHandler *handler = _srs_server;
srs_assert(handler);
if ((err = handler->on_publish(req)) != srs_success) {
return srs_error_wrap(err, "handle publish");
@ -2381,7 +2380,7 @@ void SrsLiveSource::on_unpublish()
_source_id = SrsContextId();
// notify the handler.
ISrsLiveSourceHandler *handler = _srs_hybrid->srs()->instance();
ISrsLiveSourceHandler *handler = _srs_server;
srs_assert(handler);
SrsStatistic *stat = SrsStatistic::instance();

View File

@ -152,12 +152,12 @@ srs_error_t SrsSrtRecvThread::get_recv_err()
return srs_error_copy(recv_err_);
}
SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer *srt_server, srs_srt_t srt_fd, std::string ip, int port) : srt_source_(new SrsSrtSource())
SrsMpegtsSrtConn::SrsMpegtsSrtConn(ISrsResourceManager *resource_manager, srs_srt_t srt_fd, std::string ip, int port) : srt_source_(new SrsSrtSource())
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());
srt_server_ = srt_server;
resource_manager_ = resource_manager;
srt_fd_ = srt_fd;
srt_conn_ = new SrsSrtConnection(srt_fd_);
@ -235,7 +235,7 @@ srs_error_t SrsMpegtsSrtConn::cycle()
// Notify manager to remove it.
// Note that we create this object, so we use manager to remove it.
srt_server_->remove(this);
resource_manager_->remove(this);
// success.
if (err == srs_success) {

View File

@ -80,7 +80,7 @@ private:
class SrsMpegtsSrtConn : public ISrsConnection, public ISrsStartable, public ISrsCoroutineHandler, public ISrsExpire
{
public:
SrsMpegtsSrtConn(SrsSrtServer *srt_server, srs_srt_t srt_fd, std::string ip, int port);
SrsMpegtsSrtConn(ISrsResourceManager *resource_manager, srs_srt_t srt_fd, std::string ip, int port);
virtual ~SrsMpegtsSrtConn();
// Interface ISrsResource.
public:
@ -125,7 +125,7 @@ private:
void http_hooks_on_stop();
private:
SrsSrtServer *srt_server_;
ISrsResourceManager *resource_manager_;
srs_srt_t srt_fd_;
SrsSrtConnection *srt_conn_;
SrsNetworkDelta *delta_;

View File

@ -19,10 +19,10 @@ using namespace std;
SrsSrtEventLoop *_srt_eventloop = NULL;
#endif
SrsSrtAcceptor::SrsSrtAcceptor(SrsSrtServer *srt_server)
SrsSrtAcceptor::SrsSrtAcceptor(ISrsSrtClientHandler *srt_handler)
{
port_ = 0;
srt_server_ = srt_server;
srt_handler_ = srt_handler;
listener_ = NULL;
}
@ -132,8 +132,8 @@ srs_error_t SrsSrtAcceptor::on_srt_client(srs_srt_t srt_fd)
{
srs_error_t err = srs_success;
// Notify srt server to accept srt client, and create new SrsSrtConn on it.
if ((err = srt_server_->accept_srt_client(srt_fd)) != srs_success) {
// Notify srt handler to accept srt client, and create new SrsSrtConn on it.
if ((err = srt_handler_->accept_srt_client(srt_fd)) != srs_success) {
srs_warn("accept srt client failed, err is %s", srs_error_desc(err).c_str());
srs_freep(err);
}
@ -141,254 +141,6 @@ srs_error_t SrsSrtAcceptor::on_srt_client(srs_srt_t srt_fd)
return err;
}
SrsSrtServer::SrsSrtServer()
{
conn_manager_ = new SrsResourceManager("SRT", true);
timer_ = NULL;
}
SrsSrtServer::~SrsSrtServer()
{
srs_freep(conn_manager_);
srs_freep(timer_);
}
srs_error_t SrsSrtServer::initialize()
{
srs_error_t err = srs_success;
if (!_srs_config->get_srt_enabled()) {
return err;
}
if ((err = setup_ticks()) != srs_success) {
return srs_error_wrap(err, "tick");
}
return err;
}
srs_error_t SrsSrtServer::listen()
{
srs_error_t err = srs_success;
// Listen mpegts over srt.
if ((err = listen_srt_mpegts()) != srs_success) {
return srs_error_wrap(err, "srt mpegts listen");
}
if ((err = conn_manager_->start()) != srs_success) {
return srs_error_wrap(err, "srt connection manager");
}
return err;
}
srs_error_t SrsSrtServer::listen_srt_mpegts()
{
srs_error_t err = srs_success;
if (!_srs_config->get_srt_enabled()) {
return err;
}
// Close all listener for SRT if exists.
close_listeners();
// Start listeners for SRT, support multiple addresses including IPv6.
vector<string> srt_listens = _srs_config->get_srt_listens();
for (int i = 0; i < (int)srt_listens.size(); i++) {
SrsSrtAcceptor *acceptor = new SrsSrtAcceptor(this);
int port;
string ip;
srs_net_split_for_listener(srt_listens[i], ip, port);
if ((err = acceptor->listen(ip, port)) != srs_success) {
srs_freep(acceptor);
srs_warn("srt listen %s:%d failed, err=%s", ip.c_str(), port, srs_error_desc(err).c_str());
srs_error_reset(err);
continue;
}
acceptors_.push_back(acceptor);
}
// Check if at least one listener succeeded
if (acceptors_.empty()) {
return srs_error_new(ERROR_SOCKET_LISTEN, "no srt listeners available");
}
return err;
}
void SrsSrtServer::close_listeners()
{
std::vector<SrsSrtAcceptor *>::iterator it;
for (it = acceptors_.begin(); it != acceptors_.end();) {
SrsSrtAcceptor *acceptor = *it;
srs_freep(acceptor);
it = acceptors_.erase(it);
}
}
srs_error_t SrsSrtServer::accept_srt_client(srs_srt_t srt_fd)
{
srs_error_t err = srs_success;
ISrsResource *resource = NULL;
if ((err = fd_to_resource(srt_fd, &resource)) != srs_success) {
// close fd on conn error, otherwise will lead to fd leak -gs
// TODO: FIXME: Handle error.
srs_srt_close(srt_fd);
return srs_error_wrap(err, "srt fd to resource");
}
srs_assert(resource);
// directly enqueue, the cycle thread will remove the client.
conn_manager_->add(resource);
// Note that conn is managed by conn_manager, so we don't need to free it.
ISrsStartable *conn = dynamic_cast<ISrsStartable *>(resource);
if ((err = conn->start()) != srs_success) {
return srs_error_wrap(err, "start srt conn coroutine");
}
return err;
}
srs_error_t SrsSrtServer::fd_to_resource(srs_srt_t srt_fd, ISrsResource **pr)
{
srs_error_t err = srs_success;
string ip = "";
int port = 0;
if ((err = srs_srt_get_remote_ip_port(srt_fd, ip, port)) != srs_success) {
return srs_error_wrap(err, "get srt ip port");
}
// TODO: FIXME: need to check max connection?
// The context id may change during creating the bellow objects.
SrsContextRestore(_srs_context->get_id());
// Covert to SRT conection.
*pr = new SrsMpegtsSrtConn(this, srt_fd, ip, port);
return err;
}
void SrsSrtServer::remove(ISrsResource *c)
{
// use manager to free it async.
conn_manager_->remove(c);
}
srs_error_t SrsSrtServer::setup_ticks()
{
srs_error_t err = srs_success;
srs_freep(timer_);
timer_ = new SrsHourGlass("srt", this, 1 * SRS_UTIME_SECONDS);
if (_srs_config->get_stats_enabled()) {
if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}
return err;
}
srs_error_t SrsSrtServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
switch (event) {
case 8:
resample_kbps();
break;
}
return err;
}
void SrsSrtServer::resample_kbps()
{
// collect delta from all clients.
for (int i = 0; i < (int)conn_manager_->size(); i++) {
ISrsResource *c = conn_manager_->at(i);
SrsMpegtsSrtConn *conn = dynamic_cast<SrsMpegtsSrtConn *>(c);
srs_assert(conn);
// add delta of connection to server kbps.,
// for next sample() of server kbps can get the stat.
SrsStatistic::instance()->kbps_add_delta(c->get_id().c_str(), conn->delta());
}
}
SrsSrtServerAdapter::SrsSrtServerAdapter()
{
srt_server_ = new SrsSrtServer();
}
SrsSrtServerAdapter::~SrsSrtServerAdapter()
{
srs_freep(srt_server_);
}
srs_error_t SrsSrtServerAdapter::initialize()
{
srs_error_t err = srs_success;
if ((err = srs_srt_log_initialize()) != srs_success) {
return srs_error_wrap(err, "srt log initialize");
}
_srt_eventloop = new SrsSrtEventLoop();
if ((err = _srt_eventloop->initialize()) != srs_success) {
return srs_error_wrap(err, "srt poller initialize");
}
if ((err = _srt_eventloop->start()) != srs_success) {
return srs_error_wrap(err, "srt poller start");
}
return err;
}
srs_error_t SrsSrtServerAdapter::run(SrsWaitGroup *wg)
{
srs_error_t err = srs_success;
// Initialize the whole system, set hooks to handle server level events.
if ((err = srt_server_->initialize()) != srs_success) {
return srs_error_wrap(err, "srt server initialize");
}
if ((err = srt_server_->listen()) != srs_success) {
return srs_error_wrap(err, "srt listen");
}
return err;
}
void SrsSrtServerAdapter::stop()
{
}
SrsSrtServer *SrsSrtServerAdapter::instance()
{
return srt_server_;
}
SrsSrtEventLoop::SrsSrtEventLoop()
{
srt_poller_ = NULL;

View File

@ -15,6 +15,7 @@
class SrsSrtServer;
class SrsHourGlass;
class ISrsSrtClientHandler;
// A common srt acceptor, for SRT server.
class SrsSrtAcceptor : public ISrsSrtHandler
@ -22,13 +23,13 @@ class SrsSrtAcceptor : public ISrsSrtHandler
private:
std::string ip_;
int port_;
SrsSrtServer *srt_server_;
ISrsSrtClientHandler *srt_handler_;
private:
SrsSrtListener *listener_;
public:
SrsSrtAcceptor(SrsSrtServer *srt_server);
SrsSrtAcceptor(ISrsSrtClientHandler *srt_handler);
virtual ~SrsSrtAcceptor();
public:
@ -41,70 +42,6 @@ public:
virtual srs_error_t on_srt_client(srs_srt_t srt_fd);
};
// SRS SRT server, initialize and listen, start connection service thread, destroy client.
class SrsSrtServer : public ISrsResourceManager, public ISrsHourGlass
{
private:
SrsResourceManager *conn_manager_;
SrsHourGlass *timer_;
private:
std::vector<SrsSrtAcceptor *> acceptors_;
public:
SrsSrtServer();
virtual ~SrsSrtServer();
public:
virtual srs_error_t initialize();
virtual srs_error_t listen();
private:
// listen at specified srt protocol.
virtual srs_error_t listen_srt_mpegts();
// Close the listeners and remove the listen object from manager.
virtual void close_listeners();
// For internal only
public:
// When listener got a fd, notice server to accept it.
// @param srt_fd, the client fd in srt boxed, the underlayer fd.
virtual srs_error_t accept_srt_client(srs_srt_t srt_fd);
private:
virtual srs_error_t fd_to_resource(srs_srt_t srt_fd, ISrsResource **pr);
// Interface ISrsResourceManager
public:
// A callback for connection to remove itself.
// When connection thread cycle terminated, callback this to delete connection.
virtual void remove(ISrsResource *c);
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
private:
virtual void resample_kbps();
};
// The srt server adapter, the master server.
class SrsSrtServerAdapter : public ISrsHybridServer
{
private:
SrsSrtServer *srt_server_;
public:
SrsSrtServerAdapter();
virtual ~SrsSrtServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run(SrsWaitGroup *wg);
virtual void stop();
public:
virtual SrsSrtServer *instance();
};
// Start a coroutine to drive the SRT events with state-threads.
class SrsSrtEventLoop : public ISrsCoroutineHandler
{

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 67
#define VERSION_REVISION 68
#endif

File diff suppressed because it is too large Load Diff

View File

@ -1,132 +0,0 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#include <srs_core.hpp>
#include <srs_app_config.hpp>
#include <srs_core_autofree.hpp>
#include <srs_core_deprecated.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_file.hpp>
#include <srs_kernel_mp4.hpp>
#include <srs_kernel_stream.hpp>
#include <srs_protocol_kbps.hpp>
#include <srs_protocol_log.hpp>
#include <sstream>
#include <stdio.h>
#include <stdlib.h>
#include <string>
using namespace std;
// @global log and context.
ISrsLog *_srs_log = new SrsConsoleLog(SrsLogLevelTrace, false);
ISrsContext *_srs_context = new SrsThreadContext();
// @global config object for app module.
SrsConfig *_srs_config = new SrsConfig();
// @global Other variables.
bool _srs_in_docker = false;
// Whether setup config by environment variables, see https://github.com/ossrs/srs/issues/2277
bool _srs_config_by_env = false;
// The binary name of SRS.
const char *_srs_binary = NULL;
extern SrsPps *_srs_pps_cids_get;
srs_error_t parse(std::string mp4_file, bool verbose)
{
srs_error_t err = srs_success;
SrsFileReader fr;
if ((err = fr.open(mp4_file)) != srs_success) {
return srs_error_wrap(err, "open mp4 file %s", mp4_file.c_str());
}
srs_trace("MP4 file open success");
SrsMp4BoxReader br;
if ((err = br.initialize(&fr)) != srs_success) {
return srs_error_wrap(err, "open box reader");
}
srs_trace("MP4 box reader open success");
SrsUniquePtr<SrsSimpleStream> stream(new SrsSimpleStream());
fprintf(stderr, "\n%s\n", mp4_file.c_str());
while (true) {
SrsMp4Box *box_raw = NULL;
if ((err = br.read(stream.get(), &box_raw)) != srs_success) {
if (srs_error_code(err) == ERROR_SYSTEM_FILE_EOF) {
fprintf(stderr, "\n");
}
return srs_error_wrap(err, "read box");
}
// Should use unique pointer after box is created.
SrsUniquePtr box(box_raw);
SrsUniquePtr<SrsBuffer> buffer(new SrsBuffer(stream->bytes(), stream->length()));
if ((err = box->decode(buffer.get())) != srs_success) {
return srs_error_wrap(err, "decode box");
}
if ((err = br.skip(box.get(), stream.get())) != srs_success) {
return srs_error_wrap(err, "skip box");
}
SrsMp4DumpContext ctx;
ctx.level = 1;
ctx.summary = !verbose;
stringstream ss;
fprintf(stderr, "%s", box->dumps(ss, ctx).str().c_str());
}
return err;
}
int main(int argc, char **argv)
{
_srs_pps_cids_get = new SrsPps();
printf("SRS MP4 parser/%d.%d.%d, parse and show the mp4 boxes structure.\n",
VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION);
_srs_binary = argv[0];
if (argc < 2) {
printf("Usage: %s <mp4_file> [verbose]\n"
" mp4_file The MP4 file path to parse.\n"
" verbose Whether print verbose of box.\n"
"For example:\n"
" %s doc/source.200kbps.768x320.mp4\n"
" %s doc/source.200kbps.768x320.mp4 verbose\n",
argv[0], argv[0], argv[0]);
exit(-1);
}
string mp4_file = argv[1];
bool verbose = false;
if (argc > 2) {
verbose = true;
}
srs_trace("Parse MP4 file %s, verbose=%d", mp4_file.c_str(), verbose);
srs_error_t err = parse(mp4_file, verbose);
int code = srs_error_code(err);
if (code == ERROR_SYSTEM_FILE_EOF) {
srs_trace("Parse complete");
} else {
srs_error("Parse error %s", srs_error_desc(err).c_str());
}
srs_freep(err);
return code;
}

View File

@ -33,7 +33,6 @@ using namespace std;
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_config.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_log.hpp>
#include <srs_app_server.hpp>
#include <srs_app_utility.hpp>
@ -44,7 +43,6 @@ using namespace std;
#include <srs_kernel_utility.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_rtc_server.hpp>
#ifdef SRS_SRT
#include <srs_app_srt_server.hpp>
@ -53,7 +51,7 @@ using namespace std;
// pre-declare
srs_error_t run_directly_or_daemon();
srs_error_t run_hybrid_server();
srs_error_t run_srs_server();
void show_macro_features();
// @global log and context.
@ -66,9 +64,6 @@ SrsConfig *_srs_config = NULL;
// @global version of srs, which can grep keyword "XCORE"
extern const char *_srs_version;
// @global main SRS server, for debugging
SrsServer *_srs_server = NULL;
// Whether setup config by environment variables, see https://github.com/ossrs/srs/issues/2277
bool _srs_config_by_env = false;
@ -409,7 +404,7 @@ srs_error_t run_directly_or_daemon()
// If not daemon, directly run hybrid server.
if (!run_as_daemon) {
if ((err = run_hybrid_server()) != srs_success) {
if ((err = run_srs_server()) != srs_success) {
return srs_error_wrap(err, "run server");
}
return srs_success;
@ -446,43 +441,31 @@ srs_error_t run_directly_or_daemon()
// son
srs_trace("son(daemon) process running.");
if ((err = run_hybrid_server()) != srs_success) {
if ((err = run_srs_server()) != srs_success) {
return srs_error_wrap(err, "daemon run server");
}
return err;
}
srs_error_t run_hybrid_server()
srs_error_t run_srs_server()
{
srs_error_t err = srs_success;
// Create servers and register them.
_srs_hybrid->register_server(new SrsServerAdapter());
#ifdef SRS_SRT
_srs_hybrid->register_server(new SrsSrtServerAdapter());
#endif
_srs_hybrid->register_server(new SrsRtcServerAdapter());
_srs_server = new SrsServer();
// Do some system initialize.
if ((err = _srs_hybrid->initialize()) != srs_success) {
if ((err = _srs_server->initialize()) != srs_success) {
return srs_error_wrap(err, "hybrid initialize");
}
// Circuit breaker to protect server, which depends on hybrid.
if ((err = _srs_circuit_breaker->initialize()) != srs_success) {
return srs_error_wrap(err, "init circuit breaker");
}
// Should run util hybrid servers all done.
if ((err = _srs_hybrid->run()) != srs_success) {
if ((err = _srs_server->run()) != srs_success) {
return srs_error_wrap(err, "hybrid run");
}
// After all done, stop and cleanup.
_srs_hybrid->stop();
_srs_server->stop();
return err;
}

View File

@ -17,7 +17,6 @@ class SrsSrtSocket;
extern srs_error_t srs_srt_log_initialize();
typedef int srs_srt_t;
extern srs_srt_t srs_srt_socket_invalid();
// Create srt socket only, with libsrt's default option.

View File

@ -14,6 +14,9 @@
#include <srs_kernel_error.hpp>
#include <srs_protocol_io.hpp>
// Wrap for SRT.
typedef int srs_srt_t;
// Wrap for coroutine.
typedef void *srs_netfd_t;
typedef void *srs_thread_t;

View File

@ -36,7 +36,6 @@ ISrsLog *_srs_log = NULL;
ISrsContext *_srs_context = NULL;
// app module.
SrsConfig *_srs_config = NULL;
SrsServer *_srs_server = NULL;
bool _srs_in_docker = false;
bool _srs_config_by_env = false;
@ -62,6 +61,8 @@ srs_error_t prepare_main()
return srs_error_wrap(err, "init global");
}
_srs_server = new SrsServer();
srs_freep(_srs_log);
_srs_log = new MockEmptyLog(SrsLogLevelError);