srs/internal/lb/mem.go
Winlin ebf8b712c9 Proxy: restructure repo as Go project with proxy as first module (#4652)
Reorganize the SRS (Simple Realtime Server) repository to
follow a conventional Go project structure, setting the stage for a
progressive transition from a C++ project to a Go project. The proxy,
which was once contained within its own `proxy/` subdirectory, will now
be converted into the initial Go module located at the root of the
repository, serving as a template for subsequent Go modules.

- **Go module at repo root:** `go.mod` moved to repo root, module
renamed from `proxy` to `srsx`. The repo is now a proper Go project with
`cmd/` and `internal/` at the top level.
- **Elevation of Proxy Code:** Move the proxy code from
`proxy/cmd/proxy-go/` to `cmd/proxy/`, and from `proxy/internal/` to
`internal/`. The proxy serves as the inaugural application; subsequent
modules (for instance, `cmd/origin`) will mimic this arrangement.
- **Documentation Restructured:** Transfer the documentation from
`proxy/docs/` to `docs/proxy/`, revise the main README to endorse
OpenClaw as the preferred AI tool, and update `proxy/README.md` to point
to the new documentation locations.
- **Build and config:** `Makefile` moved to root, `PROXY_STATIC_FILES`
default path corrected for the new layout, `.gitignore` consolidated.
- **Cleanup:** removed standalone `proxy/LICENSE` (repo-level license
applies), all internal imports updated to `srsx/internal/...`.
- **OpenClaw workspace:** added community bot info, git workflow
conventions, and support group behavior guidance.

This restructuring was performed by OpenClaw orchestrating Claude Code
and Codex via ACP.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: chatgpt-codex-connector[bot] <199175422+chatgpt-codex-connector[bot]@users.noreply.github.com>
2026-03-22 08:11:28 -04:00

150 lines
4.3 KiB
Go

// Copyright (c) 2025 Winlin
//
// SPDX-License-Identifier: MIT
package lb
import (
"context"
"fmt"
"math/rand"
"time"
"srsx/internal/env"
"srsx/internal/errors"
"srsx/internal/logger"
"srsx/internal/sync"
)
// MemoryLoadBalancer stores state in memory.
type MemoryLoadBalancer struct {
// The environment interface.
environment env.Environment
// All available SRS servers, key is server ID.
servers sync.Map[string, *SRSServer]
// The picked server to service client by specified stream URL, key is stream url.
picked sync.Map[string, *SRSServer]
// The HLS streaming, key is stream URL.
hlsStreamURL sync.Map[string, HLSPlayStream]
// The HLS streaming, key is SPBHID.
hlsSPBHID sync.Map[string, HLSPlayStream]
// The WebRTC streaming, key is stream URL.
rtcStreamURL sync.Map[string, RTCConnection]
// The WebRTC streaming, key is ufrag.
rtcUfrag sync.Map[string, RTCConnection]
}
// NewMemoryLoadBalancer creates a new memory-based load balancer.
func NewMemoryLoadBalancer(environment env.Environment) SRSLoadBalancer {
return &MemoryLoadBalancer{
environment: environment,
}
}
func (v *MemoryLoadBalancer) Initialize(ctx context.Context) error {
server, err := NewDefaultSRSForDebugging(v.environment)
if err != nil {
return errors.Wrapf(err, "initialize default SRS")
}
if server != nil {
if err := v.Update(ctx, server); err != nil {
return errors.Wrapf(err, "update default SRS %+v", server)
}
// Keep alive.
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(30 * time.Second):
if err := v.Update(ctx, server); err != nil {
logger.Wf(ctx, "update default SRS %+v failed, %+v", server, err)
}
}
}
}()
logger.Df(ctx, "MemoryLB: Initialize default SRS media server, %+v", server)
}
return nil
}
func (v *MemoryLoadBalancer) Update(ctx context.Context, server *SRSServer) error {
v.servers.Store(server.ID(), server)
return nil
}
func (v *MemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSServer, error) {
// Always proxy to the same server for the same stream URL.
if server, ok := v.picked.Load(streamURL); ok {
return server, nil
}
// Gather all servers that were alive within the last few seconds.
var servers []*SRSServer
v.servers.Range(func(key string, server *SRSServer) bool {
if time.Since(server.UpdatedAt) < ServerAliveDuration {
servers = append(servers, server)
}
return true
})
// If no servers available, use all possible servers.
if len(servers) == 0 {
v.servers.Range(func(key string, server *SRSServer) bool {
servers = append(servers, server)
return true
})
}
// No server found, failed.
if len(servers) == 0 {
return nil, fmt.Errorf("no server available for %v", streamURL)
}
// Pick a server randomly from servers. Use global rand which is thread-safe since Go 1.20.
// For older Go versions, this is still safe as we're only reading from the servers slice.
server := servers[rand.Intn(len(servers))]
v.picked.Store(streamURL, server)
return server, nil
}
func (v *MemoryLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (HLSPlayStream, error) {
// Load the HLS streaming for the SPBHID, for TS files.
if actual, ok := v.hlsSPBHID.Load(spbhid); !ok {
return nil, errors.Errorf("no HLS streaming for SPBHID %v", spbhid)
} else {
return actual, nil
}
}
func (v *MemoryLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value HLSPlayStream) (HLSPlayStream, error) {
// Update the HLS streaming for the stream URL, for M3u8.
actual, _ := v.hlsStreamURL.LoadOrStore(streamURL, value)
if actual == nil {
return nil, errors.Errorf("load or store HLS streaming for %v failed", streamURL)
}
// Update the HLS streaming for the SPBHID, for TS files.
v.hlsSPBHID.Store(value.GetSPBHID(), actual)
return actual, nil
}
func (v *MemoryLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, value RTCConnection) error {
// Update the WebRTC streaming for the stream URL.
v.rtcStreamURL.Store(streamURL, value)
// Update the WebRTC streaming for the ufrag.
v.rtcUfrag.Store(value.GetUfrag(), value)
return nil
}
func (v *MemoryLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag string) (RTCConnection, error) {
if actual, ok := v.rtcUfrag.Load(ufrag); !ok {
return nil, errors.Errorf("no WebRTC streaming for ufrag %v", ufrag)
} else {
return actual, nil
}
}