Proxy: Fix RTC/SRT reader goroutine leak; reuse UDP receive buffers.
In rtcConnection.HandlePacket the backend->client reader goroutine was spawned unconditionally on every inbound client packet, which leaked goroutines under steady-state WHEP load (STUN keepalives + RTCP feedback). Guard the spawn with sync.Once so it runs exactly once per connection. In SRTConnection.handleHandshake apply the same sync.Once guard so a client retry of handshake 2 (because our handshake 3 was dropped) does not race a second reader on backendUDP.Read. In both Run loops, hoist the 4096-byte receive buffer out of the read loop; callers consume the slice synchronously and the kernel copies on Write/sendto, so no caller retains the slice past the call. To keep this safe for SRT, clone ExtraData in SRTHandshakePacket.UnmarshalBinary so decoded handshakes do not alias the reused receive buffer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
a217ff9a4e
commit
fbcc43cdb0
|
|
@ -302,8 +302,12 @@ func (v *webRTCProxyServer) Run(ctx context.Context) error {
|
|||
go func() {
|
||||
defer v.wg.Done()
|
||||
|
||||
// Reuse a single receive buffer across iterations. handleClientUDP and the
|
||||
// downstream HandlePacket consume the slice synchronously (kernel sendto
|
||||
// copies bytes; STUN parsing copies the username via string()), so no caller
|
||||
// retains the slice past the call.
|
||||
buf := make([]byte, 4096)
|
||||
for ctx.Err() == nil {
|
||||
buf := make([]byte, 4096)
|
||||
n, addr, err := listener.ReadFrom(buf)
|
||||
if err != nil {
|
||||
// If context is canceled or connection is closed, exit gracefully without logging error.
|
||||
|
|
@ -419,6 +423,11 @@ type rtcConnection struct {
|
|||
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
||||
// UDP dial; tests may override via a functional option to supply a fake connection.
|
||||
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
||||
|
||||
// Guards the spawn of the backend->client reader goroutine. HandlePacket is
|
||||
// called on every inbound client packet (STUN keepalives + RTCP feedback at
|
||||
// steady state) but the reader must only start once per connection.
|
||||
startReader stdSync.Once
|
||||
}
|
||||
|
||||
func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection {
|
||||
|
|
@ -467,24 +476,31 @@ func (v *rtcConnection) HandlePacket(addr net.Addr, data []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Proxy all messages from backend to client.
|
||||
go func() {
|
||||
for ctx.Err() == nil {
|
||||
// Spawn the backend->client reader exactly once per connection. Previously
|
||||
// this goroutine was launched unconditionally here on every inbound client
|
||||
// packet, which leaked tens of thousands of goroutines under steady-state
|
||||
// WHEP load (STUN keepalives + RTCP feedback). The buffer is reused across
|
||||
// iterations: WriteTo copies into the kernel before returning, so the next
|
||||
// Read can safely overwrite.
|
||||
v.startReader.Do(func() {
|
||||
go func() {
|
||||
buf := make([]byte, 4096)
|
||||
n, err := v.backendUDP.Read(buf)
|
||||
if err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
||||
break
|
||||
}
|
||||
for ctx.Err() == nil {
|
||||
n, err := v.backendUDP.Read(buf)
|
||||
if err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Warn(ctx, "write to client failed, err=%v", err)
|
||||
break
|
||||
if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Warn(ctx, "write to client failed, err=%v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}()
|
||||
})
|
||||
|
||||
if _, err := v.backendUDP.Write(data); err != nil {
|
||||
return errors.Wrapf(err, "write to backend %v", v.StreamURL)
|
||||
|
|
|
|||
|
|
@ -100,8 +100,11 @@ func (v *srsSRTProxyServer) Run(ctx context.Context) error {
|
|||
go func() {
|
||||
defer v.wg.Done()
|
||||
|
||||
// Reuse a single receive buffer across iterations. SRTHandshakePacket.UnmarshalBinary
|
||||
// clones ExtraData, and backendUDP.Write copies into the kernel before returning, so
|
||||
// no caller retains a reference to this slice past the call.
|
||||
buf := make([]byte, 4096)
|
||||
for ctx.Err() == nil {
|
||||
buf := make([]byte, 4096)
|
||||
n, caddr, err := v.listener.ReadFrom(buf)
|
||||
if err != nil {
|
||||
// If context is canceled or connection is closed, exit gracefully without logging error.
|
||||
|
|
@ -196,6 +199,11 @@ type SRTConnection struct {
|
|||
handshake2 *SRTHandshakePacket
|
||||
handshake3 *SRTHandshakePacket
|
||||
|
||||
// Guards the spawn of the backend->client reader goroutine. Keeps the spawn
|
||||
// idempotent if the client retries handshake 2 (e.g. because our handshake 3
|
||||
// was dropped) and re-enters the handshake path.
|
||||
startReader stdSync.Once
|
||||
|
||||
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
||||
// UDP dial; tests may override via a functional option to supply a fake connection.
|
||||
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
||||
|
|
@ -349,23 +357,29 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
|
|||
return errors.Wrapf(err, "write handshake 3")
|
||||
}
|
||||
|
||||
// Start a goroutine to proxy message from backend to client.
|
||||
// Start a goroutine to proxy message from backend to client. The Once guard
|
||||
// keeps the spawn idempotent if the client retries handshake 2 (because our
|
||||
// handshake 3 was lost); the existing reader keeps running and we don't want
|
||||
// a second one racing it on backendUDP.Read.
|
||||
// TODO: FIXME: Support close the connection when timeout or client disconnected.
|
||||
go func() {
|
||||
for ctx.Err() == nil {
|
||||
nn, err := v.backendUDP.Read(b)
|
||||
if err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
||||
return
|
||||
v.startReader.Do(func() {
|
||||
go func() {
|
||||
buf := make([]byte, 4096)
|
||||
for ctx.Err() == nil {
|
||||
nn, err := v.backendUDP.Read(buf)
|
||||
if err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
||||
return
|
||||
}
|
||||
if _, err = v.listenerUDP.WriteTo(buf[:nn], addr); err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Warn(ctx, "write to client failed, err=%v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if _, err = v.listenerUDP.WriteTo(b[:nn], addr); err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Warn(ctx, "write to client failed, err=%v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -583,7 +597,10 @@ func (v *SRTHandshakePacket) UnmarshalBinary(b []byte) error {
|
|||
// Only support IPv4.
|
||||
v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48])
|
||||
|
||||
v.ExtraData = b[64:]
|
||||
// Clone so ExtraData owns its bytes independently of the caller's buffer.
|
||||
// Without this, callers that reuse the receive buffer would silently corrupt
|
||||
// any decoded packet they keep alive (e.g. v.handshake0 / v.handshake2).
|
||||
v.ExtraData = bytes.Clone(b[64:])
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user