srs/internal/bootstrap/proxy.go
winlin 3060bf8e7c Claude: Split lb interfaces, extract redisclient, drop race-prone globals.
- Split OriginLoadBalancer into OriginService / HLSService / RTCService;
  the original interface now embeds the three role interfaces. Generate
  counterfeiter fakes for all four.
- Extract internal/redisclient: RedisClient interface + New() factory.
  internal/lb/redis.go no longer imports github.com/go-redis/redis/v8.
- Add unit tests for lb.go (OriginServer.ID/String/Format/NewOriginServer)
  and for the full memory + redis load balancers.
- Replace package-level test seams (memoryKeepaliveInterval, newRedisClient,
  redisKeepaliveInterval, signal.signalNotify/osExit, rtmp.createBuffer) with
  per-instance struct fields so concurrent tests can't race on them.
- Promote signal.InstallSignals / InstallForceQuit onto a new signal.Handler
  type; update bootstrap to construct one.
- Move rtmp createBuffer onto amf0ObjectBase as bufFactory; the three AMF0
  marshalers and their tests use the per-instance factory.
- Make proxy test scripts locate the workspace by walking up to go.mod
  instead of brittle '../../../..' counting (symlink-aware).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 11:11:18 -04:00

150 lines
4.8 KiB
Go

// Copyright (c) 2026 Winlin
//
// SPDX-License-Identifier: MIT
package bootstrap
import (
"context"
"time"
"srsx/internal/debug"
"srsx/internal/env"
"srsx/internal/errors"
"srsx/internal/lb"
"srsx/internal/logger"
"srsx/internal/proxy"
"srsx/internal/signal"
"srsx/internal/version"
)
// NewProxyBootstrap returns the Bootstrap implementation used to start the
// proxy server.
func NewProxyBootstrap() Bootstrap {
	return new(proxyBootstrap)
}
// proxyBootstrap is the proxy server's implementation of the Bootstrap
// interface. It carries no fields; all of its behavior lives in its methods.
type proxyBootstrap struct{}
// Start attaches a logger to ctx, derives a cancellable context whose cancel
// function is handed to the signal handler, and then runs the proxy.
//
// It returns nil when run succeeds, or when run fails while the context
// reports context.Canceled (a user-requested stop is not treated as a
// failure). Any other error from run is logged and returned unchanged.
func (b *proxyBootstrap) Start(ctx context.Context) error {
	ctx = logger.WithContext(ctx)
	logger.Debug(ctx, "%v-Proxy/%v started", version.Signature(), version.Version())

	// Give the signal handler the cancel function of a fresh child context.
	ctx, cancel := context.WithCancel(ctx)
	signal.NewHandler().InstallSignals(ctx, cancel)

	// Run the main loop. A failure that coincides with a cancelled context is
	// considered a normal shutdown and is not reported.
	runErr := b.run(ctx)
	if runErr == nil || ctx.Err() == context.Canceled {
		logger.Debug(ctx, "%v done", version.Signature())
		return nil
	}

	logger.Error(ctx, "main: %+v", runErr)
	return runErr
}
// run prepares everything the proxy needs and then starts the servers.
//
// In order, it creates the proxy environment, installs the force-quit
// handler, sets up Go pprof, initializes the load balancer, and parses the
// graceful quit timeout. The first failing step aborts the sequence and its
// error is returned. On success it delegates to startServers and returns
// whatever that call returns.
func (b *proxyBootstrap) run(ctx context.Context) error {
	// Build the environment that every later step reads its settings from.
	cfg, err := env.NewProxyEnvironment(ctx)
	if err != nil {
		return errors.Wrapf(err, "create environment")
	}

	// Install the force-quit handler. Once the context is cancelled the main
	// goroutine normally exits on its own, but it may occasionally be blocked;
	// the forced exit after a timeout guarantees the program still terminates.
	quitErr := signal.NewHandler().InstallForceQuit(ctx, cfg)
	if quitErr != nil {
		return errors.Wrapf(quitErr, "install force quit")
	}

	// Set up Go pprof through the debug helper, which receives the environment.
	debug.HandleGoPprof(ctx, cfg)

	// Build and initialize the load balancer; its error is already wrapped.
	balancer, err := b.initializeLoadBalancer(ctx, cfg)
	if err != nil {
		return err
	}

	// Convert the configured graceful quit timeout into a time.Duration.
	quitTimeout, err := time.ParseDuration(cfg.GraceQuitTimeout())
	if err != nil {
		return errors.Wrapf(err, "parse gracefully quit timeout")
	}

	// Bring up all servers; this call blocks until the context is cancelled.
	return b.startServers(ctx, cfg, balancer, quitTimeout)
}
// initializeLoadBalancer constructs the load balancer selected by
// environment.LoadBalancerType() and initializes it with ctx.
//
// The type "redis" selects the Redis load balancer; every other value falls
// back to the in-memory load balancer. If Initialize fails, the wrapped error
// is returned together with a nil load balancer.
func (b *proxyBootstrap) initializeLoadBalancer(ctx context.Context, environment env.ProxyEnvironment) (lb.OriginLoadBalancer, error) {
	var balancer lb.OriginLoadBalancer
	if environment.LoadBalancerType() == "redis" {
		balancer = lb.NewRedisLoadBalancer(environment)
	} else {
		// Any unrecognized or unset type uses the in-memory implementation.
		balancer = lb.NewMemoryLoadBalancer(environment)
	}

	initErr := balancer.Initialize(ctx)
	if initErr != nil {
		return nil, errors.Wrapf(initErr, "initialize srs load balancer")
	}

	return balancer, nil
}
// startServers creates and runs every protocol server, then blocks until ctx
// is done.
//
// Servers are started in this order: RTMP, WebRTC, HTTP API, SRT, System API,
// and HTTP stream. If any Run call fails, the wrapped error is returned
// immediately and no further servers are started. Close is deferred for each
// server that started successfully, so on return they are closed in reverse
// start order. When ctx is done the function returns nil.
func (b *proxyBootstrap) startServers(ctx context.Context, environment env.ProxyEnvironment, loadBalancer lb.OriginLoadBalancer, gracefulQuitTimeout time.Duration) error {
	// RTMP server.
	rtmpSrv := proxy.NewRTMPProxyServer(environment, loadBalancer)
	if err := rtmpSrv.Run(ctx); err != nil {
		return errors.Wrapf(err, "rtmp server")
	}
	defer rtmpSrv.Close()

	// WebRTC server.
	rtcSrv := proxy.NewWebRTCProxyServer(environment, loadBalancer)
	if err := rtcSrv.Run(ctx); err != nil {
		return errors.Wrapf(err, "rtc server")
	}
	defer rtcSrv.Close()

	// HTTP API server; it is constructed with the WebRTC server rather than
	// the load balancer.
	apiSrv := proxy.NewHTTPAPIProxyServer(environment, gracefulQuitTimeout, rtcSrv)
	if err := apiSrv.Run(ctx); err != nil {
		return errors.Wrapf(err, "http api server")
	}
	defer apiSrv.Close()

	// SRT server.
	srtSrv := proxy.NewSRSSRTProxyServer(environment, loadBalancer)
	if err := srtSrv.Run(ctx); err != nil {
		return errors.Wrapf(err, "srt server")
	}
	defer srtSrv.Close()

	// System API server.
	sysAPI := proxy.NewSystemAPI(environment, loadBalancer, gracefulQuitTimeout)
	if err := sysAPI.Run(ctx); err != nil {
		return errors.Wrapf(err, "system api server")
	}
	defer sysAPI.Close()

	// HTTP stream (web) server.
	streamSrv := proxy.NewHTTPStreamProxyServer(environment, loadBalancer, gracefulQuitTimeout)
	if err := streamSrv.Run(ctx); err != nil {
		return errors.Wrapf(err, "http server")
	}
	defer streamSrv.Close()

	// Everything is running; wait for the main loop to be asked to quit.
	<-ctx.Done()
	return nil
}