diff --git a/.openclaw/memory/srs-codebase-map.md b/.openclaw/memory/srs-codebase-map.md index 0beae9cae..1988eb8c4 100644 --- a/.openclaw/memory/srs-codebase-map.md +++ b/.openclaw/memory/srs-codebase-map.md @@ -215,7 +215,7 @@ The next-generation server (`cmd/` + `internal/`) is written in Go and maintaine `internal/bootstrap` — Server startup and lifecycle orchestration. Sets up logging context, signal handlers, loads environment, installs force-quit timer, optionally starts pprof, initializes the load balancer (memory or Redis based on `PROXY_LOAD_BALANCER_TYPE`), then starts all six servers sequentially (RTMP, WebRTC, HTTP API, SRT, System API, HTTP Stream) and blocks until context is cancelled. Deferred `Close()` on each server ensures graceful shutdown. -`internal/server` — Proxy server implementations. Each server accepts client connections, parses just enough of the protocol to extract the stream URL, picks a backend via the load balancer, and proxies traffic bidirectionally. Contains five proxy servers: (1) **RTMP proxy** (`rtmp.go`) — TCP listener, simple handshake, parses connect/publish/play to get stream URL, bidirectional RTMP message copying, stateless. (2) **HTTP stream proxy** (`http.go`) — serves static files, proxies HTTP-FLV/TS via reverse-proxy, proxies HLS m3u8 with `spbhid` rewriting so TS segment requests route to the same backend. (3) **WebRTC proxy** (`rtc.go`) — two-phase: WHIP/WHEP signaling (SDP rewrite to replace backend UDP port with proxy's) + UDP media transport (identifies connections by STUN ufrag, supports address migration), stateful. (4) **SRT proxy** (`srt.go`) — intercepts SRT 4-step handshake locally, parses stream ID on handshake 2, replays full handshake with backend, then proxies UDP bidirectionally, stateful per-connection. (5) **HTTP API + System API** (`api.go`) — HTTP API delegates WHIP/WHEP to WebRTC server; System API provides `/api/v1/srs/register` where backend SRS C++ servers register themselves so the load balancer knows about them. +`internal/proxy` — Proxy server implementations. Each server accepts client connections, parses just enough of the protocol to extract the stream URL, picks a backend via the load balancer, and proxies traffic bidirectionally. Contains five proxy servers: (1) **RTMP proxy** (`rtmp.go`) — TCP listener, simple handshake, parses connect/publish/play to get stream URL, bidirectional RTMP message copying, stateless. (2) **HTTP stream proxy** (`http.go`) — serves static files, proxies HTTP-FLV/TS via reverse-proxy, proxies HLS m3u8 with `spbhid` rewriting so TS segment requests route to the same backend. (3) **WebRTC proxy** (`rtc.go`) — two-phase: WHIP/WHEP signaling (SDP rewrite to replace backend UDP port with proxy's) + UDP media transport (identifies connections by STUN ufrag, supports address migration), stateful. (4) **SRT proxy** (`srt.go`) — intercepts SRT 4-step handshake locally, parses stream ID on handshake 2, replays full handshake with backend, then proxies UDP bidirectionally, stateful per-connection. (5) **HTTP API + System API** (`api.go`) — HTTP API delegates WHIP/WHEP to WebRTC server; System API provides `/api/v1/srs/register` where backend SRS C++ servers register themselves so the load balancer knows about them. `internal/rtmp` — RTMP protocol implementation (parsing, not proxying). Full RTMP chunk stream and message protocol: simple handshake (C0/C1/C2), chunk stream reader/writer with all four format types, extended timestamp, message reassembly from chunks. Defines all RTMP message types, chunk stream IDs, and command names. Packet types include ConnectApp, CreateStream, Publish, Play, Call, SetChunkSize, WindowAcknowledgementSize, SetPeerBandwidth, UserControl. Uses Go generics (`ExpectPacket[T]`) to read until a specific packet type arrives. Also includes full AMF0 encoder/decoder supporting Number, Boolean, String, Object, Null, Undefined, EcmaArray, StrictArray, Date, LongString — with ordered key-value maps, auto-type-discovery, and safe type converters. diff --git a/internal/bootstrap/proxy.go b/internal/bootstrap/proxy.go index f59522b6d..985ed60b0 100644 --- a/internal/bootstrap/proxy.go +++ b/internal/bootstrap/proxy.go @@ -12,7 +12,7 @@ import ( "srsx/internal/errors" "srsx/internal/lb" "srsx/internal/logger" - "srsx/internal/server" + "srsx/internal/proxy" "srsx/internal/signal" "srsx/internal/version" ) @@ -99,46 +99,46 @@ func (b *proxyBootstrap) initializeLoadBalancer(ctx context.Context, environment // startServers initializes and starts all protocol servers. func (b *proxyBootstrap) startServers(ctx context.Context, environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) error { // Start the RTMP server. - rtmpServer := server.NewRTMPServer(environment) - if err := rtmpServer.Run(ctx); err != nil { + rtmpProxyServer := proxy.NewRTMPProxyServer(environment) + if err := rtmpProxyServer.Run(ctx); err != nil { return errors.Wrapf(err, "rtmp server") } - defer rtmpServer.Close() + defer rtmpProxyServer.Close() // Start the WebRTC server. - webRTCServer := server.NewWebRTCServer(environment) - if err := webRTCServer.Run(ctx); err != nil { + webRTCProxyServer := proxy.NewWebRTCProxyServer(environment) + if err := webRTCProxyServer.Run(ctx); err != nil { return errors.Wrapf(err, "rtc server") } - defer webRTCServer.Close() + defer webRTCProxyServer.Close() // Start the HTTP API server. - httpAPIServer := server.NewHTTPAPIServer(environment, gracefulQuitTimeout, webRTCServer) - if err := httpAPIServer.Run(ctx); err != nil { + httpAPIProxyServer := proxy.NewHTTPAPIProxyServer(environment, gracefulQuitTimeout, webRTCProxyServer) + if err := httpAPIProxyServer.Run(ctx); err != nil { return errors.Wrapf(err, "http api server") } - defer httpAPIServer.Close() + defer httpAPIProxyServer.Close() // Start the SRT server. - srsSRTServer := server.NewSRSSRTServer(environment) - if err := srsSRTServer.Run(ctx); err != nil { + srsSRTProxyServer := proxy.NewSRSSRTProxyServer(environment) + if err := srsSRTProxyServer.Run(ctx); err != nil { return errors.Wrapf(err, "srt server") } - defer srsSRTServer.Close() + defer srsSRTProxyServer.Close() // Start the System API server. - systemAPI := server.NewSystemAPI(environment, gracefulQuitTimeout) + systemAPI := proxy.NewSystemAPI(environment, gracefulQuitTimeout) if err := systemAPI.Run(ctx); err != nil { return errors.Wrapf(err, "system api server") } defer systemAPI.Close() // Start the HTTP web server. - httpStreamServer := server.NewHTTPStreamServer(environment, gracefulQuitTimeout) - if err := httpStreamServer.Run(ctx); err != nil { + httpStreamProxyServer := proxy.NewHTTPStreamProxyServer(environment, gracefulQuitTimeout) + if err := httpStreamProxyServer.Run(ctx); err != nil { return errors.Wrapf(err, "http server") } - defer httpStreamServer.Close() + defer httpStreamProxyServer.Close() // Wait for the main loop to quit. <-ctx.Done() diff --git a/internal/server/api.go b/internal/proxy/api.go similarity index 94% rename from internal/server/api.go rename to internal/proxy/api.go index a69353ee5..ed7bd6020 100644 --- a/internal/server/api.go +++ b/internal/proxy/api.go @@ -1,7 +1,7 @@ // Copyright (c) 2026 Winlin // // SPDX-License-Identifier: MIT -package server +package proxy import ( "context" @@ -20,28 +20,28 @@ import ( "srsx/internal/version" ) -// HTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP, +// HTTPAPIProxyServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP, // to proxy other HTTP API of SRS like the streams and clients, etc. -type HTTPAPIServer interface { +type HTTPAPIProxyServer interface { Run(ctx context.Context) error Close() error } -type httpAPIServer struct { +type httpAPIProxyServer struct { // The environment interface. environment env.ProxyEnvironment // The underlayer HTTP server. server *http.Server // The WebRTC server. - rtc WebRTCServer + rtc WebRTCProxyServer // The gracefully quit timeout, wait server to quit. gracefulQuitTimeout time.Duration // The wait group for all goroutines. wg sync.WaitGroup } -func NewHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc WebRTCServer) HTTPAPIServer { - v := &httpAPIServer{ +func NewHTTPAPIProxyServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc WebRTCProxyServer) HTTPAPIProxyServer { + v := &httpAPIProxyServer{ environment: environment, gracefulQuitTimeout: gracefulQuitTimeout, rtc: rtc, @@ -49,7 +49,7 @@ func NewHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout time return v } -func (v *httpAPIServer) Close() error { +func (v *httpAPIProxyServer) Close() error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() v.server.Shutdown(ctx) @@ -58,7 +58,7 @@ func (v *httpAPIServer) Close() error { return nil } -func (v *httpAPIServer) Run(ctx context.Context) error { +func (v *httpAPIProxyServer) Run(ctx context.Context) error { // Parse address to listen. addr := v.environment.HttpAPI() if !strings.Contains(addr, ":") { diff --git a/internal/server/http.go b/internal/proxy/http.go similarity index 96% rename from internal/server/http.go rename to internal/proxy/http.go index 21db47741..1d82ce72c 100644 --- a/internal/server/http.go +++ b/internal/proxy/http.go @@ -1,7 +1,7 @@ // Copyright (c) 2026 Winlin // // SPDX-License-Identifier: MIT -package server +package proxy import ( "context" @@ -23,15 +23,15 @@ import ( "srsx/internal/version" ) -// HTTPStreamServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS, +// HTTPStreamProxyServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS, // HLS, etc. The proxy server will figure out which SRS origin server to proxy to, then proxy // the request to the origin server. -type HTTPStreamServer interface { +type HTTPStreamProxyServer interface { Run(ctx context.Context) error Close() error } -type httpStreamServer struct { +type httpStreamProxyServer struct { // The environment interface. environment env.ProxyEnvironment // The underlayer HTTP server. @@ -42,15 +42,15 @@ type httpStreamServer struct { wg stdSync.WaitGroup } -func NewHTTPStreamServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) HTTPStreamServer { - v := &httpStreamServer{ +func NewHTTPStreamProxyServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) HTTPStreamProxyServer { + v := &httpStreamProxyServer{ environment: environment, gracefulQuitTimeout: gracefulQuitTimeout, } return v } -func (v *httpStreamServer) Close() error { +func (v *httpStreamProxyServer) Close() error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() v.server.Shutdown(ctx) @@ -59,7 +59,7 @@ func (v *httpStreamServer) Close() error { return nil } -func (v *httpStreamServer) Run(ctx context.Context) error { +func (v *httpStreamProxyServer) Run(ctx context.Context) error { // Parse address to listen. addr := v.environment.HttpServer() if !strings.Contains(addr, ":") { diff --git a/internal/server/rtc.go b/internal/proxy/rtc.go similarity index 94% rename from internal/server/rtc.go rename to internal/proxy/rtc.go index 7a85e0bbb..3711e3d37 100644 --- a/internal/server/rtc.go +++ b/internal/proxy/rtc.go @@ -1,7 +1,7 @@ // Copyright (c) 2026 Winlin // // SPDX-License-Identifier: MIT -package server +package proxy import ( "context" @@ -23,17 +23,17 @@ import ( "srsx/internal/utils" ) -// WebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out +// WebRTCProxyServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out // which backend server to proxy to. It will also replace the UDP port to the proxy server's in the // SDP answer. -type WebRTCServer interface { +type WebRTCProxyServer interface { Run(ctx context.Context) error Close() error HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error } -type webRTCServer struct { +type webRTCProxyServer struct { // The environment interface. environment env.ProxyEnvironment // The UDP listener for WebRTC server. @@ -51,8 +51,8 @@ type webRTCServer struct { wg stdSync.WaitGroup } -func NewWebRTCServer(environment env.ProxyEnvironment, opts ...func(*webRTCServer)) WebRTCServer { - v := &webRTCServer{ +func NewWebRTCProxyServer(environment env.ProxyEnvironment, opts ...func(*webRTCProxyServer)) WebRTCProxyServer { + v := &webRTCProxyServer{ environment: environment, usernames: sync.NewMap[string, *rtcConnection](), addresses: sync.NewMap[string, *rtcConnection](), @@ -63,7 +63,7 @@ func NewWebRTCServer(environment env.ProxyEnvironment, opts ...func(*webRTCServe return v } -func (v *webRTCServer) Close() error { +func (v *webRTCProxyServer) Close() error { if v.listener != nil { _ = v.listener.Close() } @@ -72,7 +72,7 @@ func (v *webRTCServer) Close() error { return nil } -func (v *webRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *webRTCProxyServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() ctx = logger.WithContext(ctx) @@ -109,7 +109,7 @@ func (v *webRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWrit return nil } -func (v *webRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *webRTCProxyServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() ctx = logger.WithContext(ctx) @@ -146,7 +146,7 @@ func (v *webRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWrit return nil } -func (v *webRTCServer) proxyApiToBackend( +func (v *webRTCProxyServer) proxyApiToBackend( ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer, remoteSDPOffer string, streamURL string, ) error { @@ -246,7 +246,7 @@ func (v *webRTCServer) proxyApiToBackend( return nil } -func (v *webRTCServer) Run(ctx context.Context) error { +func (v *webRTCProxyServer) Run(ctx context.Context) error { // Parse address to listen. endpoint := v.environment.WebRTCServer() if !strings.Contains(endpoint, ":") { @@ -294,7 +294,7 @@ func (v *webRTCServer) Run(ctx context.Context) error { return nil } -func (v *webRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { +func (v *webRTCProxyServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { var connection *rtcConnection // If STUN binding request, parse the ufrag and identify the connection. diff --git a/internal/server/rtmp.go b/internal/proxy/rtmp.go similarity index 97% rename from internal/server/rtmp.go rename to internal/proxy/rtmp.go index b787e99c8..23be82416 100644 --- a/internal/server/rtmp.go +++ b/internal/proxy/rtmp.go @@ -1,7 +1,7 @@ // Copyright (c) 2026 Winlin // // SPDX-License-Identifier: MIT -package server +package proxy import ( "context" @@ -20,15 +20,15 @@ import ( "srsx/internal/version" ) -// RTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS +// RTMPProxyServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS // server. It will figure out the backend server to proxy to. Unlike the edge server, it will // not cache the stream, but just proxy the stream to backend. -type RTMPServer interface { +type RTMPProxyServer interface { Run(ctx context.Context) error Close() error } -type rtmpServer struct { +type rtmpProxyServer struct { // The environment interface. environment env.ProxyEnvironment // The TCP listener for RTMP server. @@ -37,15 +37,15 @@ type rtmpServer struct { wg sync.WaitGroup } -func NewRTMPServer(environment env.ProxyEnvironment, opts ...func(*rtmpServer)) RTMPServer { - v := &rtmpServer{environment: environment} +func NewRTMPProxyServer(environment env.ProxyEnvironment, opts ...func(*rtmpProxyServer)) RTMPProxyServer { + v := &rtmpProxyServer{environment: environment} for _, opt := range opts { opt(v) } return v } -func (v *rtmpServer) Close() error { +func (v *rtmpProxyServer) Close() error { if v.listener != nil { v.listener.Close() } @@ -54,7 +54,7 @@ func (v *rtmpServer) Close() error { return nil } -func (v *rtmpServer) Run(ctx context.Context) error { +func (v *rtmpProxyServer) Run(ctx context.Context) error { endpoint := v.environment.RtmpServer() if !strings.Contains(endpoint, ":") { endpoint = ":" + endpoint diff --git a/internal/server/srt.go b/internal/proxy/srt.go similarity index 97% rename from internal/server/srt.go rename to internal/proxy/srt.go index 0da23f51f..7f3ba1fee 100644 --- a/internal/server/srt.go +++ b/internal/proxy/srt.go @@ -1,7 +1,7 @@ // Copyright (c) 2026 Winlin // // SPDX-License-Identifier: MIT -package server +package proxy import ( "bytes" @@ -21,10 +21,10 @@ import ( "srsx/internal/utils" ) -// srsSRTServer is the proxy for SRS server via SRT. It will figure out which backend server to +// srsSRTProxyServer is the proxy for SRS server via SRT. It will figure out which backend server to // proxy to. It only parses the SRT handshake messages, parses the stream id, and proxy to the // backend server. -type srsSRTServer struct { +type srsSRTProxyServer struct { // The environment interface. environment env.ProxyEnvironment // The UDP listener for SRT server. @@ -39,8 +39,8 @@ type srsSRTServer struct { wg stdSync.WaitGroup } -func NewSRSSRTServer(environment env.ProxyEnvironment, opts ...func(*srsSRTServer)) *srsSRTServer { - v := &srsSRTServer{ +func NewSRSSRTProxyServer(environment env.ProxyEnvironment, opts ...func(*srsSRTProxyServer)) *srsSRTProxyServer { + v := &srsSRTProxyServer{ environment: environment, start: time.Now(), sockets: sync.NewMap[uint32, *SRTConnection](), @@ -52,7 +52,7 @@ func NewSRSSRTServer(environment env.ProxyEnvironment, opts ...func(*srsSRTServe return v } -func (v *srsSRTServer) Close() error { +func (v *srsSRTProxyServer) Close() error { if v.listener != nil { v.listener.Close() } @@ -61,7 +61,7 @@ func (v *srsSRTServer) Close() error { return nil } -func (v *srsSRTServer) Run(ctx context.Context) error { +func (v *srsSRTProxyServer) Run(ctx context.Context) error { // Parse address to listen. endpoint := v.environment.SRTServer() if !strings.Contains(endpoint, ":") { @@ -109,7 +109,7 @@ func (v *srsSRTServer) Run(ctx context.Context) error { return nil } -func (v *srsSRTServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { +func (v *srsSRTProxyServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { socketID := utils.SrtParseSocketID(data) var pkt *SRTHandshakePacket