Codex: Rename origin load balancer types.

This commit is contained in:
winlin 2026-05-10 20:41:53 -04:00
parent de69339785
commit 3b93ddfddf
33 changed files with 45 additions and 44 deletions

View File

@ -1 +1 @@
../.openclaw/memory
../memory

View File

@ -1 +1 @@
../.openclaw/skills
../skills

View File

@ -1 +1 @@
../.openclaw/memory
../memory

View File

@ -1 +1 @@
../.openclaw/skills
../skills

View File

@ -1 +1 @@
../.openclaw/memory
../memory

View File

@ -1 +1 @@
../.openclaw/skills
../skills

1
.openclaw/memory Symbolic link
View File

@ -0,0 +1 @@
../memory

1
.openclaw/skills Symbolic link
View File

@ -0,0 +1 @@
../skills

View File

@ -53,14 +53,14 @@ Both implementations maintain stream-to-server mappings to ensure stream consist
The load balancer uses a clean interface-based architecture:
**Core Interface**: `SRSLoadBalancer`
**Core Interface**: `OriginLoadBalancer`
- Initialization and lifecycle management
- Server registration and updates
- Stream routing (Pick operation)
- Protocol-specific state management (HLS, WebRTC)
**Data Models**:
- `SRSServer`: Backend origin server representation
- `OriginServer`: Backend origin server representation
- `HLSPlayStream`: Interface for HLS streaming sessions
- `RTCConnection`: Interface for WebRTC connections

View File

@ -13,7 +13,7 @@ import (
)
// NewDefaultSRSForDebugging initialize the default SRS media server, for debugging only.
func NewDefaultSRSForDebugging(environment env.ProxyEnvironment) (*SRSServer, error) {
func NewDefaultSRSForDebugging(environment env.ProxyEnvironment) (*OriginServer, error) {
if environment.DefaultBackendEnabled() != "on" {
return nil, nil
}
@ -25,7 +25,7 @@ func NewDefaultSRSForDebugging(environment env.ProxyEnvironment) (*SRSServer, er
return nil, fmt.Errorf("empty default backend rtmp")
}
server := NewSRSServer(func(srs *SRSServer) {
server := NewOriginServer(func(srs *OriginServer) {
srs.IP = environment.DefaultBackendIP()
srs.RTMP = []string{environment.DefaultBackendRTMP()}
srs.ServerID = fmt.Sprintf("default-%v", logger.GenerateContextID())

View File

@ -19,8 +19,8 @@ const HLSAliveDuration = 120 * time.Second
// If WebRTC streaming update in this duration, it's alive.
const RTCAliveDuration = 120 * time.Second
// SRSServer represents a backend origin server.
type SRSServer struct {
// OriginServer represents a backend origin server.
type OriginServer struct {
// The server IP.
IP string `json:"ip,omitempty"`
// The server device ID, configured by user.
@ -45,15 +45,15 @@ type SRSServer struct {
UpdatedAt time.Time `json:"update_at,omitempty"`
}
func (v *SRSServer) ID() string {
func (v *OriginServer) ID() string {
return fmt.Sprintf("%v-%v-%v", v.ServerID, v.ServiceID, v.PID)
}
func (v *SRSServer) String() string {
func (v *OriginServer) String() string {
return fmt.Sprintf("%v", v)
}
func (v *SRSServer) Format(f fmt.State, c rune) {
func (v *OriginServer) Format(f fmt.State, c rune) {
switch c {
case 'v', 's':
if f.Flag('+') {
@ -87,8 +87,8 @@ func (v *SRSServer) Format(f fmt.State, c rune) {
}
}
func NewSRSServer(opts ...func(*SRSServer)) *SRSServer {
v := &SRSServer{}
func NewOriginServer(opts ...func(*OriginServer)) *OriginServer {
v := &OriginServer{}
for _, opt := range opts {
opt(v)
}
@ -109,14 +109,14 @@ type RTCConnection interface {
GetUfrag() string
}
// SRSLoadBalancer is the interface to load balance the SRS servers.
type SRSLoadBalancer interface {
// OriginLoadBalancer is the interface to load balance the SRS servers.
type OriginLoadBalancer interface {
// Initialize the load balancer.
Initialize(ctx context.Context) error
// Update the backend server.
Update(ctx context.Context, server *SRSServer) error
// Update records the latest registration or heartbeat for an origin server.
Update(ctx context.Context, server *OriginServer) error
// Pick a backend server for the specified stream URL.
Pick(ctx context.Context, streamURL string) (*SRSServer, error)
Pick(ctx context.Context, streamURL string) (*OriginServer, error)
// Load or store the HLS streaming for the specified stream URL.
LoadOrStoreHLS(ctx context.Context, streamURL string, value HLSPlayStream) (HLSPlayStream, error)
// Load the HLS streaming by SPBHID, the SRS Proxy Backend HLS ID.
@ -128,4 +128,4 @@ type SRSLoadBalancer interface {
}
// SrsLoadBalancer is the global SRS load balancer instance.
var SrsLoadBalancer SRSLoadBalancer
var SrsLoadBalancer OriginLoadBalancer

View File

@ -20,9 +20,9 @@ type MemoryLoadBalancer struct {
// The environment interface.
environment env.ProxyEnvironment
// All available SRS servers, key is server ID.
servers sync.Map[string, *SRSServer]
servers sync.Map[string, *OriginServer]
// The picked server to service client by specified stream URL, key is stream url.
picked sync.Map[string, *SRSServer]
picked sync.Map[string, *OriginServer]
// The HLS streaming, key is stream URL.
hlsStreamURL sync.Map[string, HLSPlayStream]
// The HLS streaming, key is SPBHID.
@ -34,11 +34,11 @@ type MemoryLoadBalancer struct {
}
// NewMemoryLoadBalancer creates a new memory-based load balancer.
func NewMemoryLoadBalancer(environment env.ProxyEnvironment) SRSLoadBalancer {
func NewMemoryLoadBalancer(environment env.ProxyEnvironment) OriginLoadBalancer {
return &MemoryLoadBalancer{
environment: environment,
servers: sync.NewMap[string, *SRSServer](),
picked: sync.NewMap[string, *SRSServer](),
servers: sync.NewMap[string, *OriginServer](),
picked: sync.NewMap[string, *OriginServer](),
hlsStreamURL: sync.NewMap[string, HLSPlayStream](),
hlsSPBHID: sync.NewMap[string, HLSPlayStream](),
rtcStreamURL: sync.NewMap[string, RTCConnection](),
@ -75,20 +75,20 @@ func (v *MemoryLoadBalancer) Initialize(ctx context.Context) error {
return nil
}
func (v *MemoryLoadBalancer) Update(ctx context.Context, server *SRSServer) error {
func (v *MemoryLoadBalancer) Update(ctx context.Context, server *OriginServer) error {
v.servers.Store(server.ID(), server)
return nil
}
func (v *MemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSServer, error) {
func (v *MemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*OriginServer, error) {
// Always proxy to the same server for the same stream URL.
if server, ok := v.picked.Load(streamURL); ok {
return server, nil
}
// Gather all servers that were alive within the last few seconds.
var servers []*SRSServer
v.servers.Range(func(key string, server *SRSServer) bool {
var servers []*OriginServer
v.servers.Range(func(key string, server *OriginServer) bool {
if time.Since(server.UpdatedAt) < ServerAliveDuration {
servers = append(servers, server)
}
@ -97,7 +97,7 @@ func (v *MemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSSe
// If no servers available, use all possible servers.
if len(servers) == 0 {
v.servers.Range(func(key string, server *SRSServer) bool {
v.servers.Range(func(key string, server *OriginServer) bool {
servers = append(servers, server)
return true
})

View File

@ -28,7 +28,7 @@ type RedisLoadBalancer struct {
}
// NewRedisLoadBalancer creates a new Redis-based load balancer.
func NewRedisLoadBalancer(environment env.ProxyEnvironment) SRSLoadBalancer {
func NewRedisLoadBalancer(environment env.ProxyEnvironment) OriginLoadBalancer {
return &RedisLoadBalancer{
environment: environment,
}
@ -80,7 +80,7 @@ func (v *RedisLoadBalancer) Initialize(ctx context.Context) error {
return nil
}
func (v *RedisLoadBalancer) Update(ctx context.Context, server *SRSServer) error {
func (v *RedisLoadBalancer) Update(ctx context.Context, server *OriginServer) error {
b, err := json.Marshal(server)
if err != nil {
return errors.Wrapf(err, "marshal server %+v", server)
@ -130,14 +130,14 @@ func (v *RedisLoadBalancer) Update(ctx context.Context, server *SRSServer) error
return nil
}
func (v *RedisLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSServer, error) {
func (v *RedisLoadBalancer) Pick(ctx context.Context, streamURL string) (*OriginServer, error) {
key := fmt.Sprintf("srs-proxy-url:%v", streamURL)
// Always proxy to the same server for the same stream URL.
if serverKey, err := v.rdb.Get(ctx, key).Result(); err == nil {
// If server not exists, ignore and pick another server for the stream URL.
if b, err := v.rdb.Get(ctx, serverKey).Bytes(); err == nil && len(b) > 0 {
var server SRSServer
var server OriginServer
if err := json.Unmarshal(b, &server); err != nil {
return nil, errors.Wrapf(err, "unmarshal key=%v server %v", key, string(b))
}
@ -163,7 +163,7 @@ func (v *RedisLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSSer
// All server should be alive, if not, should have been removed by redis. So we only
// random pick one that is always available. Use global rand which is thread-safe since Go 1.20.
var serverKey string
var server SRSServer
var server OriginServer
for i := 0; i < 3; i++ {
tryServerKey := serverKeys[rand.Intn(len(serverKeys))]
b, err := v.rdb.Get(ctx, tryServerKey).Bytes()

View File

@ -255,7 +255,7 @@ func (v *systemAPI) Run(ctx context.Context) error {
return errors.Errorf("empty rtmp")
}
server := lb.NewSRSServer(func(srs *lb.SRSServer) {
server := lb.NewOriginServer(func(srs *lb.OriginServer) {
srs.IP, srs.DeviceID = ip, deviceID
srs.ServerID, srs.ServiceID, srs.PID = serverID, serviceID, pid
srs.RTMP, srs.HTTP, srs.API = rtmp, stream, api

View File

@ -245,7 +245,7 @@ func (v *httpFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter,
return nil
}
func (v *httpFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error {
func (v *httpFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.OriginServer) error {
// Parse HTTP port from backend.
if len(backend.HTTP) == 0 {
return errors.Errorf("no http stream server")
@ -363,7 +363,7 @@ func (v *hlsPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *htt
return nil
}
func (v *hlsPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error {
func (v *hlsPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.OriginServer) error {
// Parse HTTP port from backend.
if len(backend.HTTP) == 0 {
return errors.Errorf("no rtmp server")

View File

@ -147,7 +147,7 @@ func (v *webRTCProxyServer) HandleApiForWHEP(ctx context.Context, w http.Respons
}
func (v *webRTCProxyServer) proxyApiToBackend(
ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer,
ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.OriginServer,
remoteSDPOffer string, streamURL string,
) error {
// Parse HTTP port from backend.

1
memory
View File

@ -1 +0,0 @@
.openclaw/memory

View File

@ -219,7 +219,7 @@ The next-generation server (`cmd/` + `internal/`) is written in Go and maintaine
`internal/rtmp` — RTMP protocol implementation (parsing, not proxying). Full RTMP chunk stream and message protocol: simple handshake (C0/C1/C2), chunk stream reader/writer with all four format types, extended timestamp, message reassembly from chunks. Defines all RTMP message types, chunk stream IDs, and command names. Packet types include ConnectApp, CreateStream, Publish, Play, Call, SetChunkSize, WindowAcknowledgementSize, SetPeerBandwidth, UserControl. Uses Go generics (`ExpectPacket[T]`) to read until a specific packet type arrives. Also includes full AMF0 encoder/decoder supporting Number, Boolean, String, Object, Null, Undefined, EcmaArray, StrictArray, Date, LongString — with ordered key-value maps, auto-type-discovery, and safe type converters.
`internal/lb` — Load balancer abstraction and two implementations. Defines `SRSLoadBalancer` interface and core types in `lb.go` (Initialize, Update, Pick, HLS/WebRTC state management) and `SRSServer` struct representing a backend origin (IP, listen endpoints for RTMP/HTTP/API/SRT/RTC, heartbeat tracking). **Memory LB** (`mem.go`) — in-memory using `sync.Map`, sticky random pick per stream URL, single-proxy deployment. **Redis LB** (`redis.go`) — Redis-backed shared state with TTL-based expiration, enables multi-proxy horizontal scaling behind a network load balancer. Also includes a debug helper (`debug.go`) that creates a fake backend from env vars when `PROXY_DEFAULT_BACKEND_ENABLED=on` for development without real SRS registration.
`internal/lb` — Load balancer abstraction and two implementations. Defines `OriginLoadBalancer` interface and core types in `lb.go` (Initialize, Update, Pick, HLS/WebRTC state management) and `OriginServer` struct representing a backend origin (IP, listen endpoints for RTMP/HTTP/API/SRT/RTC, heartbeat tracking). **Memory LB** (`mem.go`) — in-memory using `sync.Map`, sticky random pick per stream URL, single-proxy deployment. **Redis LB** (`redis.go`) — Redis-backed shared state with TTL-based expiration, enables multi-proxy horizontal scaling behind a network load balancer. Also includes a debug helper (`debug.go`) that creates a fake backend from env vars when `PROXY_DEFAULT_BACKEND_ENABLED=on` for development without real SRS registration.
`internal/logger` — Structured logging with context IDs. Four log levels: Debug/Info (stdout), Warn/Error (stderr). Emits JSON via `log/slog` with `pid` and `cid` attributes. Each connection/request gets a unique 7-char hex context ID for log correlation, stored in `context.Context`.