Proxy: Fix RTC/SRT reader leak, legacy WHEP unwrap, WHEP perf guide. v8.0.1 (#4676)
- Fix a goroutine leak on the WHEP path: the backend→client reader was
being spawned on every inbound client packet (STUN keepalives + RTCP
feedback), leaking tens of thousands of goroutines under steady-state
load. Now spawned exactly once per connection via `sync.Once` on both
the RTC and SRT proxies. Listener and reader receive buffers are also
reused across iterations.
- Make the legacy SRS `/rtc/v1/play/` and `/rtc/v1/publish/` APIs work
end-to-end through the proxy. Those endpoints wrap the SDP in a JSON
envelope (`{"sdp":"v=0\r\n..."}` where `\r\n` is the literal 2-byte JSON
escape, not real CRLF), so ICE parsing previously absorbed the rest of
the body into the ufrag. Added `unwrapSDPEnvelope` for ICE extraction
and tightened `ParseIceUfragPwd`'s value class to stop at `\`. The bytes
forwarded to the client and the in-body candidate-port rewrite still
operate on the raw envelope.
- Enable `net/http/pprof` endpoints when `GO_PPROF` is set (blank import
in `internal/debug/pprof.go`) and add `docs/perf/proxy-whep.md` walking
through CPU/alloc/heap/goroutine/trace collection and `pprof -base`
before/after diffs for the WHEP workload (1 publisher + N players).
- Tighten `SRTHandshakePacket.UnmarshalBinary` to
`bytes.Clone(ExtraData)` so decoded handshakes kept on the connection
(`handshake0`, `handshake2`) stay valid once the receive buffer is
reused.
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
57d1062e91
commit
913b773282
149
docs/perf/proxy-whep.md
Normal file
149
docs/perf/proxy-whep.md
Normal file
|
|
@ -0,0 +1,149 @@
|
||||||
|
# How to Analyze WHEP Performance for the Proxy Server
|
||||||
|
|
||||||
|
This guide walks through profiling the Go proxy under a WHEP (WebRTC play) load.
|
||||||
|
The workload of interest is **one RTMP publisher + N WHEP players**, where N is
|
||||||
|
large enough to stress the proxy's UDP forwarding path (typically 300+).
|
||||||
|
|
||||||
|
When analyzing WHEP performance for the proxy, you should:
|
||||||
|
|
||||||
|
1. Set up the topology: proxy + SRS origin + publisher + WHEP load
|
||||||
|
2. Enable Go pprof on the proxy
|
||||||
|
3. Run the load and let it warm up
|
||||||
|
4. Collect CPU, allocation, heap, goroutine, and trace profiles
|
||||||
|
5. Read the profiles and identify hot spots
|
||||||
|
6. Save profiles to compare before and after a change
|
||||||
|
|
||||||
|
## Step 1: Build and Start the Proxy with pprof
|
||||||
|
|
||||||
|
The proxy reads `GO_PPROF` from the environment and, when set, exposes
|
||||||
|
`net/http/pprof` endpoints at that address. Use the same standard ports SRS
|
||||||
|
uses by default so the publisher and player commands stay unchanged.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd ~/git/srs
|
||||||
|
make && env GO_PPROF=:6060 \
|
||||||
|
PROXY_RTMP_SERVER=1935 PROXY_HTTP_SERVER=8080 \
|
||||||
|
PROXY_HTTP_API=1985 PROXY_WEBRTC_SERVER=8000 PROXY_SRT_SERVER=10080 \
|
||||||
|
PROXY_SYSTEM_API=12025 PROXY_LOAD_BALANCER_TYPE=memory \
|
||||||
|
./bin/srs-proxy
|
||||||
|
```
|
||||||
|
|
||||||
|
> The pprof endpoints live under `http://localhost:6060/debug/pprof/`. The
|
||||||
|
> proxy registers them only because `internal/debug/pprof.go` blank-imports
|
||||||
|
> `net/http/pprof`. Without that import the endpoints return 404.
|
||||||
|
|
||||||
|
## Step 2: Start the SRS Origin on Alt Ports
|
||||||
|
|
||||||
|
`origin1-for-proxy.conf` runs SRS on non-standard ports (RTMP 19351, HTTP 8081,
|
||||||
|
API 19851, RTC 8001/udp, SRT 10081) so the proxy can sit on the defaults. SRS
|
||||||
|
auto-registers with the proxy's system API on startup.
|
||||||
|
|
||||||
|
Set `CANDIDATE` to a LAN-reachable IP so the SDP answer the proxy returns
|
||||||
|
points clients at an address they can route to. The proxy only rewrites the
|
||||||
|
candidate **port**; the IP comes from the origin's SDP.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ulimit -n 10000 && bash -c "cd ~/git/srs/trunk && \
|
||||||
|
CANDIDATE=192.168.3.187 ./objs/srs -c conf/origin1-for-proxy.conf"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 3: Run the WHEP Workload
|
||||||
|
|
||||||
|
In separate terminals, start the publisher and the WHEP load generator.
|
||||||
|
|
||||||
|
**Publisher (RTMP):**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd ~/git/srs/trunk
|
||||||
|
ffmpeg -stream_loop -1 -re -i doc/source.200kbps.768x320.flv \
|
||||||
|
-c copy -f flv -y rtmp://localhost/live/livestream
|
||||||
|
```
|
||||||
|
|
||||||
|
**WHEP players (use the LAN IP that matches `CANDIDATE`):**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd ~/git/srs/trunk/3rdparty/srs-bench
|
||||||
|
./objs/srs_bench -sr webrtc://192.168.3.187/live/livestream -nn 300
|
||||||
|
```
|
||||||
|
|
||||||
|
Let the workload run for at least 30 seconds before sampling. Connection
|
||||||
|
setup churn dominates the first few seconds and will skew profiles taken
|
||||||
|
too early.
|
||||||
|
|
||||||
|
> Sanity-check with `-nn 1` first. If a single WHEP session does not play,
|
||||||
|
> the 300-player run is testing something other than steady-state forwarding.
|
||||||
|
|
||||||
|
## Step 4: Collect Profiles
|
||||||
|
|
||||||
|
Profiles must be collected **while the workload is steady**, not before or
|
||||||
|
after. The CPU profile is the single most useful starting point.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# CPU profile (30s sample) — interactive web UI on :8123
|
||||||
|
# Use :8123 (or any free port) because :8080 is the proxy's HTTP-FLV/HLS port.
|
||||||
|
go tool pprof -http=:8123 'http://localhost:6060/debug/pprof/profile?seconds=30'
|
||||||
|
|
||||||
|
# Allocation profile — GC pressure / per-packet allocations
|
||||||
|
go tool pprof -http=:8124 http://localhost:6060/debug/pprof/allocs
|
||||||
|
|
||||||
|
# Heap (live memory snapshot)
|
||||||
|
go tool pprof -http=:8125 http://localhost:6060/debug/pprof/heap
|
||||||
|
|
||||||
|
# Goroutine count + stack dump — look for goroutine explosion under load
|
||||||
|
curl -s 'http://localhost:6060/debug/pprof/goroutine?debug=1' | head -50
|
||||||
|
|
||||||
|
# Runtime trace (10s) — GC pauses, scheduler latency, syscall behavior
|
||||||
|
curl -s -o trace.out 'http://localhost:6060/debug/pprof/trace?seconds=10'
|
||||||
|
go tool trace trace.out
|
||||||
|
```
|
||||||
|
|
||||||
|
The web UI requires Graphviz for the Flame Graph and Graph views:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
brew install graphviz # macOS
|
||||||
|
```
|
||||||
|
|
||||||
|
If you cannot install Graphviz, the **Top** view in the web UI is HTML-only
|
||||||
|
and works without it. The CLI form is also unaffected:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go tool pprof 'http://localhost:6060/debug/pprof/profile?seconds=30'
|
||||||
|
(pprof) top20
|
||||||
|
(pprof) top20 -cum
|
||||||
|
(pprof) list <FunctionName>
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 5: Read the Profiles
|
||||||
|
|
||||||
|
Open the web UI and use the views in this order:
|
||||||
|
|
||||||
|
1. **Flame Graph** — visual hot path. Wide bars near the top are where time
|
||||||
|
is spent. For 300-player WHEP the path should be dominated by
|
||||||
|
`webRTCProxyServer.Run` and its UDP read/write children.
|
||||||
|
2. **Top** — sorted list by `flat` (self time) and `cum` (cumulative). The
|
||||||
|
top 5–10 functions usually tell the whole story.
|
||||||
|
3. **Graph** — call graph with edge weights. Good for tracing "who calls this
|
||||||
|
hot function".
|
||||||
|
4. **Source** — line-level cost inside a single function. Use after Top has
|
||||||
|
pointed you at a function worth dissecting.
|
||||||
|
|
||||||
|
## Step 6: Save Profiles for Before/After Comparison
|
||||||
|
|
||||||
|
When you change code to fix a hot spot, comparing profiles is the only
|
||||||
|
reliable way to confirm the fix moved the needle (and didn't just shift cost
|
||||||
|
elsewhere).
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Save the raw profile from a baseline run
|
||||||
|
curl -s -o cpu-before.pb.gz 'http://localhost:6060/debug/pprof/profile?seconds=30'
|
||||||
|
|
||||||
|
# After the code change, sample again under the same workload
|
||||||
|
curl -s -o cpu-after.pb.gz 'http://localhost:6060/debug/pprof/profile?seconds=30'
|
||||||
|
|
||||||
|
# Diff the two
|
||||||
|
go tool pprof -http=:8123 -base cpu-before.pb.gz cpu-after.pb.gz
|
||||||
|
```
|
||||||
|
|
||||||
|
In the diff view, red bars are functions that got more expensive, green
|
||||||
|
bars are functions that got cheaper. The total should shrink overall if
|
||||||
|
the change is a net win.
|
||||||
|
|
@ -6,6 +6,7 @@ package debug
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
|
|
||||||
"srsx/internal/env"
|
"srsx/internal/env"
|
||||||
"srsx/internal/logger"
|
"srsx/internal/logger"
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package proxy
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
|
@ -241,13 +242,17 @@ func (v *webRTCProxyServer) proxyApiToBackend(
|
||||||
localSDPAnswer = strings.Replace(localSDPAnswer, from, to, -1)
|
localSDPAnswer = strings.Replace(localSDPAnswer, from, to, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the ice-ufrag and ice-pwd from local SDP answer.
|
// Fetch the ice-ufrag and ice-pwd from local SDP answer. The legacy SRS
|
||||||
remoteICEUfrag, remoteICEPwd, err := utils.ParseIceUfragPwd(remoteSDPOffer)
|
// /rtc/v1/play/ and /rtc/v1/publish/ APIs wrap the SDP in a JSON envelope
|
||||||
|
// like {"sdp":"v=0\r\n..."}, so unwrap it before parsing ICE attributes.
|
||||||
|
// The forwarded bytes and the in-body candidate port rewrite still operate
|
||||||
|
// on the raw envelope, which is what the client expects to see back.
|
||||||
|
remoteICEUfrag, remoteICEPwd, err := utils.ParseIceUfragPwd(unwrapSDPEnvelope(remoteSDPOffer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "parse remote sdp offer")
|
return errors.Wrapf(err, "parse remote sdp offer")
|
||||||
}
|
}
|
||||||
|
|
||||||
localICEUfrag, localICEPwd, err := utils.ParseIceUfragPwd(localSDPAnswer)
|
localICEUfrag, localICEPwd, err := utils.ParseIceUfragPwd(unwrapSDPEnvelope(localSDPAnswer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "parse local sdp answer")
|
return errors.Wrapf(err, "parse local sdp answer")
|
||||||
}
|
}
|
||||||
|
|
@ -297,8 +302,12 @@ func (v *webRTCProxyServer) Run(ctx context.Context) error {
|
||||||
go func() {
|
go func() {
|
||||||
defer v.wg.Done()
|
defer v.wg.Done()
|
||||||
|
|
||||||
|
// Reuse a single receive buffer across iterations. handleClientUDP and the
|
||||||
|
// downstream HandlePacket consume the slice synchronously (kernel sendto
|
||||||
|
// copies bytes; STUN parsing copies the username via string()), so no caller
|
||||||
|
// retains the slice past the call.
|
||||||
|
buf := make([]byte, 4096)
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
buf := make([]byte, 4096)
|
|
||||||
n, addr, err := listener.ReadFrom(buf)
|
n, addr, err := listener.ReadFrom(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If context is canceled or connection is closed, exit gracefully without logging error.
|
// If context is canceled or connection is closed, exit gracefully without logging error.
|
||||||
|
|
@ -414,6 +423,11 @@ type rtcConnection struct {
|
||||||
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
||||||
// UDP dial; tests may override via a functional option to supply a fake connection.
|
// UDP dial; tests may override via a functional option to supply a fake connection.
|
||||||
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
||||||
|
|
||||||
|
// Guards the spawn of the backend->client reader goroutine. HandlePacket is
|
||||||
|
// called on every inbound client packet (STUN keepalives + RTCP feedback at
|
||||||
|
// steady state) but the reader must only start once per connection.
|
||||||
|
startReader stdSync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection {
|
func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection {
|
||||||
|
|
@ -462,24 +476,31 @@ func (v *rtcConnection) HandlePacket(addr net.Addr, data []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proxy all messages from backend to client.
|
// Spawn the backend->client reader exactly once per connection. Previously
|
||||||
go func() {
|
// this goroutine was launched unconditionally here on every inbound client
|
||||||
for ctx.Err() == nil {
|
// packet, which leaked tens of thousands of goroutines under steady-state
|
||||||
|
// WHEP load (STUN keepalives + RTCP feedback). The buffer is reused across
|
||||||
|
// iterations: WriteTo copies into the kernel before returning, so the next
|
||||||
|
// Read can safely overwrite.
|
||||||
|
v.startReader.Do(func() {
|
||||||
|
go func() {
|
||||||
buf := make([]byte, 4096)
|
buf := make([]byte, 4096)
|
||||||
n, err := v.backendUDP.Read(buf)
|
for ctx.Err() == nil {
|
||||||
if err != nil {
|
n, err := v.backendUDP.Read(buf)
|
||||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
if err != nil {
|
||||||
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||||
break
|
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
||||||
}
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
|
if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
|
||||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||||
logger.Warn(ctx, "write to client failed, err=%v", err)
|
logger.Warn(ctx, "write to client failed, err=%v", err)
|
||||||
break
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}()
|
||||||
}()
|
})
|
||||||
|
|
||||||
if _, err := v.backendUDP.Write(data); err != nil {
|
if _, err := v.backendUDP.Write(data); err != nil {
|
||||||
return errors.Wrapf(err, "write to backend %v", v.StreamURL)
|
return errors.Wrapf(err, "write to backend %v", v.StreamURL)
|
||||||
|
|
@ -520,6 +541,25 @@ func (v *rtcConnection) connectBackend(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// unwrapSDPEnvelope returns the SDP string carried inside the legacy SRS RTC
|
||||||
|
// JSON envelope used by /rtc/v1/play/ and /rtc/v1/publish/, e.g. body of the
|
||||||
|
// form {"sdp":"v=0\r\n...", ...}. For standards-based WHIP/WHEP bodies (raw
|
||||||
|
// SDP), or any input we can't recognise, the original body is returned
|
||||||
|
// unchanged so the caller can parse it as raw SDP.
|
||||||
|
func unwrapSDPEnvelope(body string) string {
|
||||||
|
trimmed := strings.TrimLeft(body, " \t\r\n")
|
||||||
|
if !strings.HasPrefix(trimmed, "{") {
|
||||||
|
return body
|
||||||
|
}
|
||||||
|
var env struct {
|
||||||
|
SDP string `json:"sdp"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal([]byte(trimmed), &env); err != nil || env.SDP == "" {
|
||||||
|
return body
|
||||||
|
}
|
||||||
|
return env.SDP
|
||||||
|
}
|
||||||
|
|
||||||
type rtcICEPair struct {
|
type rtcICEPair struct {
|
||||||
// The remote ufrag, used for ICE username and session id.
|
// The remote ufrag, used for ICE username and session id.
|
||||||
RemoteICEUfrag string `json:"remote_ufrag"`
|
RemoteICEUfrag string `json:"remote_ufrag"`
|
||||||
|
|
|
||||||
|
|
@ -896,6 +896,95 @@ func TestWebRTCProxyServer_HandleApiForWHEP_HappyPath(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Legacy /rtc/v1/play/ (used by srs_bench) wraps the SDP in a JSON envelope
|
||||||
|
// like {"sdp":"v=0\r\n..."} where \r\n is the literal 2-byte JSON escape, not
|
||||||
|
// real CRLF. The proxy must unwrap the envelope before parsing ICE attributes;
|
||||||
|
// otherwise the stored ufrag is contaminated with the next attributes and the
|
||||||
|
// STUN binding from the client cannot be matched to the connection.
|
||||||
|
func TestWebRTCProxyServer_HandleApiForWHEP_LegacyJSONEnvelope(t *testing.T) {
|
||||||
|
f := newWebRTCFixture()
|
||||||
|
f.env.WebRTCServerReturns("19000")
|
||||||
|
|
||||||
|
const backendRTCPort = "18000"
|
||||||
|
answerJSON := `{"code":0,"sessionid":"sid","sdp":"v=0\r\na=ice-ufrag:local-ufrag\r\na=ice-pwd:local-pwd-very-long-value-32xxxx\r\na=candidate:1 1 udp 1 1.2.3.4 ` + backendRTCPort + ` typ host\r\n"}`
|
||||||
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, _ = io.ReadAll(r.Body)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, _ = w.Write([]byte(answerJSON))
|
||||||
|
}))
|
||||||
|
defer backend.Close()
|
||||||
|
|
||||||
|
f.server.backendURL = func(b *lb.OriginServer, r *http.Request) (string, error) {
|
||||||
|
return backend.URL + r.URL.Path, nil
|
||||||
|
}
|
||||||
|
f.lb.PickReturns(&lb.OriginServer{IP: "10.0.0.1", API: []string{"1985"}, RTC: []string{backendRTCPort}}, nil)
|
||||||
|
|
||||||
|
offerJSON := `{"api":"http://10.0.0.1:1985/rtc/v1/play/","clientip":"","sdp":"v=0\r\na=ice-ufrag:remote-ufrag\r\na=ice-pwd:remote-pwd-very-long-value-32xx\r\n","streamurl":"webrtc://example.com/live/demo"}`
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "http://example.com/rtc/v1/play/", strings.NewReader(offerJSON))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
if err := f.server.HandleApiForWHEP(context.Background(), rec, req); err != nil {
|
||||||
|
t.Fatalf("WHEP: %v", err)
|
||||||
|
}
|
||||||
|
if f.lb.StoreWebRTCCallCount() != 1 {
|
||||||
|
t.Fatalf("StoreWebRTC called %d times, want 1", f.lb.StoreWebRTCCallCount())
|
||||||
|
}
|
||||||
|
_, _, stored := f.lb.StoreWebRTCArgsForCall(0)
|
||||||
|
if got, want := stored.GetUfrag(), "local-ufrag:remote-ufrag"; got != want {
|
||||||
|
t.Fatalf("stored ufrag=%q, want %q", got, want)
|
||||||
|
}
|
||||||
|
// The response forwarded to the client should still be the JSON envelope
|
||||||
|
// with the backend port rewritten to the proxy's WebRTC port.
|
||||||
|
body := rec.Body.String()
|
||||||
|
if !strings.Contains(body, " 19000 typ host") {
|
||||||
|
t.Fatalf("answer did not rewrite backend port; got %q", body)
|
||||||
|
}
|
||||||
|
if strings.Contains(body, " "+backendRTCPort+" typ host") {
|
||||||
|
t.Fatalf("answer still contains original backend port; got %q", body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnwrapSDPEnvelope(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
in string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "raw sdp passthrough",
|
||||||
|
in: "v=0\r\na=ice-ufrag:abc\r\n",
|
||||||
|
want: "v=0\r\na=ice-ufrag:abc\r\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "json envelope unwrapped",
|
||||||
|
in: `{"code":0,"sdp":"v=0\r\na=ice-ufrag:abc\r\n"}`,
|
||||||
|
want: "v=0\r\na=ice-ufrag:abc\r\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "json envelope with leading whitespace",
|
||||||
|
in: "\n\t " + `{"sdp":"v=0\r\n"}`,
|
||||||
|
want: "v=0\r\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "malformed json falls back to body",
|
||||||
|
in: `{not json}`,
|
||||||
|
want: `{not json}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "json without sdp falls back to body",
|
||||||
|
in: `{"code":0}`,
|
||||||
|
want: `{"code":0}`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
if got := unwrapSDPEnvelope(tc.in); got != tc.want {
|
||||||
|
t.Fatalf("unwrapSDPEnvelope(%q)=%q, want %q", tc.in, got, tc.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// webRTCProxyServer.proxyApiToBackend: error paths
|
// webRTCProxyServer.proxyApiToBackend: error paths
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -100,8 +100,11 @@ func (v *srsSRTProxyServer) Run(ctx context.Context) error {
|
||||||
go func() {
|
go func() {
|
||||||
defer v.wg.Done()
|
defer v.wg.Done()
|
||||||
|
|
||||||
|
// Reuse a single receive buffer across iterations. SRTHandshakePacket.UnmarshalBinary
|
||||||
|
// clones ExtraData, and backendUDP.Write copies into the kernel before returning, so
|
||||||
|
// no caller retains a reference to this slice past the call.
|
||||||
|
buf := make([]byte, 4096)
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
buf := make([]byte, 4096)
|
|
||||||
n, caddr, err := v.listener.ReadFrom(buf)
|
n, caddr, err := v.listener.ReadFrom(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If context is canceled or connection is closed, exit gracefully without logging error.
|
// If context is canceled or connection is closed, exit gracefully without logging error.
|
||||||
|
|
@ -196,6 +199,11 @@ type SRTConnection struct {
|
||||||
handshake2 *SRTHandshakePacket
|
handshake2 *SRTHandshakePacket
|
||||||
handshake3 *SRTHandshakePacket
|
handshake3 *SRTHandshakePacket
|
||||||
|
|
||||||
|
// Guards the spawn of the backend->client reader goroutine. Keeps the spawn
|
||||||
|
// idempotent if the client retries handshake 2 (e.g. because our handshake 3
|
||||||
|
// was dropped) and re-enters the handshake path.
|
||||||
|
startReader stdSync.Once
|
||||||
|
|
||||||
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
||||||
// UDP dial; tests may override via a functional option to supply a fake connection.
|
// UDP dial; tests may override via a functional option to supply a fake connection.
|
||||||
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
||||||
|
|
@ -349,23 +357,29 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
|
||||||
return errors.Wrapf(err, "write handshake 3")
|
return errors.Wrapf(err, "write handshake 3")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a goroutine to proxy message from backend to client.
|
// Start a goroutine to proxy message from backend to client. The Once guard
|
||||||
|
// keeps the spawn idempotent if the client retries handshake 2 (because our
|
||||||
|
// handshake 3 was lost); the existing reader keeps running and we don't want
|
||||||
|
// a second one racing it on backendUDP.Read.
|
||||||
// TODO: FIXME: Support close the connection when timeout or client disconnected.
|
// TODO: FIXME: Support close the connection when timeout or client disconnected.
|
||||||
go func() {
|
v.startReader.Do(func() {
|
||||||
for ctx.Err() == nil {
|
go func() {
|
||||||
nn, err := v.backendUDP.Read(b)
|
buf := make([]byte, 4096)
|
||||||
if err != nil {
|
for ctx.Err() == nil {
|
||||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
nn, err := v.backendUDP.Read(buf)
|
||||||
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
if err != nil {
|
||||||
return
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||||
|
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err = v.listenerUDP.WriteTo(buf[:nn], addr); err != nil {
|
||||||
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||||
|
logger.Warn(ctx, "write to client failed, err=%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if _, err = v.listenerUDP.WriteTo(b[:nn], addr); err != nil {
|
}()
|
||||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
})
|
||||||
logger.Warn(ctx, "write to client failed, err=%v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -583,7 +597,10 @@ func (v *SRTHandshakePacket) UnmarshalBinary(b []byte) error {
|
||||||
// Only support IPv4.
|
// Only support IPv4.
|
||||||
v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48])
|
v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48])
|
||||||
|
|
||||||
v.ExtraData = b[64:]
|
// Clone so ExtraData owns its bytes independently of the caller's buffer.
|
||||||
|
// Without this, callers that reuse the receive buffer would silently corrupt
|
||||||
|
// any decoded packet they keep alive (e.g. v.handshake0 / v.handshake2).
|
||||||
|
v.ExtraData = bytes.Clone(b[64:])
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -211,10 +211,13 @@ func SrtParseSocketID(data []byte) uint32 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseIceUfragPwd parse the ice-ufrag and ice-pwd from the SDP.
|
// ParseIceUfragPwd parse the ice-ufrag and ice-pwd from the SDP. The value class
|
||||||
|
// stops at any whitespace (real CRLF in raw SDP) or at a backslash, so the parser
|
||||||
|
// is also safe against JSON-escaped SDP bodies where line breaks appear as the
|
||||||
|
// 2-byte sequence "\r" / "\n" rather than real control characters.
|
||||||
func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
|
func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
|
||||||
if true {
|
if true {
|
||||||
ufragRe := regexp.MustCompile(`a=ice-ufrag:([^\s]+)`)
|
ufragRe := regexp.MustCompile(`a=ice-ufrag:([^\s\\]+)`)
|
||||||
ufragMatch := ufragRe.FindStringSubmatch(sdp)
|
ufragMatch := ufragRe.FindStringSubmatch(sdp)
|
||||||
if len(ufragMatch) <= 1 {
|
if len(ufragMatch) <= 1 {
|
||||||
return "", "", errors.Errorf("no ice-ufrag in sdp %v", sdp)
|
return "", "", errors.Errorf("no ice-ufrag in sdp %v", sdp)
|
||||||
|
|
@ -223,7 +226,7 @@ func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if true {
|
if true {
|
||||||
pwdRe := regexp.MustCompile(`a=ice-pwd:([^\s]+)`)
|
pwdRe := regexp.MustCompile(`a=ice-pwd:([^\s\\]+)`)
|
||||||
pwdMatch := pwdRe.FindStringSubmatch(sdp)
|
pwdMatch := pwdRe.FindStringSubmatch(sdp)
|
||||||
if len(pwdMatch) <= 1 {
|
if len(pwdMatch) <= 1 {
|
||||||
return "", "", errors.Errorf("no ice-pwd in sdp %v", sdp)
|
return "", "", errors.Errorf("no ice-pwd in sdp %v", sdp)
|
||||||
|
|
|
||||||
|
|
@ -338,6 +338,24 @@ func TestParseIceUfragPwd_MissingPwd(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SDP embedded in the legacy /rtc/v1/play/ JSON envelope arrives with "\r\n" as
|
||||||
|
// the literal 2-byte sequence (backslash + r/n), not real CRLF. The value
|
||||||
|
// charset must stop at the backslash, otherwise the ufrag would absorb the rest
|
||||||
|
// of the SDP up to the next real whitespace.
|
||||||
|
func TestParseIceUfragPwd_JSONEscapedSDP(t *testing.T) {
|
||||||
|
sdp := `v=0\r\na=ice-ufrag:1f1n4272\r\na=ice-pwd:5f6y69408x2h55232i080mj894901b8n\r\na=fingerprint:sha-256 2D:1D\r\n`
|
||||||
|
ufrag, pwd, err := ParseIceUfragPwd(sdp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err = %v", err)
|
||||||
|
}
|
||||||
|
if ufrag != "1f1n4272" {
|
||||||
|
t.Fatalf("ufrag=%q, want 1f1n4272", ufrag)
|
||||||
|
}
|
||||||
|
if pwd != "5f6y69408x2h55232i080mj894901b8n" {
|
||||||
|
t.Fatalf("pwd=%q, want 5f6y69408x2h55232i080mj894901b8n", pwd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestParseSRTStreamID_WithHost(t *testing.T) {
|
func TestParseSRTStreamID_WithHost(t *testing.T) {
|
||||||
host, resource, err := ParseSRTStreamID("h=example.com,r=live/stream")
|
host, resource, err := ParseSRTStreamID("h=example.com,r=live/stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ func VersionMinor() int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func VersionRevision() int {
|
func VersionRevision() int {
|
||||||
return 0
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func Version() string {
|
func Version() string {
|
||||||
|
|
|
||||||
|
|
@ -303,6 +303,9 @@ The knowledge base (`memory/srs-*.md`) captures William's knowledge about SRS
|
||||||
- `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state
|
- `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state
|
||||||
- `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification
|
- `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification
|
||||||
|
|
||||||
|
**Next-Generation Server Performance Docs** (`docs/perf/`) — Performance analysis guides for the Go server:
|
||||||
|
- `proxy-whep.md` — WHEP perf analysis: enable GO_PPROF, run publisher + N WHEP players via srs_bench, collect CPU/alloc/heap/goroutine/trace profiles, read hot spots, diff before/after with `pprof -base`
|
||||||
|
|
||||||
**Next-Generation Server API Examples** — Executable API documentation:
|
**Next-Generation Server API Examples** — Executable API documentation:
|
||||||
- `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow
|
- `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -77,13 +77,13 @@ Do NOT attempt unsupported tasks.
|
||||||
1. Ask the user for the PR number if they haven't given it.
|
1. Ask the user for the PR number if they haven't given it.
|
||||||
2. Bump revision by one in **both** version files, keeping them in sync:
|
2. Bump revision by one in **both** version files, keeping them in sync:
|
||||||
- `internal/version/version.go` — `VersionRevision()`
|
- `internal/version/version.go` — `VersionRevision()`
|
||||||
- `trunk/src/core/srs_core_version7.hpp` — `VERSION_REVISION`
|
- `trunk/src/core/srs_core_version8.hpp` — `VERSION_REVISION`
|
||||||
3. Add a new top entry to `trunk/doc/CHANGELOG.md` under `## SRS 7.0 Changelog`, matching the existing format:
|
3. Add a new top entry to `trunk/doc/CHANGELOG.md` under `## SRS 8.0 Changelog`, matching the existing format:
|
||||||
```
|
```
|
||||||
* v7.0, YYYY-MM-DD, Merge [#PR](URL): <Prefix>: <one-line summary>. v7.0.<rev> (#PR)
|
* v8.0, YYYY-MM-DD, Merge [#PR](URL): <Prefix>: <one-line summary>. v8.0.<rev> (#PR)
|
||||||
```
|
```
|
||||||
Propose the summary to the user; don't invent one unilaterally.
|
Propose the summary to the user; don't invent one unilaterally.
|
||||||
4. Stop. Let the user review. When they `git add` the version files and changelog, commit with a short message like `Proxy: Bump to v7.0.<rev> for #<PR>.`.
|
4. Stop. Let the user review. When they `git add` the version files and changelog, commit with a short message like `Proxy: Bump to v8.0.<rev> for #<PR>.`.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ The changelog for SRS.
|
||||||
<a name="v8-changes"></a>
|
<a name="v8-changes"></a>
|
||||||
|
|
||||||
## SRS 8.0 Changelog
|
## SRS 8.0 Changelog
|
||||||
|
* v8.0, 2026-05-17, Merge [#4676](https://github.com/ossrs/srs/pull/4676): Proxy: Fix RTC/SRT reader goroutine leak; unwrap legacy WHEP JSON envelope; add WHEP pprof guide. v8.0.1 (#4676)
|
||||||
* v8.0, 2026-05-17, Init SRS 8.0, code Free. v8.0.0
|
* v8.0, 2026-05-17, Init SRS 8.0, code Free. v8.0.0
|
||||||
|
|
||||||
<a name="v7-changes"></a>
|
<a name="v7-changes"></a>
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,6 @@
|
||||||
|
|
||||||
#define VERSION_MAJOR 8
|
#define VERSION_MAJOR 8
|
||||||
#define VERSION_MINOR 0
|
#define VERSION_MINOR 0
|
||||||
#define VERSION_REVISION 0
|
#define VERSION_REVISION 1
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user