From cb90bf1fec5b6563b62c0be4c9539f31a579fe09 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 1 May 2026 07:36:10 -0400 Subject: [PATCH] Codex: Refactor proxy server APIs. --- internal/bootstrap/proxy.go | 24 ++++++------ internal/rtmp/rtmp.go | 2 - internal/server/api.go | 19 ++++++---- internal/server/http.go | 57 ++++++++++++++++------------- internal/server/rtc.go | 73 ++++++++++++++++++++----------------- internal/server/rtmp.go | 49 ++++++++++++++----------- 6 files changed, 122 insertions(+), 102 deletions(-) diff --git a/internal/bootstrap/proxy.go b/internal/bootstrap/proxy.go index 11c52ff25..f59522b6d 100644 --- a/internal/bootstrap/proxy.go +++ b/internal/bootstrap/proxy.go @@ -99,25 +99,25 @@ func (b *proxyBootstrap) initializeLoadBalancer(ctx context.Context, environment // startServers initializes and starts all protocol servers. func (b *proxyBootstrap) startServers(ctx context.Context, environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) error { // Start the RTMP server. - srsRTMPServer := server.NewSRSRTMPServer(environment) - if err := srsRTMPServer.Run(ctx); err != nil { + rtmpServer := server.NewRTMPServer(environment) + if err := rtmpServer.Run(ctx); err != nil { return errors.Wrapf(err, "rtmp server") } - defer srsRTMPServer.Close() + defer rtmpServer.Close() // Start the WebRTC server. - srsWebRTCServer := server.NewSRSWebRTCServer(environment) - if err := srsWebRTCServer.Run(ctx); err != nil { + webRTCServer := server.NewWebRTCServer(environment) + if err := webRTCServer.Run(ctx); err != nil { return errors.Wrapf(err, "rtc server") } - defer srsWebRTCServer.Close() + defer webRTCServer.Close() // Start the HTTP API server. - srsHTTPAPIServer := server.NewSRSHTTPAPIServer(environment, gracefulQuitTimeout, srsWebRTCServer) - if err := srsHTTPAPIServer.Run(ctx); err != nil { + httpAPIServer := server.NewHTTPAPIServer(environment, gracefulQuitTimeout, webRTCServer) + if err := httpAPIServer.Run(ctx); err != nil { return errors.Wrapf(err, "http api server") } - defer srsHTTPAPIServer.Close() + defer httpAPIServer.Close() // Start the SRT server. srsSRTServer := server.NewSRSSRTServer(environment) @@ -134,11 +134,11 @@ func (b *proxyBootstrap) startServers(ctx context.Context, environment env.Proxy defer systemAPI.Close() // Start the HTTP web server. - srsHTTPStreamServer := server.NewSRSHTTPStreamServer(environment, gracefulQuitTimeout) - if err := srsHTTPStreamServer.Run(ctx); err != nil { + httpStreamServer := server.NewHTTPStreamServer(environment, gracefulQuitTimeout) + if err := httpStreamServer.Run(ctx); err != nil { return errors.Wrapf(err, "http server") } - defer srsHTTPStreamServer.Close() + defer httpStreamServer.Close() // Wait for the main loop to quit. <-ctx.Done() diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index 640044a91..988804b3e 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -234,8 +234,6 @@ func (v *protocol) ExpectMessage(ctx context.Context, types ...MessageType) (m M } } } - - return } func (v *protocol) parseAMFObject(p []byte) (pkt Packet, err error) { diff --git a/internal/server/api.go b/internal/server/api.go index f2aa41330..c8244c5bd 100644 --- a/internal/server/api.go +++ b/internal/server/api.go @@ -20,23 +20,28 @@ import ( "srsx/internal/version" ) -// srsHTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP, +// HTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP, // to proxy other HTTP API of SRS like the streams and clients, etc. -type srsHTTPAPIServer struct { +type HTTPAPIServer interface { + Run(ctx context.Context) error + Close() error +} + +type httpAPIServer struct { // The environment interface. environment env.ProxyEnvironment // The underlayer HTTP server. server *http.Server // The WebRTC server. - rtc *srsWebRTCServer + rtc WebRTCServer // The gracefully quit timeout, wait server to quit. gracefulQuitTimeout time.Duration // The wait group for all goroutines. wg sync.WaitGroup } -func NewSRSHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc *srsWebRTCServer) *srsHTTPAPIServer { - v := &srsHTTPAPIServer{ +func NewHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc WebRTCServer) HTTPAPIServer { + v := &httpAPIServer{ environment: environment, gracefulQuitTimeout: gracefulQuitTimeout, rtc: rtc, @@ -44,7 +49,7 @@ func NewSRSHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout t return v } -func (v *srsHTTPAPIServer) Close() error { +func (v *httpAPIServer) Close() error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() v.server.Shutdown(ctx) @@ -53,7 +58,7 @@ func (v *srsHTTPAPIServer) Close() error { return nil } -func (v *srsHTTPAPIServer) Run(ctx context.Context) error { +func (v *httpAPIServer) Run(ctx context.Context) error { // Parse address to listen. addr := v.environment.HttpAPI() if !strings.Contains(addr, ":") { diff --git a/internal/server/http.go b/internal/server/http.go index 1b11f763a..21db47741 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -23,10 +23,15 @@ import ( "srsx/internal/version" ) -// srsHTTPStreamServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS, +// HTTPStreamServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS, // HLS, etc. The proxy server will figure out which SRS origin server to proxy to, then proxy // the request to the origin server. -type srsHTTPStreamServer struct { +type HTTPStreamServer interface { + Run(ctx context.Context) error + Close() error +} + +type httpStreamServer struct { // The environment interface. environment env.ProxyEnvironment // The underlayer HTTP server. @@ -37,15 +42,15 @@ type srsHTTPStreamServer struct { wg stdSync.WaitGroup } -func NewSRSHTTPStreamServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) *srsHTTPStreamServer { - v := &srsHTTPStreamServer{ +func NewHTTPStreamServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) HTTPStreamServer { + v := &httpStreamServer{ environment: environment, gracefulQuitTimeout: gracefulQuitTimeout, } return v } -func (v *srsHTTPStreamServer) Close() error { +func (v *httpStreamServer) Close() error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() v.server.Shutdown(ctx) @@ -54,7 +59,7 @@ func (v *srsHTTPStreamServer) Close() error { return nil } -func (v *srsHTTPStreamServer) Run(ctx context.Context) error { +func (v *httpStreamServer) Run(ctx context.Context) error { // Parse address to listen. addr := v.environment.HttpServer() if !strings.Contains(addr, ":") { @@ -123,12 +128,12 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error { return } - stream, _ := lb.SrsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSPlayStream(func(s *HLSPlayStream) { + stream, _ := lb.SrsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, newHLSPlayStream(func(s *hlsPlayStream) { s.SRSProxyBackendHLSID = logger.GenerateContextID() s.StreamURL, s.FullURL = streamURL, fullURL })) - stream.Initialize(ctx).(*HLSPlayStream).ServeHTTP(w, r) + stream.Initialize(ctx).(*hlsPlayStream).ServeHTTP(w, r) return } @@ -140,13 +145,13 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error { if stream, err := lb.SrsLoadBalancer.LoadHLSBySPBHID(ctx, srsProxyBackendID); err != nil { http.Error(w, fmt.Sprintf("load stream by spbhid %v", srsProxyBackendID), http.StatusBadRequest) } else { - stream.Initialize(ctx).(*HLSPlayStream).ServeHTTP(w, r) + stream.Initialize(ctx).(*hlsPlayStream).ServeHTTP(w, r) } return } // Use HTTP pseudo streaming to proxy the request. - NewHTTPFlvTsConnection(func(c *HTTPFlvTsConnection) { + newHTTPFlvTsConnection(func(c *httpFlvTsConnection) { c.ctx = ctx }).ServeHTTP(w, r) return @@ -182,26 +187,26 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error { return nil } -// HTTPFlvTsConnection is an HTTP pseudo streaming connection, such as an HTTP-FLV or HTTP-TS +// httpFlvTsConnection is an HTTP pseudo streaming connection, such as an HTTP-FLV or HTTP-TS // connection. There is no state need to be sync between proxy servers. // // When we got an HTTP FLV or TS request, we will parse the stream URL from the HTTP request, // then proxy to the corresponding backend server. All state is in the HTTP request, so this // connection is stateless. -type HTTPFlvTsConnection struct { +type httpFlvTsConnection struct { // The context for HTTP streaming. ctx context.Context } -func NewHTTPFlvTsConnection(opts ...func(*HTTPFlvTsConnection)) *HTTPFlvTsConnection { - v := &HTTPFlvTsConnection{} +func newHTTPFlvTsConnection(opts ...func(*httpFlvTsConnection)) *httpFlvTsConnection { + v := &httpFlvTsConnection{} for _, opt := range opts { opt(v) } return v } -func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (v *httpFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() ctx := logger.WithContext(v.ctx) @@ -212,7 +217,7 @@ func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) } } -func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *httpFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error { // Always allow CORS for all requests. if ok := utils.ApiCORS(ctx, w, r); ok { return nil @@ -240,7 +245,7 @@ func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, return nil } -func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error { +func (v *httpFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error { // Parse HTTP port from backend. if len(backend.HTTP) == 0 { return errors.Errorf("no http stream server") @@ -288,14 +293,14 @@ func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.Respons return nil } -// HLSPlayStream is an HLS stream proxy, which represents the stream level object. This means multiple HLS +// hlsPlayStream is an HLS stream proxy, which represents the stream level object. This means multiple HLS // clients will share this object, and they do not use the same ctx among proxy servers. // // Unlike the HTTP FLV or TS connection, HLS client may request the m3u8 or ts via different HTTP connections. // Especially for requesting ts, we need to identify the stream URl or backend server for it. So we create // the spbhid which can be seen as the hash of stream URL or backend server. The spbhid enable us to convert // to the stream URL and then query the backend server to serve it. -type HLSPlayStream struct { +type hlsPlayStream struct { // The context for HLS streaming. ctx context.Context @@ -307,26 +312,26 @@ type HLSPlayStream struct { FullURL string `json:"full_url"` } -func NewHLSPlayStream(opts ...func(*HLSPlayStream)) *HLSPlayStream { - v := &HLSPlayStream{} +func newHLSPlayStream(opts ...func(*hlsPlayStream)) *hlsPlayStream { + v := &hlsPlayStream{} for _, opt := range opts { opt(v) } return v } -func (v *HLSPlayStream) Initialize(ctx context.Context) lb.HLSPlayStream { +func (v *hlsPlayStream) Initialize(ctx context.Context) lb.HLSPlayStream { if v.ctx == nil { v.ctx = logger.WithContext(ctx) } return v } -func (v *HLSPlayStream) GetSPBHID() string { +func (v *hlsPlayStream) GetSPBHID() string { return v.SRSProxyBackendHLSID } -func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (v *hlsPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() if err := v.serve(v.ctx, w, r); err != nil { @@ -337,7 +342,7 @@ func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (v *HLSPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *hlsPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error { ctx, streamURL, fullURL := v.ctx, v.StreamURL, v.FullURL // Always allow CORS for all requests. @@ -358,7 +363,7 @@ func (v *HLSPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *htt return nil } -func (v *HLSPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error { +func (v *hlsPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error { // Parse HTTP port from backend. if len(backend.HTTP) == 0 { return errors.Errorf("no rtmp server") diff --git a/internal/server/rtc.go b/internal/server/rtc.go index 4e981d323..7a85e0bbb 100644 --- a/internal/server/rtc.go +++ b/internal/server/rtc.go @@ -23,10 +23,17 @@ import ( "srsx/internal/utils" ) -// srsWebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out +// WebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out // which backend server to proxy to. It will also replace the UDP port to the proxy server's in the // SDP answer. -type srsWebRTCServer struct { +type WebRTCServer interface { + Run(ctx context.Context) error + Close() error + HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error + HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error +} + +type webRTCServer struct { // The environment interface. environment env.ProxyEnvironment // The UDP listener for WebRTC server. @@ -34,21 +41,21 @@ type srsWebRTCServer struct { // Fast cache for the username to identify the connection. // The key is username, the value is the UDP address. - usernames sync.Map[string, *RTCConnection] + usernames sync.Map[string, *rtcConnection] // Fast cache for the udp address to identify the connection. // The key is UDP address, the value is the username. // TODO: Support fast earch by uint64 address. - addresses sync.Map[string, *RTCConnection] + addresses sync.Map[string, *rtcConnection] // The wait group for server. wg stdSync.WaitGroup } -func NewSRSWebRTCServer(environment env.ProxyEnvironment, opts ...func(*srsWebRTCServer)) *srsWebRTCServer { - v := &srsWebRTCServer{ +func NewWebRTCServer(environment env.ProxyEnvironment, opts ...func(*webRTCServer)) WebRTCServer { + v := &webRTCServer{ environment: environment, - usernames: sync.NewMap[string, *RTCConnection](), - addresses: sync.NewMap[string, *RTCConnection](), + usernames: sync.NewMap[string, *rtcConnection](), + addresses: sync.NewMap[string, *rtcConnection](), } for _, opt := range opts { opt(v) @@ -56,7 +63,7 @@ func NewSRSWebRTCServer(environment env.ProxyEnvironment, opts ...func(*srsWebRT return v } -func (v *srsWebRTCServer) Close() error { +func (v *webRTCServer) Close() error { if v.listener != nil { _ = v.listener.Close() } @@ -65,7 +72,7 @@ func (v *srsWebRTCServer) Close() error { return nil } -func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *webRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() ctx = logger.WithContext(ctx) @@ -102,7 +109,7 @@ func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseW return nil } -func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *webRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() ctx = logger.WithContext(ctx) @@ -139,7 +146,7 @@ func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseW return nil } -func (v *srsWebRTCServer) proxyApiToBackend( +func (v *webRTCServer) proxyApiToBackend( ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer, remoteSDPOffer string, streamURL string, ) error { @@ -215,11 +222,11 @@ func (v *srsWebRTCServer) proxyApiToBackend( } // Save the new WebRTC connection to LB. - icePair := &RTCICEPair{ + icePair := &rtcICEPair{ RemoteICEUfrag: remoteICEUfrag, RemoteICEPwd: remoteICEPwd, LocalICEUfrag: localICEUfrag, LocalICEPwd: localICEPwd, } - if err := lb.SrsLoadBalancer.StoreWebRTC(ctx, streamURL, NewRTCConnection(func(c *RTCConnection) { + if err := lb.SrsLoadBalancer.StoreWebRTC(ctx, streamURL, newRTCConnection(func(c *rtcConnection) { c.StreamURL, c.Ufrag = streamURL, icePair.Ufrag() c.Initialize(ctx, v.listener) @@ -239,7 +246,7 @@ func (v *srsWebRTCServer) proxyApiToBackend( return nil } -func (v *srsWebRTCServer) Run(ctx context.Context) error { +func (v *webRTCServer) Run(ctx context.Context) error { // Parse address to listen. endpoint := v.environment.WebRTCServer() if !strings.Contains(endpoint, ":") { @@ -287,8 +294,8 @@ func (v *srsWebRTCServer) Run(ctx context.Context) error { return nil } -func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { - var connection *RTCConnection +func (v *webRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { + var connection *rtcConnection // If STUN binding request, parse the ufrag and identify the connection. if err := func() error { @@ -296,7 +303,7 @@ func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr return nil } - var pkt RTCStunPacket + var pkt rtcStunPacket if err := pkt.UnmarshalBinary(data); err != nil { return errors.Wrapf(err, "unmarshal stun packet") } @@ -311,7 +318,7 @@ func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr if s, err := lb.SrsLoadBalancer.LoadWebRTCByUfrag(ctx, pkt.Username); err != nil { return errors.Wrapf(err, "load webrtc by ufrag %v", pkt.Username) } else { - connection = s.(*RTCConnection).Initialize(ctx, v.listener) + connection = s.(*rtcConnection).Initialize(ctx, v.listener) logger.Debug(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL) } @@ -346,17 +353,17 @@ func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr return nil } -// RTCConnection is a WebRTC connection proxy, for both WHIP and WHEP. It represents a WebRTC +// rtcConnection is a WebRTC connection proxy, for both WHIP and WHEP. It represents a WebRTC // connection, identify by the ufrag in sdp offer/answer and ICE binding request. // // It's not like RTMP or HTTP FLV/TS proxy connection, which are stateless and all state is -// in the client request. The RTCConnection is stateful, and need to sync the ufrag between +// in the client request. The rtcConnection is stateful, and need to sync the ufrag between // proxy servers. // // The media transport is UDP, which is also a special thing for WebRTC. So if the client switch // to another UDP address, it may connect to another WebRTC proxy, then we should discover the -// RTCConnection by the ufrag from the ICE binding request. -type RTCConnection struct { +// rtcConnection by the ufrag from the ICE binding request. +type rtcConnection struct { // The stream context for WebRTC streaming. ctx context.Context @@ -373,15 +380,15 @@ type RTCConnection struct { listenerUDP *net.UDPConn } -func NewRTCConnection(opts ...func(*RTCConnection)) *RTCConnection { - v := &RTCConnection{} +func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection { + v := &rtcConnection{} for _, opt := range opts { opt(v) } return v } -func (v *RTCConnection) Initialize(ctx context.Context, listener *net.UDPConn) *RTCConnection { +func (v *rtcConnection) Initialize(ctx context.Context, listener *net.UDPConn) *rtcConnection { if v.ctx == nil { v.ctx = logger.WithContext(ctx) } @@ -391,11 +398,11 @@ func (v *RTCConnection) Initialize(ctx context.Context, listener *net.UDPConn) * return v } -func (v *RTCConnection) GetUfrag() string { +func (v *rtcConnection) GetUfrag() string { return v.Ufrag } -func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error { +func (v *rtcConnection) HandlePacket(addr *net.UDPAddr, data []byte) error { ctx := v.ctx // Update the current UDP address. @@ -437,7 +444,7 @@ func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error { return nil } -func (v *RTCConnection) connectBackend(ctx context.Context) error { +func (v *rtcConnection) connectBackend(ctx context.Context) error { if v.backendUDP != nil { return nil } @@ -470,7 +477,7 @@ func (v *RTCConnection) connectBackend(ctx context.Context) error { return nil } -type RTCICEPair struct { +type rtcICEPair struct { // The remote ufrag, used for ICE username and session id. RemoteICEUfrag string `json:"remote_ufrag"` // The remote pwd, used for ICE password. @@ -482,18 +489,18 @@ type RTCICEPair struct { } // Generate the ICE ufrag for the WebRTC streaming, format is remote-ufrag:local-ufrag. -func (v *RTCICEPair) Ufrag() string { +func (v *rtcICEPair) Ufrag() string { return fmt.Sprintf("%v:%v", v.LocalICEUfrag, v.RemoteICEUfrag) } -type RTCStunPacket struct { +type rtcStunPacket struct { // The stun message type. MessageType uint16 // The stun username, or ufrag. Username string } -func (v *RTCStunPacket) UnmarshalBinary(data []byte) error { +func (v *rtcStunPacket) UnmarshalBinary(data []byte) error { if len(data) < 20 { return errors.Errorf("stun packet too short %v", len(data)) } diff --git a/internal/server/rtmp.go b/internal/server/rtmp.go index 33b5f7bd7..b787e99c8 100644 --- a/internal/server/rtmp.go +++ b/internal/server/rtmp.go @@ -20,10 +20,15 @@ import ( "srsx/internal/version" ) -// srsRTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS +// RTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS // server. It will figure out the backend server to proxy to. Unlike the edge server, it will // not cache the stream, but just proxy the stream to backend. -type srsRTMPServer struct { +type RTMPServer interface { + Run(ctx context.Context) error + Close() error +} + +type rtmpServer struct { // The environment interface. environment env.ProxyEnvironment // The TCP listener for RTMP server. @@ -32,15 +37,15 @@ type srsRTMPServer struct { wg sync.WaitGroup } -func NewSRSRTMPServer(environment env.ProxyEnvironment, opts ...func(*srsRTMPServer)) *srsRTMPServer { - v := &srsRTMPServer{environment: environment} +func NewRTMPServer(environment env.ProxyEnvironment, opts ...func(*rtmpServer)) RTMPServer { + v := &rtmpServer{environment: environment} for _, opt := range opts { opt(v) } return v } -func (v *srsRTMPServer) Close() error { +func (v *rtmpServer) Close() error { if v.listener != nil { v.listener.Close() } @@ -49,7 +54,7 @@ func (v *srsRTMPServer) Close() error { return nil } -func (v *srsRTMPServer) Run(ctx context.Context) error { +func (v *rtmpServer) Run(ctx context.Context) error { endpoint := v.environment.RtmpServer() if !strings.Contains(endpoint, ":") { endpoint = ":" + endpoint @@ -97,7 +102,7 @@ func (v *srsRTMPServer) Run(ctx context.Context) error { } } - rc := NewRTMPConnection() + rc := newRTMPConnection() if err := rc.serve(ctx, conn); err != nil { handleErr(err) } else { @@ -110,24 +115,24 @@ func (v *srsRTMPServer) Run(ctx context.Context) error { return nil } -// RTMPConnection is an RTMP streaming connection. There is no state need to be sync between +// rtmpConnection is an RTMP streaming connection. There is no state need to be sync between // proxy servers. // // When we got an RTMP request, we will parse the stream URL from the RTMP publish or play request, // then proxy to the corresponding backend server. All state is in the RTMP request, so this // connection is stateless. -type RTMPConnection struct { +type rtmpConnection struct { } -func NewRTMPConnection(opts ...func(*RTMPConnection)) *RTMPConnection { - v := &RTMPConnection{} +func newRTMPConnection(opts ...func(*rtmpConnection)) *rtmpConnection { + v := &rtmpConnection{} for _, opt := range opts { opt(v) } return v } -func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { +func (v *rtmpConnection) serve(ctx context.Context, conn *net.TCPConn) error { logger.Debug(ctx, "Got RTMP client from %v", conn.RemoteAddr()) // If any goroutine quit, cancel another one. @@ -135,7 +140,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - var backend *RTMPClientToBackend + var backend *rtmpClientToBackend if true { go func() { <-ctx.Done() @@ -289,7 +294,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { tcUrl, streamName, currentStreamID, clientType) // Find a backend SRS server to proxy the RTMP stream. - backend = NewRTMPClientToBackend(func(client *RTMPClientToBackend) { + backend = newRTMPClientToBackend(func(client *rtmpClientToBackend) { client.typ = clientType }) defer backend.Close() @@ -416,8 +421,8 @@ const ( RTMPClientTypeViewer RTMPClientType = "viewer" ) -// RTMPClientToBackend is a RTMP client to proxy the RTMP stream to backend. -type RTMPClientToBackend struct { +// rtmpClientToBackend is an RTMP client to proxy the RTMP stream to backend. +type rtmpClientToBackend struct { // The underlayer tcp client. tcpConn *net.TCPConn // The RTMP protocol client. @@ -426,22 +431,22 @@ type RTMPClientToBackend struct { typ RTMPClientType } -func NewRTMPClientToBackend(opts ...func(*RTMPClientToBackend)) *RTMPClientToBackend { - v := &RTMPClientToBackend{} +func newRTMPClientToBackend(opts ...func(*rtmpClientToBackend)) *rtmpClientToBackend { + v := &rtmpClientToBackend{} for _, opt := range opts { opt(v) } return v } -func (v *RTMPClientToBackend) Close() error { +func (v *rtmpClientToBackend) Close() error { if v.tcpConn != nil { v.tcpConn.Close() } return nil } -func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName string) error { +func (v *rtmpClientToBackend) Connect(ctx context.Context, tcUrl, streamName string) error { // Build the stream URL in vhost/app/stream schema. streamURL, err := utils.BuildStreamURL(fmt.Sprintf("%v/%v", tcUrl, streamName)) if err != nil { @@ -527,7 +532,7 @@ func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName str return v.publish(ctx, client, streamName) } -func (v *RTMPClientToBackend) publish(ctx context.Context, client rtmp.Protocol, streamName string) error { +func (v *rtmpClientToBackend) publish(ctx context.Context, client rtmp.Protocol, streamName string) error { if true { identifyReq := rtmp.NewCallPacket() identifyReq.CommandName = "releaseStream" @@ -620,7 +625,7 @@ func (v *RTMPClientToBackend) publish(ctx context.Context, client rtmp.Protocol, return nil } -func (v *RTMPClientToBackend) play(ctx context.Context, client rtmp.Protocol, streamName string) error { +func (v *rtmpClientToBackend) play(ctx context.Context, client rtmp.Protocol, streamName string) error { var currentStreamID int if true { createStream := rtmp.NewCreateStreamPacket()