diff --git a/internal/proxy/rtc.go b/internal/proxy/rtc.go index ae9bacfef..a02d2d3ac 100644 --- a/internal/proxy/rtc.go +++ b/internal/proxy/rtc.go @@ -302,8 +302,12 @@ func (v *webRTCProxyServer) Run(ctx context.Context) error { go func() { defer v.wg.Done() + // Reuse a single receive buffer across iterations. handleClientUDP and the + // downstream HandlePacket consume the slice synchronously (kernel sendto + // copies bytes; STUN parsing copies the username via string()), so no caller + // retains the slice past the call. + buf := make([]byte, 4096) for ctx.Err() == nil { - buf := make([]byte, 4096) n, addr, err := listener.ReadFrom(buf) if err != nil { // If context is canceled or connection is closed, exit gracefully without logging error. @@ -419,6 +423,11 @@ type rtcConnection struct { // dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real // UDP dial; tests may override via a functional option to supply a fake connection. dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error) + + // Guards the spawn of the backend->client reader goroutine. HandlePacket is + // called on every inbound client packet (STUN keepalives + RTCP feedback at + // steady state) but the reader must only start once per connection. + startReader stdSync.Once } func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection { @@ -467,24 +476,31 @@ func (v *rtcConnection) HandlePacket(addr net.Addr, data []byte) error { return nil } - // Proxy all messages from backend to client. - go func() { - for ctx.Err() == nil { + // Spawn the backend->client reader exactly once per connection. Previously + // this goroutine was launched unconditionally here on every inbound client + // packet, which leaked tens of thousands of goroutines under steady-state + // WHEP load (STUN keepalives + RTCP feedback). The buffer is reused across + // iterations: WriteTo copies into the kernel before returning, so the next + // Read can safely overwrite. + v.startReader.Do(func() { + go func() { buf := make([]byte, 4096) - n, err := v.backendUDP.Read(buf) - if err != nil { - // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Warn(ctx, "read from backend failed, err=%v", err) - break - } + for ctx.Err() == nil { + n, err := v.backendUDP.Read(buf) + if err != nil { + // TODO: If backend server closed unexpectedly, we should notice the stream to quit. + logger.Warn(ctx, "read from backend failed, err=%v", err) + return + } - if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil { - // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Warn(ctx, "write to client failed, err=%v", err) - break + if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil { + // TODO: If backend server closed unexpectedly, we should notice the stream to quit. + logger.Warn(ctx, "write to client failed, err=%v", err) + return + } } - } - }() + }() + }) if _, err := v.backendUDP.Write(data); err != nil { return errors.Wrapf(err, "write to backend %v", v.StreamURL) diff --git a/internal/proxy/srt.go b/internal/proxy/srt.go index 2ecb97696..4beee83eb 100644 --- a/internal/proxy/srt.go +++ b/internal/proxy/srt.go @@ -100,8 +100,11 @@ func (v *srsSRTProxyServer) Run(ctx context.Context) error { go func() { defer v.wg.Done() + // Reuse a single receive buffer across iterations. SRTHandshakePacket.UnmarshalBinary + // clones ExtraData, and backendUDP.Write copies into the kernel before returning, so + // no caller retains a reference to this slice past the call. + buf := make([]byte, 4096) for ctx.Err() == nil { - buf := make([]byte, 4096) n, caddr, err := v.listener.ReadFrom(buf) if err != nil { // If context is canceled or connection is closed, exit gracefully without logging error. @@ -196,6 +199,11 @@ type SRTConnection struct { handshake2 *SRTHandshakePacket handshake3 *SRTHandshakePacket + // Guards the spawn of the backend->client reader goroutine. Keeps the spawn + // idempotent if the client retries handshake 2 (e.g. because our handshake 3 + // was dropped) and re-enters the handshake path. + startReader stdSync.Once + // dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real // UDP dial; tests may override via a functional option to supply a fake connection. dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error) @@ -349,23 +357,29 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa return errors.Wrapf(err, "write handshake 3") } - // Start a goroutine to proxy message from backend to client. + // Start a goroutine to proxy message from backend to client. The Once guard + // keeps the spawn idempotent if the client retries handshake 2 (because our + // handshake 3 was lost); the existing reader keeps running and we don't want + // a second one racing it on backendUDP.Read. // TODO: FIXME: Support close the connection when timeout or client disconnected. - go func() { - for ctx.Err() == nil { - nn, err := v.backendUDP.Read(b) - if err != nil { - // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Warn(ctx, "read from backend failed, err=%v", err) - return + v.startReader.Do(func() { + go func() { + buf := make([]byte, 4096) + for ctx.Err() == nil { + nn, err := v.backendUDP.Read(buf) + if err != nil { + // TODO: If backend server closed unexpectedly, we should notice the stream to quit. + logger.Warn(ctx, "read from backend failed, err=%v", err) + return + } + if _, err = v.listenerUDP.WriteTo(buf[:nn], addr); err != nil { + // TODO: If backend server closed unexpectedly, we should notice the stream to quit. + logger.Warn(ctx, "write to client failed, err=%v", err) + return + } } - if _, err = v.listenerUDP.WriteTo(b[:nn], addr); err != nil { - // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Warn(ctx, "write to client failed, err=%v", err) - return - } - } - }() + }() + }) return nil } @@ -583,7 +597,10 @@ func (v *SRTHandshakePacket) UnmarshalBinary(b []byte) error { // Only support IPv4. v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48]) - v.ExtraData = b[64:] + // Clone so ExtraData owns its bytes independently of the caller's buffer. + // Without this, callers that reuse the receive buffer would silently corrupt + // any decoded packet they keep alive (e.g. v.handshake0 / v.handshake2). + v.ExtraData = bytes.Clone(b[64:]) return nil }