Proxy: Rename internal/server package to internal/proxy
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
3663a8e38f
commit
8db8f8d9b4
|
|
@ -215,7 +215,7 @@ The next-generation server (`cmd/` + `internal/`) is written in Go and maintaine
|
|||
|
||||
`internal/bootstrap` — Server startup and lifecycle orchestration. Sets up logging context, signal handlers, loads environment, installs force-quit timer, optionally starts pprof, initializes the load balancer (memory or Redis based on `PROXY_LOAD_BALANCER_TYPE`), then starts all six servers sequentially (RTMP, WebRTC, HTTP API, SRT, System API, HTTP Stream) and blocks until context is cancelled. Deferred `Close()` on each server ensures graceful shutdown.
|
||||
|
||||
`internal/server` — Proxy server implementations. Each server accepts client connections, parses just enough of the protocol to extract the stream URL, picks a backend via the load balancer, and proxies traffic bidirectionally. Contains five proxy servers: (1) **RTMP proxy** (`rtmp.go`) — TCP listener, simple handshake, parses connect/publish/play to get stream URL, bidirectional RTMP message copying, stateless. (2) **HTTP stream proxy** (`http.go`) — serves static files, proxies HTTP-FLV/TS via reverse-proxy, proxies HLS m3u8 with `spbhid` rewriting so TS segment requests route to the same backend. (3) **WebRTC proxy** (`rtc.go`) — two-phase: WHIP/WHEP signaling (SDP rewrite to replace backend UDP port with proxy's) + UDP media transport (identifies connections by STUN ufrag, supports address migration), stateful. (4) **SRT proxy** (`srt.go`) — intercepts SRT 4-step handshake locally, parses stream ID on handshake 2, replays full handshake with backend, then proxies UDP bidirectionally, stateful per-connection. (5) **HTTP API + System API** (`api.go`) — HTTP API delegates WHIP/WHEP to WebRTC server; System API provides `/api/v1/srs/register` where backend SRS C++ servers register themselves so the load balancer knows about them.
|
||||
`internal/proxy` — Proxy server implementations. Each server accepts client connections, parses just enough of the protocol to extract the stream URL, picks a backend via the load balancer, and proxies traffic bidirectionally. Contains five proxy servers: (1) **RTMP proxy** (`rtmp.go`) — TCP listener, simple handshake, parses connect/publish/play to get stream URL, bidirectional RTMP message copying, stateless. (2) **HTTP stream proxy** (`http.go`) — serves static files, proxies HTTP-FLV/TS via reverse-proxy, proxies HLS m3u8 with `spbhid` rewriting so TS segment requests route to the same backend. (3) **WebRTC proxy** (`rtc.go`) — two-phase: WHIP/WHEP signaling (SDP rewrite to replace backend UDP port with proxy's) + UDP media transport (identifies connections by STUN ufrag, supports address migration), stateful. (4) **SRT proxy** (`srt.go`) — intercepts SRT 4-step handshake locally, parses stream ID on handshake 2, replays full handshake with backend, then proxies UDP bidirectionally, stateful per-connection. (5) **HTTP API + System API** (`api.go`) — HTTP API delegates WHIP/WHEP to WebRTC server; System API provides `/api/v1/srs/register` where backend SRS C++ servers register themselves so the load balancer knows about them.
|
||||
|
||||
`internal/rtmp` — RTMP protocol implementation (parsing, not proxying). Full RTMP chunk stream and message protocol: simple handshake (C0/C1/C2), chunk stream reader/writer with all four format types, extended timestamp, message reassembly from chunks. Defines all RTMP message types, chunk stream IDs, and command names. Packet types include ConnectApp, CreateStream, Publish, Play, Call, SetChunkSize, WindowAcknowledgementSize, SetPeerBandwidth, UserControl. Uses Go generics (`ExpectPacket[T]`) to read until a specific packet type arrives. Also includes full AMF0 encoder/decoder supporting Number, Boolean, String, Object, Null, Undefined, EcmaArray, StrictArray, Date, LongString — with ordered key-value maps, auto-type-discovery, and safe type converters.
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import (
|
|||
"srsx/internal/errors"
|
||||
"srsx/internal/lb"
|
||||
"srsx/internal/logger"
|
||||
"srsx/internal/server"
|
||||
"srsx/internal/proxy"
|
||||
"srsx/internal/signal"
|
||||
"srsx/internal/version"
|
||||
)
|
||||
|
|
@ -99,46 +99,46 @@ func (b *proxyBootstrap) initializeLoadBalancer(ctx context.Context, environment
|
|||
// startServers initializes and starts all protocol servers.
|
||||
func (b *proxyBootstrap) startServers(ctx context.Context, environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) error {
|
||||
// Start the RTMP server.
|
||||
rtmpServer := server.NewRTMPServer(environment)
|
||||
if err := rtmpServer.Run(ctx); err != nil {
|
||||
rtmpProxyServer := proxy.NewRTMPProxyServer(environment)
|
||||
if err := rtmpProxyServer.Run(ctx); err != nil {
|
||||
return errors.Wrapf(err, "rtmp server")
|
||||
}
|
||||
defer rtmpServer.Close()
|
||||
defer rtmpProxyServer.Close()
|
||||
|
||||
// Start the WebRTC server.
|
||||
webRTCServer := server.NewWebRTCServer(environment)
|
||||
if err := webRTCServer.Run(ctx); err != nil {
|
||||
webRTCProxyServer := proxy.NewWebRTCProxyServer(environment)
|
||||
if err := webRTCProxyServer.Run(ctx); err != nil {
|
||||
return errors.Wrapf(err, "rtc server")
|
||||
}
|
||||
defer webRTCServer.Close()
|
||||
defer webRTCProxyServer.Close()
|
||||
|
||||
// Start the HTTP API server.
|
||||
httpAPIServer := server.NewHTTPAPIServer(environment, gracefulQuitTimeout, webRTCServer)
|
||||
if err := httpAPIServer.Run(ctx); err != nil {
|
||||
httpAPIProxyServer := proxy.NewHTTPAPIProxyServer(environment, gracefulQuitTimeout, webRTCProxyServer)
|
||||
if err := httpAPIProxyServer.Run(ctx); err != nil {
|
||||
return errors.Wrapf(err, "http api server")
|
||||
}
|
||||
defer httpAPIServer.Close()
|
||||
defer httpAPIProxyServer.Close()
|
||||
|
||||
// Start the SRT server.
|
||||
srsSRTServer := server.NewSRSSRTServer(environment)
|
||||
if err := srsSRTServer.Run(ctx); err != nil {
|
||||
srsSRTProxyServer := proxy.NewSRSSRTProxyServer(environment)
|
||||
if err := srsSRTProxyServer.Run(ctx); err != nil {
|
||||
return errors.Wrapf(err, "srt server")
|
||||
}
|
||||
defer srsSRTServer.Close()
|
||||
defer srsSRTProxyServer.Close()
|
||||
|
||||
// Start the System API server.
|
||||
systemAPI := server.NewSystemAPI(environment, gracefulQuitTimeout)
|
||||
systemAPI := proxy.NewSystemAPI(environment, gracefulQuitTimeout)
|
||||
if err := systemAPI.Run(ctx); err != nil {
|
||||
return errors.Wrapf(err, "system api server")
|
||||
}
|
||||
defer systemAPI.Close()
|
||||
|
||||
// Start the HTTP web server.
|
||||
httpStreamServer := server.NewHTTPStreamServer(environment, gracefulQuitTimeout)
|
||||
if err := httpStreamServer.Run(ctx); err != nil {
|
||||
httpStreamProxyServer := proxy.NewHTTPStreamProxyServer(environment, gracefulQuitTimeout)
|
||||
if err := httpStreamProxyServer.Run(ctx); err != nil {
|
||||
return errors.Wrapf(err, "http server")
|
||||
}
|
||||
defer httpStreamServer.Close()
|
||||
defer httpStreamProxyServer.Close()
|
||||
|
||||
// Wait for the main loop to quit.
|
||||
<-ctx.Done()
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) 2026 Winlin
|
||||
//
|
||||
// SPDX-License-Identifier: MIT
|
||||
package server
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -20,28 +20,28 @@ import (
|
|||
"srsx/internal/version"
|
||||
)
|
||||
|
||||
// HTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP,
|
||||
// HTTPAPIProxyServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP,
|
||||
// to proxy other HTTP API of SRS like the streams and clients, etc.
|
||||
type HTTPAPIServer interface {
|
||||
type HTTPAPIProxyServer interface {
|
||||
Run(ctx context.Context) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
type httpAPIServer struct {
|
||||
type httpAPIProxyServer struct {
|
||||
// The environment interface.
|
||||
environment env.ProxyEnvironment
|
||||
// The underlayer HTTP server.
|
||||
server *http.Server
|
||||
// The WebRTC server.
|
||||
rtc WebRTCServer
|
||||
rtc WebRTCProxyServer
|
||||
// The gracefully quit timeout, wait server to quit.
|
||||
gracefulQuitTimeout time.Duration
|
||||
// The wait group for all goroutines.
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc WebRTCServer) HTTPAPIServer {
|
||||
v := &httpAPIServer{
|
||||
func NewHTTPAPIProxyServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc WebRTCProxyServer) HTTPAPIProxyServer {
|
||||
v := &httpAPIProxyServer{
|
||||
environment: environment,
|
||||
gracefulQuitTimeout: gracefulQuitTimeout,
|
||||
rtc: rtc,
|
||||
|
|
@ -49,7 +49,7 @@ func NewHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout time
|
|||
return v
|
||||
}
|
||||
|
||||
func (v *httpAPIServer) Close() error {
|
||||
func (v *httpAPIProxyServer) Close() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
|
||||
defer cancel()
|
||||
v.server.Shutdown(ctx)
|
||||
|
|
@ -58,7 +58,7 @@ func (v *httpAPIServer) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *httpAPIServer) Run(ctx context.Context) error {
|
||||
func (v *httpAPIProxyServer) Run(ctx context.Context) error {
|
||||
// Parse address to listen.
|
||||
addr := v.environment.HttpAPI()
|
||||
if !strings.Contains(addr, ":") {
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) 2026 Winlin
|
||||
//
|
||||
// SPDX-License-Identifier: MIT
|
||||
package server
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -23,15 +23,15 @@ import (
|
|||
"srsx/internal/version"
|
||||
)
|
||||
|
||||
// HTTPStreamServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS,
|
||||
// HTTPStreamProxyServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS,
|
||||
// HLS, etc. The proxy server will figure out which SRS origin server to proxy to, then proxy
|
||||
// the request to the origin server.
|
||||
type HTTPStreamServer interface {
|
||||
type HTTPStreamProxyServer interface {
|
||||
Run(ctx context.Context) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
type httpStreamServer struct {
|
||||
type httpStreamProxyServer struct {
|
||||
// The environment interface.
|
||||
environment env.ProxyEnvironment
|
||||
// The underlayer HTTP server.
|
||||
|
|
@ -42,15 +42,15 @@ type httpStreamServer struct {
|
|||
wg stdSync.WaitGroup
|
||||
}
|
||||
|
||||
func NewHTTPStreamServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) HTTPStreamServer {
|
||||
v := &httpStreamServer{
|
||||
func NewHTTPStreamProxyServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) HTTPStreamProxyServer {
|
||||
v := &httpStreamProxyServer{
|
||||
environment: environment,
|
||||
gracefulQuitTimeout: gracefulQuitTimeout,
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *httpStreamServer) Close() error {
|
||||
func (v *httpStreamProxyServer) Close() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
|
||||
defer cancel()
|
||||
v.server.Shutdown(ctx)
|
||||
|
|
@ -59,7 +59,7 @@ func (v *httpStreamServer) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *httpStreamServer) Run(ctx context.Context) error {
|
||||
func (v *httpStreamProxyServer) Run(ctx context.Context) error {
|
||||
// Parse address to listen.
|
||||
addr := v.environment.HttpServer()
|
||||
if !strings.Contains(addr, ":") {
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) 2026 Winlin
|
||||
//
|
||||
// SPDX-License-Identifier: MIT
|
||||
package server
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -23,17 +23,17 @@ import (
|
|||
"srsx/internal/utils"
|
||||
)
|
||||
|
||||
// WebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out
|
||||
// WebRTCProxyServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out
|
||||
// which backend server to proxy to. It will also replace the UDP port to the proxy server's in the
|
||||
// SDP answer.
|
||||
type WebRTCServer interface {
|
||||
type WebRTCProxyServer interface {
|
||||
Run(ctx context.Context) error
|
||||
Close() error
|
||||
HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error
|
||||
HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error
|
||||
}
|
||||
|
||||
type webRTCServer struct {
|
||||
type webRTCProxyServer struct {
|
||||
// The environment interface.
|
||||
environment env.ProxyEnvironment
|
||||
// The UDP listener for WebRTC server.
|
||||
|
|
@ -51,8 +51,8 @@ type webRTCServer struct {
|
|||
wg stdSync.WaitGroup
|
||||
}
|
||||
|
||||
func NewWebRTCServer(environment env.ProxyEnvironment, opts ...func(*webRTCServer)) WebRTCServer {
|
||||
v := &webRTCServer{
|
||||
func NewWebRTCProxyServer(environment env.ProxyEnvironment, opts ...func(*webRTCProxyServer)) WebRTCProxyServer {
|
||||
v := &webRTCProxyServer{
|
||||
environment: environment,
|
||||
usernames: sync.NewMap[string, *rtcConnection](),
|
||||
addresses: sync.NewMap[string, *rtcConnection](),
|
||||
|
|
@ -63,7 +63,7 @@ func NewWebRTCServer(environment env.ProxyEnvironment, opts ...func(*webRTCServe
|
|||
return v
|
||||
}
|
||||
|
||||
func (v *webRTCServer) Close() error {
|
||||
func (v *webRTCProxyServer) Close() error {
|
||||
if v.listener != nil {
|
||||
_ = v.listener.Close()
|
||||
}
|
||||
|
|
@ -72,7 +72,7 @@ func (v *webRTCServer) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *webRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
func (v *webRTCProxyServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
defer r.Body.Close()
|
||||
ctx = logger.WithContext(ctx)
|
||||
|
||||
|
|
@ -109,7 +109,7 @@ func (v *webRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWrit
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *webRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
func (v *webRTCProxyServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
defer r.Body.Close()
|
||||
ctx = logger.WithContext(ctx)
|
||||
|
||||
|
|
@ -146,7 +146,7 @@ func (v *webRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWrit
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *webRTCServer) proxyApiToBackend(
|
||||
func (v *webRTCProxyServer) proxyApiToBackend(
|
||||
ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer,
|
||||
remoteSDPOffer string, streamURL string,
|
||||
) error {
|
||||
|
|
@ -246,7 +246,7 @@ func (v *webRTCServer) proxyApiToBackend(
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *webRTCServer) Run(ctx context.Context) error {
|
||||
func (v *webRTCProxyServer) Run(ctx context.Context) error {
|
||||
// Parse address to listen.
|
||||
endpoint := v.environment.WebRTCServer()
|
||||
if !strings.Contains(endpoint, ":") {
|
||||
|
|
@ -294,7 +294,7 @@ func (v *webRTCServer) Run(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *webRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
|
||||
func (v *webRTCProxyServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
|
||||
var connection *rtcConnection
|
||||
|
||||
// If STUN binding request, parse the ufrag and identify the connection.
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) 2026 Winlin
|
||||
//
|
||||
// SPDX-License-Identifier: MIT
|
||||
package server
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -20,15 +20,15 @@ import (
|
|||
"srsx/internal/version"
|
||||
)
|
||||
|
||||
// RTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS
|
||||
// RTMPProxyServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS
|
||||
// server. It will figure out the backend server to proxy to. Unlike the edge server, it will
|
||||
// not cache the stream, but just proxy the stream to backend.
|
||||
type RTMPServer interface {
|
||||
type RTMPProxyServer interface {
|
||||
Run(ctx context.Context) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
type rtmpServer struct {
|
||||
type rtmpProxyServer struct {
|
||||
// The environment interface.
|
||||
environment env.ProxyEnvironment
|
||||
// The TCP listener for RTMP server.
|
||||
|
|
@ -37,15 +37,15 @@ type rtmpServer struct {
|
|||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewRTMPServer(environment env.ProxyEnvironment, opts ...func(*rtmpServer)) RTMPServer {
|
||||
v := &rtmpServer{environment: environment}
|
||||
func NewRTMPProxyServer(environment env.ProxyEnvironment, opts ...func(*rtmpProxyServer)) RTMPProxyServer {
|
||||
v := &rtmpProxyServer{environment: environment}
|
||||
for _, opt := range opts {
|
||||
opt(v)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *rtmpServer) Close() error {
|
||||
func (v *rtmpProxyServer) Close() error {
|
||||
if v.listener != nil {
|
||||
v.listener.Close()
|
||||
}
|
||||
|
|
@ -54,7 +54,7 @@ func (v *rtmpServer) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *rtmpServer) Run(ctx context.Context) error {
|
||||
func (v *rtmpProxyServer) Run(ctx context.Context) error {
|
||||
endpoint := v.environment.RtmpServer()
|
||||
if !strings.Contains(endpoint, ":") {
|
||||
endpoint = ":" + endpoint
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) 2026 Winlin
|
||||
//
|
||||
// SPDX-License-Identifier: MIT
|
||||
package server
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
|
@ -21,10 +21,10 @@ import (
|
|||
"srsx/internal/utils"
|
||||
)
|
||||
|
||||
// srsSRTServer is the proxy for SRS server via SRT. It will figure out which backend server to
|
||||
// srsSRTProxyServer is the proxy for SRS server via SRT. It will figure out which backend server to
|
||||
// proxy to. It only parses the SRT handshake messages, parses the stream id, and proxy to the
|
||||
// backend server.
|
||||
type srsSRTServer struct {
|
||||
type srsSRTProxyServer struct {
|
||||
// The environment interface.
|
||||
environment env.ProxyEnvironment
|
||||
// The UDP listener for SRT server.
|
||||
|
|
@ -39,8 +39,8 @@ type srsSRTServer struct {
|
|||
wg stdSync.WaitGroup
|
||||
}
|
||||
|
||||
func NewSRSSRTServer(environment env.ProxyEnvironment, opts ...func(*srsSRTServer)) *srsSRTServer {
|
||||
v := &srsSRTServer{
|
||||
func NewSRSSRTProxyServer(environment env.ProxyEnvironment, opts ...func(*srsSRTProxyServer)) *srsSRTProxyServer {
|
||||
v := &srsSRTProxyServer{
|
||||
environment: environment,
|
||||
start: time.Now(),
|
||||
sockets: sync.NewMap[uint32, *SRTConnection](),
|
||||
|
|
@ -52,7 +52,7 @@ func NewSRSSRTServer(environment env.ProxyEnvironment, opts ...func(*srsSRTServe
|
|||
return v
|
||||
}
|
||||
|
||||
func (v *srsSRTServer) Close() error {
|
||||
func (v *srsSRTProxyServer) Close() error {
|
||||
if v.listener != nil {
|
||||
v.listener.Close()
|
||||
}
|
||||
|
|
@ -61,7 +61,7 @@ func (v *srsSRTServer) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *srsSRTServer) Run(ctx context.Context) error {
|
||||
func (v *srsSRTProxyServer) Run(ctx context.Context) error {
|
||||
// Parse address to listen.
|
||||
endpoint := v.environment.SRTServer()
|
||||
if !strings.Contains(endpoint, ":") {
|
||||
|
|
@ -109,7 +109,7 @@ func (v *srsSRTServer) Run(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *srsSRTServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
|
||||
func (v *srsSRTProxyServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
|
||||
socketID := utils.SrtParseSocketID(data)
|
||||
|
||||
var pkt *SRTHandshakePacket
|
||||
Loading…
Reference in New Issue
Block a user