Codex: Refactor proxy server APIs.

This commit is contained in:
winlin 2026-05-01 07:36:10 -04:00
parent b6dd97ddff
commit cb90bf1fec
6 changed files with 122 additions and 102 deletions

View File

@ -99,25 +99,25 @@ func (b *proxyBootstrap) initializeLoadBalancer(ctx context.Context, environment
// startServers initializes and starts all protocol servers.
func (b *proxyBootstrap) startServers(ctx context.Context, environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) error {
// Start the RTMP server.
srsRTMPServer := server.NewSRSRTMPServer(environment)
if err := srsRTMPServer.Run(ctx); err != nil {
rtmpServer := server.NewRTMPServer(environment)
if err := rtmpServer.Run(ctx); err != nil {
return errors.Wrapf(err, "rtmp server")
}
defer srsRTMPServer.Close()
defer rtmpServer.Close()
// Start the WebRTC server.
srsWebRTCServer := server.NewSRSWebRTCServer(environment)
if err := srsWebRTCServer.Run(ctx); err != nil {
webRTCServer := server.NewWebRTCServer(environment)
if err := webRTCServer.Run(ctx); err != nil {
return errors.Wrapf(err, "rtc server")
}
defer srsWebRTCServer.Close()
defer webRTCServer.Close()
// Start the HTTP API server.
srsHTTPAPIServer := server.NewSRSHTTPAPIServer(environment, gracefulQuitTimeout, srsWebRTCServer)
if err := srsHTTPAPIServer.Run(ctx); err != nil {
httpAPIServer := server.NewHTTPAPIServer(environment, gracefulQuitTimeout, webRTCServer)
if err := httpAPIServer.Run(ctx); err != nil {
return errors.Wrapf(err, "http api server")
}
defer srsHTTPAPIServer.Close()
defer httpAPIServer.Close()
// Start the SRT server.
srsSRTServer := server.NewSRSSRTServer(environment)
@ -134,11 +134,11 @@ func (b *proxyBootstrap) startServers(ctx context.Context, environment env.Proxy
defer systemAPI.Close()
// Start the HTTP web server.
srsHTTPStreamServer := server.NewSRSHTTPStreamServer(environment, gracefulQuitTimeout)
if err := srsHTTPStreamServer.Run(ctx); err != nil {
httpStreamServer := server.NewHTTPStreamServer(environment, gracefulQuitTimeout)
if err := httpStreamServer.Run(ctx); err != nil {
return errors.Wrapf(err, "http server")
}
defer srsHTTPStreamServer.Close()
defer httpStreamServer.Close()
// Wait for the main loop to quit.
<-ctx.Done()

View File

@ -234,8 +234,6 @@ func (v *protocol) ExpectMessage(ctx context.Context, types ...MessageType) (m M
}
}
}
return
}
func (v *protocol) parseAMFObject(p []byte) (pkt Packet, err error) {

View File

@ -20,23 +20,28 @@ import (
"srsx/internal/version"
)
// srsHTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP,
// HTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP,
// to proxy other HTTP API of SRS like the streams and clients, etc.
type srsHTTPAPIServer struct {
type HTTPAPIServer interface {
Run(ctx context.Context) error
Close() error
}
type httpAPIServer struct {
// The environment interface.
environment env.ProxyEnvironment
// The underlayer HTTP server.
server *http.Server
// The WebRTC server.
rtc *srsWebRTCServer
rtc WebRTCServer
// The gracefully quit timeout, wait server to quit.
gracefulQuitTimeout time.Duration
// The wait group for all goroutines.
wg sync.WaitGroup
}
func NewSRSHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc *srsWebRTCServer) *srsHTTPAPIServer {
v := &srsHTTPAPIServer{
func NewHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc WebRTCServer) HTTPAPIServer {
v := &httpAPIServer{
environment: environment,
gracefulQuitTimeout: gracefulQuitTimeout,
rtc: rtc,
@ -44,7 +49,7 @@ func NewSRSHTTPAPIServer(environment env.ProxyEnvironment, gracefulQuitTimeout t
return v
}
func (v *srsHTTPAPIServer) Close() error {
func (v *httpAPIServer) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)
@ -53,7 +58,7 @@ func (v *srsHTTPAPIServer) Close() error {
return nil
}
func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
func (v *httpAPIServer) Run(ctx context.Context) error {
// Parse address to listen.
addr := v.environment.HttpAPI()
if !strings.Contains(addr, ":") {

View File

@ -23,10 +23,15 @@ import (
"srsx/internal/version"
)
// srsHTTPStreamServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS,
// HTTPStreamServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS,
// HLS, etc. The proxy server will figure out which SRS origin server to proxy to, then proxy
// the request to the origin server.
type srsHTTPStreamServer struct {
type HTTPStreamServer interface {
Run(ctx context.Context) error
Close() error
}
type httpStreamServer struct {
// The environment interface.
environment env.ProxyEnvironment
// The underlayer HTTP server.
@ -37,15 +42,15 @@ type srsHTTPStreamServer struct {
wg stdSync.WaitGroup
}
func NewSRSHTTPStreamServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) *srsHTTPStreamServer {
v := &srsHTTPStreamServer{
func NewHTTPStreamServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration) HTTPStreamServer {
v := &httpStreamServer{
environment: environment,
gracefulQuitTimeout: gracefulQuitTimeout,
}
return v
}
func (v *srsHTTPStreamServer) Close() error {
func (v *httpStreamServer) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)
@ -54,7 +59,7 @@ func (v *srsHTTPStreamServer) Close() error {
return nil
}
func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
func (v *httpStreamServer) Run(ctx context.Context) error {
// Parse address to listen.
addr := v.environment.HttpServer()
if !strings.Contains(addr, ":") {
@ -123,12 +128,12 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
return
}
stream, _ := lb.SrsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSPlayStream(func(s *HLSPlayStream) {
stream, _ := lb.SrsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, newHLSPlayStream(func(s *hlsPlayStream) {
s.SRSProxyBackendHLSID = logger.GenerateContextID()
s.StreamURL, s.FullURL = streamURL, fullURL
}))
stream.Initialize(ctx).(*HLSPlayStream).ServeHTTP(w, r)
stream.Initialize(ctx).(*hlsPlayStream).ServeHTTP(w, r)
return
}
@ -140,13 +145,13 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
if stream, err := lb.SrsLoadBalancer.LoadHLSBySPBHID(ctx, srsProxyBackendID); err != nil {
http.Error(w, fmt.Sprintf("load stream by spbhid %v", srsProxyBackendID), http.StatusBadRequest)
} else {
stream.Initialize(ctx).(*HLSPlayStream).ServeHTTP(w, r)
stream.Initialize(ctx).(*hlsPlayStream).ServeHTTP(w, r)
}
return
}
// Use HTTP pseudo streaming to proxy the request.
NewHTTPFlvTsConnection(func(c *HTTPFlvTsConnection) {
newHTTPFlvTsConnection(func(c *httpFlvTsConnection) {
c.ctx = ctx
}).ServeHTTP(w, r)
return
@ -182,26 +187,26 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
return nil
}
// HTTPFlvTsConnection is an HTTP pseudo streaming connection, such as an HTTP-FLV or HTTP-TS
// httpFlvTsConnection is an HTTP pseudo streaming connection, such as an HTTP-FLV or HTTP-TS
// connection. There is no state need to be sync between proxy servers.
//
// When we got an HTTP FLV or TS request, we will parse the stream URL from the HTTP request,
// then proxy to the corresponding backend server. All state is in the HTTP request, so this
// connection is stateless.
type HTTPFlvTsConnection struct {
type httpFlvTsConnection struct {
// The context for HTTP streaming.
ctx context.Context
}
func NewHTTPFlvTsConnection(opts ...func(*HTTPFlvTsConnection)) *HTTPFlvTsConnection {
v := &HTTPFlvTsConnection{}
func newHTTPFlvTsConnection(opts ...func(*httpFlvTsConnection)) *httpFlvTsConnection {
v := &httpFlvTsConnection{}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (v *httpFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
ctx := logger.WithContext(v.ctx)
@ -212,7 +217,7 @@ func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request)
}
}
func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
func (v *httpFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// Always allow CORS for all requests.
if ok := utils.ApiCORS(ctx, w, r); ok {
return nil
@ -240,7 +245,7 @@ func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter,
return nil
}
func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error {
func (v *httpFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error {
// Parse HTTP port from backend.
if len(backend.HTTP) == 0 {
return errors.Errorf("no http stream server")
@ -288,14 +293,14 @@ func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.Respons
return nil
}
// HLSPlayStream is an HLS stream proxy, which represents the stream level object. This means multiple HLS
// hlsPlayStream is an HLS stream proxy, which represents the stream level object. This means multiple HLS
// clients will share this object, and they do not use the same ctx among proxy servers.
//
// Unlike the HTTP FLV or TS connection, HLS client may request the m3u8 or ts via different HTTP connections.
// Especially for requesting ts, we need to identify the stream URl or backend server for it. So we create
// the spbhid which can be seen as the hash of stream URL or backend server. The spbhid enable us to convert
// to the stream URL and then query the backend server to serve it.
type HLSPlayStream struct {
type hlsPlayStream struct {
// The context for HLS streaming.
ctx context.Context
@ -307,26 +312,26 @@ type HLSPlayStream struct {
FullURL string `json:"full_url"`
}
func NewHLSPlayStream(opts ...func(*HLSPlayStream)) *HLSPlayStream {
v := &HLSPlayStream{}
func newHLSPlayStream(opts ...func(*hlsPlayStream)) *hlsPlayStream {
v := &hlsPlayStream{}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *HLSPlayStream) Initialize(ctx context.Context) lb.HLSPlayStream {
func (v *hlsPlayStream) Initialize(ctx context.Context) lb.HLSPlayStream {
if v.ctx == nil {
v.ctx = logger.WithContext(ctx)
}
return v
}
func (v *HLSPlayStream) GetSPBHID() string {
func (v *hlsPlayStream) GetSPBHID() string {
return v.SRSProxyBackendHLSID
}
func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (v *hlsPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if err := v.serve(v.ctx, w, r); err != nil {
@ -337,7 +342,7 @@ func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func (v *HLSPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
func (v *hlsPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
ctx, streamURL, fullURL := v.ctx, v.StreamURL, v.FullURL
// Always allow CORS for all requests.
@ -358,7 +363,7 @@ func (v *HLSPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *htt
return nil
}
func (v *HLSPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error {
func (v *hlsPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer) error {
// Parse HTTP port from backend.
if len(backend.HTTP) == 0 {
return errors.Errorf("no rtmp server")

View File

@ -23,10 +23,17 @@ import (
"srsx/internal/utils"
)
// srsWebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out
// WebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out
// which backend server to proxy to. It will also replace the UDP port to the proxy server's in the
// SDP answer.
type srsWebRTCServer struct {
type WebRTCServer interface {
Run(ctx context.Context) error
Close() error
HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error
HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error
}
type webRTCServer struct {
// The environment interface.
environment env.ProxyEnvironment
// The UDP listener for WebRTC server.
@ -34,21 +41,21 @@ type srsWebRTCServer struct {
// Fast cache for the username to identify the connection.
// The key is username, the value is the UDP address.
usernames sync.Map[string, *RTCConnection]
usernames sync.Map[string, *rtcConnection]
// Fast cache for the udp address to identify the connection.
// The key is UDP address, the value is the username.
// TODO: Support fast earch by uint64 address.
addresses sync.Map[string, *RTCConnection]
addresses sync.Map[string, *rtcConnection]
// The wait group for server.
wg stdSync.WaitGroup
}
func NewSRSWebRTCServer(environment env.ProxyEnvironment, opts ...func(*srsWebRTCServer)) *srsWebRTCServer {
v := &srsWebRTCServer{
func NewWebRTCServer(environment env.ProxyEnvironment, opts ...func(*webRTCServer)) WebRTCServer {
v := &webRTCServer{
environment: environment,
usernames: sync.NewMap[string, *RTCConnection](),
addresses: sync.NewMap[string, *RTCConnection](),
usernames: sync.NewMap[string, *rtcConnection](),
addresses: sync.NewMap[string, *rtcConnection](),
}
for _, opt := range opts {
opt(v)
@ -56,7 +63,7 @@ func NewSRSWebRTCServer(environment env.ProxyEnvironment, opts ...func(*srsWebRT
return v
}
func (v *srsWebRTCServer) Close() error {
func (v *webRTCServer) Close() error {
if v.listener != nil {
_ = v.listener.Close()
}
@ -65,7 +72,7 @@ func (v *srsWebRTCServer) Close() error {
return nil
}
func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
func (v *webRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()
ctx = logger.WithContext(ctx)
@ -102,7 +109,7 @@ func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseW
return nil
}
func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
func (v *webRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()
ctx = logger.WithContext(ctx)
@ -139,7 +146,7 @@ func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseW
return nil
}
func (v *srsWebRTCServer) proxyApiToBackend(
func (v *webRTCServer) proxyApiToBackend(
ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.SRSServer,
remoteSDPOffer string, streamURL string,
) error {
@ -215,11 +222,11 @@ func (v *srsWebRTCServer) proxyApiToBackend(
}
// Save the new WebRTC connection to LB.
icePair := &RTCICEPair{
icePair := &rtcICEPair{
RemoteICEUfrag: remoteICEUfrag, RemoteICEPwd: remoteICEPwd,
LocalICEUfrag: localICEUfrag, LocalICEPwd: localICEPwd,
}
if err := lb.SrsLoadBalancer.StoreWebRTC(ctx, streamURL, NewRTCConnection(func(c *RTCConnection) {
if err := lb.SrsLoadBalancer.StoreWebRTC(ctx, streamURL, newRTCConnection(func(c *rtcConnection) {
c.StreamURL, c.Ufrag = streamURL, icePair.Ufrag()
c.Initialize(ctx, v.listener)
@ -239,7 +246,7 @@ func (v *srsWebRTCServer) proxyApiToBackend(
return nil
}
func (v *srsWebRTCServer) Run(ctx context.Context) error {
func (v *webRTCServer) Run(ctx context.Context) error {
// Parse address to listen.
endpoint := v.environment.WebRTCServer()
if !strings.Contains(endpoint, ":") {
@ -287,8 +294,8 @@ func (v *srsWebRTCServer) Run(ctx context.Context) error {
return nil
}
func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
var connection *RTCConnection
func (v *webRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
var connection *rtcConnection
// If STUN binding request, parse the ufrag and identify the connection.
if err := func() error {
@ -296,7 +303,7 @@ func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr
return nil
}
var pkt RTCStunPacket
var pkt rtcStunPacket
if err := pkt.UnmarshalBinary(data); err != nil {
return errors.Wrapf(err, "unmarshal stun packet")
}
@ -311,7 +318,7 @@ func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr
if s, err := lb.SrsLoadBalancer.LoadWebRTCByUfrag(ctx, pkt.Username); err != nil {
return errors.Wrapf(err, "load webrtc by ufrag %v", pkt.Username)
} else {
connection = s.(*RTCConnection).Initialize(ctx, v.listener)
connection = s.(*rtcConnection).Initialize(ctx, v.listener)
logger.Debug(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL)
}
@ -346,17 +353,17 @@ func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr
return nil
}
// RTCConnection is a WebRTC connection proxy, for both WHIP and WHEP. It represents a WebRTC
// rtcConnection is a WebRTC connection proxy, for both WHIP and WHEP. It represents a WebRTC
// connection, identify by the ufrag in sdp offer/answer and ICE binding request.
//
// It's not like RTMP or HTTP FLV/TS proxy connection, which are stateless and all state is
// in the client request. The RTCConnection is stateful, and need to sync the ufrag between
// in the client request. The rtcConnection is stateful, and need to sync the ufrag between
// proxy servers.
//
// The media transport is UDP, which is also a special thing for WebRTC. So if the client switch
// to another UDP address, it may connect to another WebRTC proxy, then we should discover the
// RTCConnection by the ufrag from the ICE binding request.
type RTCConnection struct {
// rtcConnection by the ufrag from the ICE binding request.
type rtcConnection struct {
// The stream context for WebRTC streaming.
ctx context.Context
@ -373,15 +380,15 @@ type RTCConnection struct {
listenerUDP *net.UDPConn
}
func NewRTCConnection(opts ...func(*RTCConnection)) *RTCConnection {
v := &RTCConnection{}
func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection {
v := &rtcConnection{}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *RTCConnection) Initialize(ctx context.Context, listener *net.UDPConn) *RTCConnection {
func (v *rtcConnection) Initialize(ctx context.Context, listener *net.UDPConn) *rtcConnection {
if v.ctx == nil {
v.ctx = logger.WithContext(ctx)
}
@ -391,11 +398,11 @@ func (v *RTCConnection) Initialize(ctx context.Context, listener *net.UDPConn) *
return v
}
func (v *RTCConnection) GetUfrag() string {
func (v *rtcConnection) GetUfrag() string {
return v.Ufrag
}
func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error {
func (v *rtcConnection) HandlePacket(addr *net.UDPAddr, data []byte) error {
ctx := v.ctx
// Update the current UDP address.
@ -437,7 +444,7 @@ func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error {
return nil
}
func (v *RTCConnection) connectBackend(ctx context.Context) error {
func (v *rtcConnection) connectBackend(ctx context.Context) error {
if v.backendUDP != nil {
return nil
}
@ -470,7 +477,7 @@ func (v *RTCConnection) connectBackend(ctx context.Context) error {
return nil
}
type RTCICEPair struct {
type rtcICEPair struct {
// The remote ufrag, used for ICE username and session id.
RemoteICEUfrag string `json:"remote_ufrag"`
// The remote pwd, used for ICE password.
@ -482,18 +489,18 @@ type RTCICEPair struct {
}
// Generate the ICE ufrag for the WebRTC streaming, format is remote-ufrag:local-ufrag.
func (v *RTCICEPair) Ufrag() string {
func (v *rtcICEPair) Ufrag() string {
return fmt.Sprintf("%v:%v", v.LocalICEUfrag, v.RemoteICEUfrag)
}
type RTCStunPacket struct {
type rtcStunPacket struct {
// The stun message type.
MessageType uint16
// The stun username, or ufrag.
Username string
}
func (v *RTCStunPacket) UnmarshalBinary(data []byte) error {
func (v *rtcStunPacket) UnmarshalBinary(data []byte) error {
if len(data) < 20 {
return errors.Errorf("stun packet too short %v", len(data))
}

View File

@ -20,10 +20,15 @@ import (
"srsx/internal/version"
)
// srsRTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS
// RTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS
// server. It will figure out the backend server to proxy to. Unlike the edge server, it will
// not cache the stream, but just proxy the stream to backend.
type srsRTMPServer struct {
type RTMPServer interface {
Run(ctx context.Context) error
Close() error
}
type rtmpServer struct {
// The environment interface.
environment env.ProxyEnvironment
// The TCP listener for RTMP server.
@ -32,15 +37,15 @@ type srsRTMPServer struct {
wg sync.WaitGroup
}
func NewSRSRTMPServer(environment env.ProxyEnvironment, opts ...func(*srsRTMPServer)) *srsRTMPServer {
v := &srsRTMPServer{environment: environment}
func NewRTMPServer(environment env.ProxyEnvironment, opts ...func(*rtmpServer)) RTMPServer {
v := &rtmpServer{environment: environment}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *srsRTMPServer) Close() error {
func (v *rtmpServer) Close() error {
if v.listener != nil {
v.listener.Close()
}
@ -49,7 +54,7 @@ func (v *srsRTMPServer) Close() error {
return nil
}
func (v *srsRTMPServer) Run(ctx context.Context) error {
func (v *rtmpServer) Run(ctx context.Context) error {
endpoint := v.environment.RtmpServer()
if !strings.Contains(endpoint, ":") {
endpoint = ":" + endpoint
@ -97,7 +102,7 @@ func (v *srsRTMPServer) Run(ctx context.Context) error {
}
}
rc := NewRTMPConnection()
rc := newRTMPConnection()
if err := rc.serve(ctx, conn); err != nil {
handleErr(err)
} else {
@ -110,24 +115,24 @@ func (v *srsRTMPServer) Run(ctx context.Context) error {
return nil
}
// RTMPConnection is an RTMP streaming connection. There is no state need to be sync between
// rtmpConnection is an RTMP streaming connection. There is no state need to be sync between
// proxy servers.
//
// When we got an RTMP request, we will parse the stream URL from the RTMP publish or play request,
// then proxy to the corresponding backend server. All state is in the RTMP request, so this
// connection is stateless.
type RTMPConnection struct {
type rtmpConnection struct {
}
func NewRTMPConnection(opts ...func(*RTMPConnection)) *RTMPConnection {
v := &RTMPConnection{}
func newRTMPConnection(opts ...func(*rtmpConnection)) *rtmpConnection {
v := &rtmpConnection{}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
func (v *rtmpConnection) serve(ctx context.Context, conn *net.TCPConn) error {
logger.Debug(ctx, "Got RTMP client from %v", conn.RemoteAddr())
// If any goroutine quit, cancel another one.
@ -135,7 +140,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var backend *RTMPClientToBackend
var backend *rtmpClientToBackend
if true {
go func() {
<-ctx.Done()
@ -289,7 +294,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
tcUrl, streamName, currentStreamID, clientType)
// Find a backend SRS server to proxy the RTMP stream.
backend = NewRTMPClientToBackend(func(client *RTMPClientToBackend) {
backend = newRTMPClientToBackend(func(client *rtmpClientToBackend) {
client.typ = clientType
})
defer backend.Close()
@ -416,8 +421,8 @@ const (
RTMPClientTypeViewer RTMPClientType = "viewer"
)
// RTMPClientToBackend is a RTMP client to proxy the RTMP stream to backend.
type RTMPClientToBackend struct {
// rtmpClientToBackend is an RTMP client to proxy the RTMP stream to backend.
type rtmpClientToBackend struct {
// The underlayer tcp client.
tcpConn *net.TCPConn
// The RTMP protocol client.
@ -426,22 +431,22 @@ type RTMPClientToBackend struct {
typ RTMPClientType
}
func NewRTMPClientToBackend(opts ...func(*RTMPClientToBackend)) *RTMPClientToBackend {
v := &RTMPClientToBackend{}
func newRTMPClientToBackend(opts ...func(*rtmpClientToBackend)) *rtmpClientToBackend {
v := &rtmpClientToBackend{}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *RTMPClientToBackend) Close() error {
func (v *rtmpClientToBackend) Close() error {
if v.tcpConn != nil {
v.tcpConn.Close()
}
return nil
}
func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName string) error {
func (v *rtmpClientToBackend) Connect(ctx context.Context, tcUrl, streamName string) error {
// Build the stream URL in vhost/app/stream schema.
streamURL, err := utils.BuildStreamURL(fmt.Sprintf("%v/%v", tcUrl, streamName))
if err != nil {
@ -527,7 +532,7 @@ func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName str
return v.publish(ctx, client, streamName)
}
func (v *RTMPClientToBackend) publish(ctx context.Context, client rtmp.Protocol, streamName string) error {
func (v *rtmpClientToBackend) publish(ctx context.Context, client rtmp.Protocol, streamName string) error {
if true {
identifyReq := rtmp.NewCallPacket()
identifyReq.CommandName = "releaseStream"
@ -620,7 +625,7 @@ func (v *RTMPClientToBackend) publish(ctx context.Context, client rtmp.Protocol,
return nil
}
func (v *RTMPClientToBackend) play(ctx context.Context, client rtmp.Protocol, streamName string) error {
func (v *rtmpClientToBackend) play(ctx context.Context, client rtmp.Protocol, streamName string) error {
var currentStreamID int
if true {
createStream := rtmp.NewCreateStreamPacket()