Compare commits

2 Commits

Author SHA1 Message Date
winlin
133f66afba Edge: Fix HTTP-FLV 404 and RTMP late-join missing sequence headers. v7.0.150 (#4678)
Cherry-pick of the v8.0.2 fix to the 7.0 release line. Two edge-cluster
regressions surface when validating an RTMP origin/edge setup:

- **HTTP-FLV play on edge always 404'd.**
`SrsHttpStreamServer::assemble()` registered the dynamic matcher only
when the mux cast was `NULL` (inverted guard), so the matcher was never
wired up. On edge the FLV mount is created lazily by the dynamic
matcher, so every HTTP-FLV client got 404. Invert the guard to register
when the mux is valid, mirroring the destructor.
(`trunk/src/app/srs_app_http_stream.cpp`)

- **RTMP players that join an edge stream after the first player fail to
decode.** After v7.0.94 (#4513) stopped creating `SrsOriginHub` on edge,
the `hub_active` gate in `SrsLiveSource::consumer_dumps()` always
evaluated false on edge. That gate guards the dump of cached
`onMetaData` + AVC sequence header + AAC sequence header + GOP cache to
a new consumer. Result: the first player attaches before the edge-pull
starts and gets headers via the live fan-out, but every subsequent
player gets coded payload with no codec config and ffmpeg aborts with
`dimensions not set` / `Could not write header`. Fall back to the meta
cache state when `hub_` is `NULL`, so the dump path runs once the
edge-pull has populated the cache.
(`trunk/src/app/srs_app_rtmp_source.cpp`)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 11:20:25 -04:00
Winlin
386a3768df Proxy: Fix RTC/SRT reader leak, legacy WHEP unwrap, WHEP perf guide. v7.0.149 (#4676)
- Fix a goroutine leak on the WHEP path: the backend→client reader was
being spawned on every inbound client packet (STUN keepalives + RTCP
feedback), leaking tens of thousands of goroutines under steady-state
load. Now spawned exactly once per connection via `sync.Once` on both
the RTC and SRT proxies. Listener and reader receive buffers are also
reused across iterations.
- Make the legacy SRS `/rtc/v1/play/` and `/rtc/v1/publish/` APIs work
end-to-end through the proxy. Those endpoints wrap the SDP in a JSON
envelope (`{"sdp":"v=0\r\n..."}` where `\r\n` is the literal 2-byte JSON
escape, not real CRLF), so ICE parsing previously absorbed the rest of
the body into the ufrag. Added `unwrapSDPEnvelope` for ICE extraction
and tightened `ParseIceUfragPwd`'s value class to stop at `\`. The bytes
forwarded to the client and the in-body candidate-port rewrite still
operate on the raw envelope.
- Enable `net/http/pprof` endpoints when `GO_PPROF` is set (blank import
in `internal/debug/pprof.go`) and add `docs/perf/proxy-whep.md` walking
through CPU/alloc/heap/goroutine/trace collection and `pprof -base`
before/after diffs for the WHEP workload (1 publisher + N players).
- Tighten `SRTHandshakePacket.UnmarshalBinary` to
`bytes.Clone(ExtraData)` so decoded handshakes kept on the connection
(`handshake0`, `handshake2`) stay valid once the receive buffer is
reused.
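
The spawn-once fix described in the first bullet can be sketched in isolation. This is a minimal illustration under stated assumptions, not the proxy's actual code: `conn`, `handlePacket`, and `spawnCount` are hypothetical names, and the reader body is reduced to a counter so the effect of `sync.Once` is observable.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// conn is a hypothetical stand-in for a proxied connection; it is not the
// proxy's real rtcConnection or SRTConnection type.
type conn struct {
	startReader sync.Once    // guards the one-time reader spawn
	spawned     atomic.Int32 // counts reader goroutines, for illustration only
	wg          sync.WaitGroup
}

// handlePacket runs for every inbound packet but starts the reader only once.
func (c *conn) handlePacket() {
	c.startReader.Do(func() {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			c.spawned.Add(1) // a real reader would loop on backend reads here
		}()
	})
}

// spawnCount feeds n packets through one connection and reports how many
// reader goroutines were started.
func spawnCount(n int) int32 {
	c := &conn{}
	for i := 0; i < n; i++ {
		c.handlePacket()
	}
	c.wg.Wait()
	return c.spawned.Load()
}

func main() {
	fmt.Println(spawnCount(10000))
}
```

Without the `Once` guard, the same loop would start one goroutine per packet, which is the leak pattern the commit message describes.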

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 21:18:35 -04:00
17 changed files with 772 additions and 43 deletions

149
docs/perf/proxy-whep.md Normal file
View File

@ -0,0 +1,149 @@
# How to Analyze WHEP Performance for the Proxy Server
This guide walks through profiling the Go proxy under a WHEP (WebRTC play) load.
The workload of interest is **one RTMP publisher + N WHEP players**, where N is
large enough to stress the proxy's UDP forwarding path (typically 300+).
When analyzing WHEP performance for the proxy, you should:
1. Set up the topology: proxy + SRS origin + publisher + WHEP load
2. Enable Go pprof on the proxy
3. Run the load and let it warm up
4. Collect CPU, allocation, heap, goroutine, and trace profiles
5. Read the profiles and identify hot spots
6. Save profiles to compare before and after a change
## Step 1: Build and Start the Proxy with pprof
The proxy reads `GO_PPROF` from the environment and, when set, exposes
`net/http/pprof` endpoints at that address. Use the same standard ports SRS
uses by default so the publisher and player commands stay unchanged.
```bash
cd ~/git/srs
make && env GO_PPROF=:6060 \
PROXY_RTMP_SERVER=1935 PROXY_HTTP_SERVER=8080 \
PROXY_HTTP_API=1985 PROXY_WEBRTC_SERVER=8000 PROXY_SRT_SERVER=10080 \
PROXY_SYSTEM_API=12025 PROXY_LOAD_BALANCER_TYPE=memory \
./bin/srs-proxy
```
> The pprof endpoints live under `http://localhost:6060/debug/pprof/`. The
> proxy registers them only because `internal/debug/pprof.go` blank-imports
> `net/http/pprof`. Without that import the endpoints return 404.
## Step 2: Start the SRS Origin on Alt Ports
`origin1-for-proxy.conf` runs SRS on non-standard ports (RTMP 19351, HTTP 8081,
API 19851, RTC 8001/udp, SRT 10081) so the proxy can sit on the defaults. SRS
auto-registers with the proxy's system API on startup.
Set `CANDIDATE` to a LAN-reachable IP so the SDP answer the proxy returns
points clients at an address they can route to. The proxy only rewrites the
candidate **port**; the IP comes from the origin's SDP.
```bash
ulimit -n 10000 && bash -c "cd ~/git/srs/trunk && \
CANDIDATE=192.168.3.187 ./objs/srs -c conf/origin1-for-proxy.conf"
```
## Step 3: Run the WHEP Workload
In separate terminals, start the publisher and the WHEP load generator.
**Publisher (RTMP):**
```bash
cd ~/git/srs/trunk
ffmpeg -stream_loop -1 -re -i doc/source.200kbps.768x320.flv \
-c copy -f flv -y rtmp://localhost/live/livestream
```
**WHEP players (use the LAN IP that matches `CANDIDATE`):**
```bash
cd ~/git/srs/trunk/3rdparty/srs-bench
./objs/srs_bench -sr webrtc://192.168.3.187/live/livestream -nn 300
```
Let the workload run for at least 30 seconds before sampling. Connection
setup churn dominates the first few seconds and will skew profiles taken
too early.
> Sanity-check with `-nn 1` first. If a single WHEP session does not play,
> the 300-player run is testing something other than steady-state forwarding.
## Step 4: Collect Profiles
Profiles must be collected **while the workload is steady**, not before or
after. The CPU profile is the single most useful starting point.
```bash
# CPU profile (30s sample) — interactive web UI on :8123
# Use :8123 (or any free port) because :8080 is the proxy's HTTP-FLV/HLS port.
go tool pprof -http=:8123 'http://localhost:6060/debug/pprof/profile?seconds=30'
# Allocation profile — GC pressure / per-packet allocations
go tool pprof -http=:8124 http://localhost:6060/debug/pprof/allocs
# Heap (live memory snapshot)
go tool pprof -http=:8125 http://localhost:6060/debug/pprof/heap
# Goroutine count + stack dump — look for goroutine explosion under load
curl -s 'http://localhost:6060/debug/pprof/goroutine?debug=1' | head -50
# Runtime trace (10s) — GC pauses, scheduler latency, syscall behavior
curl -s -o trace.out 'http://localhost:6060/debug/pprof/trace?seconds=10'
go tool trace trace.out
```
The web UI requires Graphviz for the Flame Graph and Graph views:
```bash
brew install graphviz # macOS
```
If you cannot install Graphviz, the **Top** view in the web UI is HTML-only
and works without it. The CLI form is also unaffected:
```bash
go tool pprof 'http://localhost:6060/debug/pprof/profile?seconds=30'
(pprof) top20
(pprof) top20 -cum
(pprof) list <FunctionName>
```
## Step 5: Read the Profiles
Open the web UI and use the views in this order:
1. **Flame Graph** — visual hot path. Wide bars near the top are where time
is spent. For 300-player WHEP the path should be dominated by
`webRTCProxyServer.Run` and its UDP read/write children.
2. **Top** — sorted list by `flat` (self time) and `cum` (cumulative). The
top 5 to 10 functions usually tell the whole story.
3. **Graph** — call graph with edge weights. Good for tracing "who calls this
hot function".
4. **Source** — line-level cost inside a single function. Use after Top has
pointed you at a function worth dissecting.
## Step 6: Save Profiles for Before/After Comparison
When you change code to fix a hot spot, comparing profiles is the only
reliable way to confirm the fix moved the needle (and didn't just shift cost
elsewhere).
```bash
# Save the raw profile from a baseline run
curl -s -o cpu-before.pb.gz 'http://localhost:6060/debug/pprof/profile?seconds=30'
# After the code change, sample again under the same workload
curl -s -o cpu-after.pb.gz 'http://localhost:6060/debug/pprof/profile?seconds=30'
# Diff the two
go tool pprof -http=:8123 -base cpu-before.pb.gz cpu-after.pb.gz
```
In the diff view, red bars are functions that got more expensive, green
bars are functions that got cheaper. The total should shrink overall if
the change is a net win.

View File

@ -6,6 +6,7 @@ package debug
import ( import (
"context" "context"
"net/http" "net/http"
_ "net/http/pprof"
"srsx/internal/env" "srsx/internal/env"
"srsx/internal/logger" "srsx/internal/logger"

View File

@ -6,6 +6,7 @@ package proxy
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -241,13 +242,17 @@ func (v *webRTCProxyServer) proxyApiToBackend(
localSDPAnswer = strings.Replace(localSDPAnswer, from, to, -1) localSDPAnswer = strings.Replace(localSDPAnswer, from, to, -1)
} }
// Fetch the ice-ufrag and ice-pwd from local SDP answer. // Fetch the ice-ufrag and ice-pwd from local SDP answer. The legacy SRS
remoteICEUfrag, remoteICEPwd, err := utils.ParseIceUfragPwd(remoteSDPOffer) // /rtc/v1/play/ and /rtc/v1/publish/ APIs wrap the SDP in a JSON envelope
// like {"sdp":"v=0\r\n..."}, so unwrap it before parsing ICE attributes.
// The forwarded bytes and the in-body candidate port rewrite still operate
// on the raw envelope, which is what the client expects to see back.
remoteICEUfrag, remoteICEPwd, err := utils.ParseIceUfragPwd(unwrapSDPEnvelope(remoteSDPOffer))
if err != nil { if err != nil {
return errors.Wrapf(err, "parse remote sdp offer") return errors.Wrapf(err, "parse remote sdp offer")
} }
localICEUfrag, localICEPwd, err := utils.ParseIceUfragPwd(localSDPAnswer) localICEUfrag, localICEPwd, err := utils.ParseIceUfragPwd(unwrapSDPEnvelope(localSDPAnswer))
if err != nil { if err != nil {
return errors.Wrapf(err, "parse local sdp answer") return errors.Wrapf(err, "parse local sdp answer")
} }
@ -297,8 +302,12 @@ func (v *webRTCProxyServer) Run(ctx context.Context) error {
go func() { go func() {
defer v.wg.Done() defer v.wg.Done()
for ctx.Err() == nil { // Reuse a single receive buffer across iterations. handleClientUDP and the
// downstream HandlePacket consume the slice synchronously (kernel sendto
// copies bytes; STUN parsing copies the username via string()), so no caller
// retains the slice past the call.
buf := make([]byte, 4096) buf := make([]byte, 4096)
for ctx.Err() == nil {
n, addr, err := listener.ReadFrom(buf) n, addr, err := listener.ReadFrom(buf)
if err != nil { if err != nil {
// If context is canceled or connection is closed, exit gracefully without logging error. // If context is canceled or connection is closed, exit gracefully without logging error.
@ -414,6 +423,11 @@ type rtcConnection struct {
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real // dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
// UDP dial; tests may override via a functional option to supply a fake connection. // UDP dial; tests may override via a functional option to supply a fake connection.
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error) dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
// Guards the spawn of the backend->client reader goroutine. HandlePacket is
// called on every inbound client packet (STUN keepalives + RTCP feedback at
// steady state) but the reader must only start once per connection.
startReader stdSync.Once
} }
func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection { func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection {
@ -462,24 +476,31 @@ func (v *rtcConnection) HandlePacket(addr net.Addr, data []byte) error {
return nil return nil
} }
// Proxy all messages from backend to client. // Spawn the backend->client reader exactly once per connection. Previously
// this goroutine was launched unconditionally here on every inbound client
// packet, which leaked tens of thousands of goroutines under steady-state
// WHEP load (STUN keepalives + RTCP feedback). The buffer is reused across
// iterations: WriteTo copies into the kernel before returning, so the next
// Read can safely overwrite.
v.startReader.Do(func() {
go func() { go func() {
for ctx.Err() == nil {
buf := make([]byte, 4096) buf := make([]byte, 4096)
for ctx.Err() == nil {
n, err := v.backendUDP.Read(buf) n, err := v.backendUDP.Read(buf)
if err != nil { if err != nil {
// TODO: If backend server closed unexpectedly, we should notice the stream to quit. // TODO: If backend server closed unexpectedly, we should notice the stream to quit.
logger.Warn(ctx, "read from backend failed, err=%v", err) logger.Warn(ctx, "read from backend failed, err=%v", err)
break return
} }
if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil { if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
// TODO: If backend server closed unexpectedly, we should notice the stream to quit. // TODO: If backend server closed unexpectedly, we should notice the stream to quit.
logger.Warn(ctx, "write to client failed, err=%v", err) logger.Warn(ctx, "write to client failed, err=%v", err)
break return
} }
} }
}() }()
})
if _, err := v.backendUDP.Write(data); err != nil { if _, err := v.backendUDP.Write(data); err != nil {
return errors.Wrapf(err, "write to backend %v", v.StreamURL) return errors.Wrapf(err, "write to backend %v", v.StreamURL)
@ -520,6 +541,25 @@ func (v *rtcConnection) connectBackend(ctx context.Context) error {
return nil return nil
} }
// unwrapSDPEnvelope returns the SDP string carried inside the legacy SRS RTC
// JSON envelope used by /rtc/v1/play/ and /rtc/v1/publish/, e.g. body of the
// form {"sdp":"v=0\r\n...", ...}. For standards-based WHIP/WHEP bodies (raw
// SDP), or any input we can't recognise, the original body is returned
// unchanged so the caller can parse it as raw SDP.
func unwrapSDPEnvelope(body string) string {
trimmed := strings.TrimLeft(body, " \t\r\n")
if !strings.HasPrefix(trimmed, "{") {
return body
}
var env struct {
SDP string `json:"sdp"`
}
if err := json.Unmarshal([]byte(trimmed), &env); err != nil || env.SDP == "" {
return body
}
return env.SDP
}
type rtcICEPair struct { type rtcICEPair struct {
// The remote ufrag, used for ICE username and session id. // The remote ufrag, used for ICE username and session id.
RemoteICEUfrag string `json:"remote_ufrag"` RemoteICEUfrag string `json:"remote_ufrag"`

View File

@ -896,6 +896,95 @@ func TestWebRTCProxyServer_HandleApiForWHEP_HappyPath(t *testing.T) {
} }
} }
// Legacy /rtc/v1/play/ (used by srs_bench) wraps the SDP in a JSON envelope
// like {"sdp":"v=0\r\n..."} where \r\n is the literal 2-byte JSON escape, not
// real CRLF. The proxy must unwrap the envelope before parsing ICE attributes;
// otherwise the stored ufrag is contaminated with the next attributes and the
// STUN binding from the client cannot be matched to the connection.
func TestWebRTCProxyServer_HandleApiForWHEP_LegacyJSONEnvelope(t *testing.T) {
f := newWebRTCFixture()
f.env.WebRTCServerReturns("19000")
const backendRTCPort = "18000"
answerJSON := `{"code":0,"sessionid":"sid","sdp":"v=0\r\na=ice-ufrag:local-ufrag\r\na=ice-pwd:local-pwd-very-long-value-32xxxx\r\na=candidate:1 1 udp 1 1.2.3.4 ` + backendRTCPort + ` typ host\r\n"}`
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.ReadAll(r.Body)
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(answerJSON))
}))
defer backend.Close()
f.server.backendURL = func(b *lb.OriginServer, r *http.Request) (string, error) {
return backend.URL + r.URL.Path, nil
}
f.lb.PickReturns(&lb.OriginServer{IP: "10.0.0.1", API: []string{"1985"}, RTC: []string{backendRTCPort}}, nil)
offerJSON := `{"api":"http://10.0.0.1:1985/rtc/v1/play/","clientip":"","sdp":"v=0\r\na=ice-ufrag:remote-ufrag\r\na=ice-pwd:remote-pwd-very-long-value-32xx\r\n","streamurl":"webrtc://example.com/live/demo"}`
req := httptest.NewRequest(http.MethodPost, "http://example.com/rtc/v1/play/", strings.NewReader(offerJSON))
rec := httptest.NewRecorder()
if err := f.server.HandleApiForWHEP(context.Background(), rec, req); err != nil {
t.Fatalf("WHEP: %v", err)
}
if f.lb.StoreWebRTCCallCount() != 1 {
t.Fatalf("StoreWebRTC called %d times, want 1", f.lb.StoreWebRTCCallCount())
}
_, _, stored := f.lb.StoreWebRTCArgsForCall(0)
if got, want := stored.GetUfrag(), "local-ufrag:remote-ufrag"; got != want {
t.Fatalf("stored ufrag=%q, want %q", got, want)
}
// The response forwarded to the client should still be the JSON envelope
// with the backend port rewritten to the proxy's WebRTC port.
body := rec.Body.String()
if !strings.Contains(body, " 19000 typ host") {
t.Fatalf("answer did not rewrite backend port; got %q", body)
}
if strings.Contains(body, " "+backendRTCPort+" typ host") {
t.Fatalf("answer still contains original backend port; got %q", body)
}
}
func TestUnwrapSDPEnvelope(t *testing.T) {
cases := []struct {
name string
in string
want string
}{
{
name: "raw sdp passthrough",
in: "v=0\r\na=ice-ufrag:abc\r\n",
want: "v=0\r\na=ice-ufrag:abc\r\n",
},
{
name: "json envelope unwrapped",
in: `{"code":0,"sdp":"v=0\r\na=ice-ufrag:abc\r\n"}`,
want: "v=0\r\na=ice-ufrag:abc\r\n",
},
{
name: "json envelope with leading whitespace",
in: "\n\t " + `{"sdp":"v=0\r\n"}`,
want: "v=0\r\n",
},
{
name: "malformed json falls back to body",
in: `{not json}`,
want: `{not json}`,
},
{
name: "json without sdp falls back to body",
in: `{"code":0}`,
want: `{"code":0}`,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := unwrapSDPEnvelope(tc.in); got != tc.want {
t.Fatalf("unwrapSDPEnvelope(%q)=%q, want %q", tc.in, got, tc.want)
}
})
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// webRTCProxyServer.proxyApiToBackend: error paths // webRTCProxyServer.proxyApiToBackend: error paths
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -100,8 +100,11 @@ func (v *srsSRTProxyServer) Run(ctx context.Context) error {
go func() { go func() {
defer v.wg.Done() defer v.wg.Done()
for ctx.Err() == nil { // Reuse a single receive buffer across iterations. SRTHandshakePacket.UnmarshalBinary
// clones ExtraData, and backendUDP.Write copies into the kernel before returning, so
// no caller retains a reference to this slice past the call.
buf := make([]byte, 4096) buf := make([]byte, 4096)
for ctx.Err() == nil {
n, caddr, err := v.listener.ReadFrom(buf) n, caddr, err := v.listener.ReadFrom(buf)
if err != nil { if err != nil {
// If context is canceled or connection is closed, exit gracefully without logging error. // If context is canceled or connection is closed, exit gracefully without logging error.
@ -196,6 +199,11 @@ type SRTConnection struct {
handshake2 *SRTHandshakePacket handshake2 *SRTHandshakePacket
handshake3 *SRTHandshakePacket handshake3 *SRTHandshakePacket
// Guards the spawn of the backend->client reader goroutine. Keeps the spawn
// idempotent if the client retries handshake 2 (e.g. because our handshake 3
// was dropped) and re-enters the handshake path.
startReader stdSync.Once
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real // dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
// UDP dial; tests may override via a functional option to supply a fake connection. // UDP dial; tests may override via a functional option to supply a fake connection.
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error) dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
@ -349,23 +357,29 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
return errors.Wrapf(err, "write handshake 3") return errors.Wrapf(err, "write handshake 3")
} }
// Start a goroutine to proxy message from backend to client. // Start a goroutine to proxy message from backend to client. The Once guard
// keeps the spawn idempotent if the client retries handshake 2 (because our
// handshake 3 was lost); the existing reader keeps running and we don't want
// a second one racing it on backendUDP.Read.
// TODO: FIXME: Support close the connection when timeout or client disconnected. // TODO: FIXME: Support close the connection when timeout or client disconnected.
v.startReader.Do(func() {
go func() { go func() {
buf := make([]byte, 4096)
for ctx.Err() == nil { for ctx.Err() == nil {
nn, err := v.backendUDP.Read(b) nn, err := v.backendUDP.Read(buf)
if err != nil { if err != nil {
// TODO: If backend server closed unexpectedly, we should notice the stream to quit. // TODO: If backend server closed unexpectedly, we should notice the stream to quit.
logger.Warn(ctx, "read from backend failed, err=%v", err) logger.Warn(ctx, "read from backend failed, err=%v", err)
return return
} }
if _, err = v.listenerUDP.WriteTo(b[:nn], addr); err != nil { if _, err = v.listenerUDP.WriteTo(buf[:nn], addr); err != nil {
// TODO: If backend server closed unexpectedly, we should notice the stream to quit. // TODO: If backend server closed unexpectedly, we should notice the stream to quit.
logger.Warn(ctx, "write to client failed, err=%v", err) logger.Warn(ctx, "write to client failed, err=%v", err)
return return
} }
} }
}() }()
})
return nil return nil
} }
@ -583,7 +597,10 @@ func (v *SRTHandshakePacket) UnmarshalBinary(b []byte) error {
// Only support IPv4. // Only support IPv4.
v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48]) v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48])
v.ExtraData = b[64:] // Clone so ExtraData owns its bytes independently of the caller's buffer.
// Without this, callers that reuse the receive buffer would silently corrupt
// any decoded packet they keep alive (e.g. v.handshake0 / v.handshake2).
v.ExtraData = bytes.Clone(b[64:])
return nil return nil
} }
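
Why the clone matters once the receive buffer is reused can be shown with a small, self-contained sketch. The names here (`packet`, `decode`, `survivesReuse`) are hypothetical and the 2-byte header is invented for brevity; only `bytes.Clone` and the aliasing behavior of Go slices are taken as given.

```go
package main

import (
	"bytes"
	"fmt"
)

// packet is a hypothetical decoded message; it is not SRTHandshakePacket.
type packet struct {
	extra []byte
}

// decode keeps the bytes after a 2-byte header, either aliasing the caller's
// buffer or owning a copy.
func decode(b []byte, clone bool) packet {
	if clone {
		return packet{extra: bytes.Clone(b[2:])}
	}
	return packet{extra: b[2:]}
}

// survivesReuse decodes one message, overwrites the receive buffer as the next
// read would, and reports whether the kept packet still holds its bytes.
func survivesReuse(clone bool) bool {
	buf := []byte{0, 0, 'a', 'b', 'c'}
	kept := decode(buf, clone)
	copy(buf, []byte{1, 1, 'x', 'y', 'z'}) // simulates the next ReadFrom into buf
	return string(kept.extra) == "abc"
}

func main() {
	fmt.Println("aliased:", survivesReuse(false))
	fmt.Println("cloned:", survivesReuse(true))
}
```

The aliased packet changes underneath its holder when the buffer is overwritten, while the cloned one keeps its original bytes, which is the property the stored handshakes rely on.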

View File

@ -211,10 +211,13 @@ func SrtParseSocketID(data []byte) uint32 {
return 0 return 0
} }
// ParseIceUfragPwd parse the ice-ufrag and ice-pwd from the SDP. // ParseIceUfragPwd parse the ice-ufrag and ice-pwd from the SDP. The value class
// stops at any whitespace (real CRLF in raw SDP) or at a backslash, so the parser
// is also safe against JSON-escaped SDP bodies where line breaks appear as the
// 2-byte sequence "\r" / "\n" rather than real control characters.
func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) { func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
if true { if true {
ufragRe := regexp.MustCompile(`a=ice-ufrag:([^\s]+)`) ufragRe := regexp.MustCompile(`a=ice-ufrag:([^\s\\]+)`)
ufragMatch := ufragRe.FindStringSubmatch(sdp) ufragMatch := ufragRe.FindStringSubmatch(sdp)
if len(ufragMatch) <= 1 { if len(ufragMatch) <= 1 {
return "", "", errors.Errorf("no ice-ufrag in sdp %v", sdp) return "", "", errors.Errorf("no ice-ufrag in sdp %v", sdp)
@ -223,7 +226,7 @@ func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
} }
if true { if true {
pwdRe := regexp.MustCompile(`a=ice-pwd:([^\s]+)`) pwdRe := regexp.MustCompile(`a=ice-pwd:([^\s\\]+)`)
pwdMatch := pwdRe.FindStringSubmatch(sdp) pwdMatch := pwdRe.FindStringSubmatch(sdp)
if len(pwdMatch) <= 1 { if len(pwdMatch) <= 1 {
return "", "", errors.Errorf("no ice-pwd in sdp %v", sdp) return "", "", errors.Errorf("no ice-pwd in sdp %v", sdp)
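
The effect of the tightened value class can be checked in isolation. The sketch below is not the proxy's `ParseIceUfragPwd`; it only compares the two patterns from this hunk on a JSON-escaped body, and `firstGroup`, `looseUfrag`, and `strictUfrag` are hypothetical names.

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// Old value class: stops only at whitespace.
	looseUfrag = regexp.MustCompile(`a=ice-ufrag:([^\s]+)`)
	// New value class: also stops at a backslash.
	strictUfrag = regexp.MustCompile(`a=ice-ufrag:([^\s\\]+)`)
)

// firstGroup returns the first capture group, or "" when there is no match.
func firstGroup(re *regexp.Regexp, s string) string {
	m := re.FindStringSubmatch(s)
	if len(m) < 2 {
		return ""
	}
	return m[1]
}

func main() {
	// Raw string: \r\n below are literal backslash sequences, as in a JSON body.
	escaped := `v=0\r\na=ice-ufrag:abc\r\na=ice-pwd:secret\r\n`
	fmt.Printf("%q\n", firstGroup(looseUfrag, escaped))
	fmt.Printf("%q\n", firstGroup(strictUfrag, escaped))
}
```

On the escaped body the old pattern runs through to the end of the string because it never meets real whitespace, while the new one stops at the first backslash. On raw SDP with real CRLF both patterns return the same value.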

View File

@ -338,6 +338,24 @@ func TestParseIceUfragPwd_MissingPwd(t *testing.T) {
} }
} }
// SDP embedded in the legacy /rtc/v1/play/ JSON envelope arrives with "\r\n" as
// the literal 2-byte sequence (backslash + r/n), not real CRLF. The value
// charset must stop at the backslash, otherwise the ufrag would absorb the rest
// of the SDP up to the next real whitespace.
func TestParseIceUfragPwd_JSONEscapedSDP(t *testing.T) {
sdp := `v=0\r\na=ice-ufrag:1f1n4272\r\na=ice-pwd:5f6y69408x2h55232i080mj894901b8n\r\na=fingerprint:sha-256 2D:1D\r\n`
ufrag, pwd, err := ParseIceUfragPwd(sdp)
if err != nil {
t.Fatalf("err = %v", err)
}
if ufrag != "1f1n4272" {
t.Fatalf("ufrag=%q, want 1f1n4272", ufrag)
}
if pwd != "5f6y69408x2h55232i080mj894901b8n" {
t.Fatalf("pwd=%q, want 5f6y69408x2h55232i080mj894901b8n", pwd)
}
}
func TestParseSRTStreamID_WithHost(t *testing.T) { func TestParseSRTStreamID_WithHost(t *testing.T) {
host, resource, err := ParseSRTStreamID("h=example.com,r=live/stream") host, resource, err := ParseSRTStreamID("h=example.com,r=live/stream")
if err != nil { if err != nil {

View File

@ -15,7 +15,7 @@ func VersionMinor() int {
} }
func VersionRevision() int { func VersionRevision() int {
return 148 return 150
} }
func Version() string { func Version() string {

View File

@ -303,6 +303,9 @@ The knowledge base (`memory/srs-*.md`) captures William's knowledge about SRS
- `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state - `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state
- `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification - `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification
**Next-Generation Server Performance Docs** (`docs/perf/`) — Performance analysis guides for the Go server:
- `proxy-whep.md` — WHEP perf analysis: enable GO_PPROF, run publisher + N WHEP players via srs_bench, collect CPU/alloc/heap/goroutine/trace profiles, read hot spots, diff before/after with `pprof -base`
**Next-Generation Server API Examples** — Executable API documentation: **Next-Generation Server API Examples** — Executable API documentation:
- `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow - `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow

View File

@ -152,6 +152,10 @@ Only after the user confirms the routing do you proceed to Step 2.
``` ```
bash scripts/proxy-e2e-cluster-test.sh bash scripts/proxy-e2e-cluster-test.sh
``` ```
- Proxy + SRS edge + SRS origin three-tier topology (starts proxy + one SRS edge in `mode remote` registered with the proxy + one upstream SRS origin, publishes RTMP via proxy→edge→origin, then plays the same stream with two concurrent RTMP players where the second joins after a delay as a late joiner on the active edge-pull):
```
bash scripts/proxy-e2e-edge-test.sh
```
- Redis multi-proxy routing test (requires local Redis; starts two proxy instances with Redis LB, publishes through one proxy, verifies playback through the other): - Redis multi-proxy routing test (requires local Redis; starts two proxy instances with Redis LB, publishes through one proxy, verifies playback through the other):
``` ```
bash scripts/proxy-e2e-redis-test.sh bash scripts/proxy-e2e-redis-test.sh

View File

@ -0,0 +1,342 @@
#!/bin/bash
# E2E test for proxy + edge + origin: starts proxy + one SRS edge (registered
# with proxy) + one SRS upstream origin (not registered). Publishes a single
# RTMP stream through proxy -> edge -> origin, then plays the stream twice
# concurrently from the proxy -> edge with a delay so the second player is a
# late joiner on an already-active edge-pull. Both players must succeed.
set -e
SCRIPT_DIR="$(cd -P "$(dirname "$0")" && pwd)"
# Walk up from SCRIPT_DIR looking for go.mod. This avoids brittle "../../../.."
# counting when the skills directory is reached via a symlink (which changes
# the symbolic vs. physical depth).
WORKSPACE="$SCRIPT_DIR"
while [[ "$WORKSPACE" != "/" && ! -f "$WORKSPACE/go.mod" ]]; do
WORKSPACE="$(dirname "$WORKSPACE")"
done
if [[ ! -f "$WORKSPACE/go.mod" ]]; then
echo "Error: go.mod not found walking up from: $SCRIPT_DIR" >&2
exit 1
fi
# Proxy ports — high range, avoids the SRS port range.
PROXY_RTMP_PORT=11935
PROXY_HTTP_API_PORT=11985
PROXY_HTTP_SERVER_PORT=18080
PROXY_WEBRTC_PORT=18000
PROXY_SRT_PORT=20080
PROXY_SYSTEM_API_PORT=12025
# Origin ports (from origin-for-edge.conf) — upstream of the edge, NOT
# registered with the proxy. Distinct from origin1/2/3 to avoid collisions
# when running this test alongside the other proxy E2E tests.
ORIGIN_RTMP_PORT=19360
ORIGIN_API_PORT=19860
# Edge ports (from edge-for-proxy.conf) — what the proxy treats as its backend.
EDGE_RTMP_PORT=19361
EDGE_HTTP_PORT=8091
EDGE_API_PORT=19861
SOURCE_FLV="$WORKSPACE/trunk/doc/source.flv"
SRS_BINARY="$WORKSPACE/trunk/objs/srs"
ORIGIN_CONF="$WORKSPACE/trunk/conf/origin-for-edge.conf"
EDGE_CONF="$WORKSPACE/trunk/conf/edge-for-proxy.conf"
# Randomize per run so each invocation starts from clean state and never
# shares state with sibling E2E tests publishing to live/livestream.
STREAM_NAME="edge$(date +%s)"
STREAM_PATH="live/$STREAM_NAME"
# PIDs to clean up on exit.
PROXY_PID=""
ORIGIN_PID=""
EDGE_PID=""
PUBLISH_PID=""
PLAYER1_PID=""
PLAYER2_PID=""
cleanup() {
echo ""
echo "=== Cleaning up ==="
for pid in $PUBLISH_PID $PLAYER1_PID $PLAYER2_PID $EDGE_PID $ORIGIN_PID $PROXY_PID; do
if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then
kill "$pid" 2>/dev/null || true
fi
done
sleep 1
for pid in $PUBLISH_PID $PLAYER1_PID $PLAYER2_PID $EDGE_PID $ORIGIN_PID $PROXY_PID; do
if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then
kill -9 "$pid" 2>/dev/null || true
fi
done
echo "Cleanup done."
}
trap cleanup EXIT
wait_for_http() {
local url=$1
local name=$2
local i
for i in $(seq 1 30); do
if curl -fsS --max-time 2 "$url" >/dev/null 2>&1; then
echo "$name is ready."
return 0
fi
sleep 1
done
echo "Error: $name is not ready after 30s: $url" >&2
return 1
}
api_has_stream() {
local api_port=$1
local stream=$2
curl -fsS --max-time 3 "http://127.0.0.1:$api_port/api/v1/streams/" 2>/dev/null | grep -q "$stream"
}
verify_probe_has_av() {
local url=$1
local label=$2
local probe_output
probe_output=$(ffprobe -v error -rw_timeout 5000000 -show_streams "$url" 2>&1 || true)
if ! echo "$probe_output" | grep -q "codec_type=video"; then
echo "FAIL: No video stream detected for $label." >&2
echo "ffprobe output:" >&2
echo "$probe_output" >&2
exit 1
fi
if ! echo "$probe_output" | grep -q "codec_type=audio"; then
echo "FAIL: No audio stream detected for $label." >&2
echo "ffprobe output:" >&2
echo "$probe_output" >&2
exit 1
fi
echo "PASS: Audio/video detected for $label."
}
echo "=== E2E Proxy + Edge + Origin Test ==="
echo "Workspace: $WORKSPACE"
echo "Stream: $STREAM_PATH"
echo ""
# --- Pre-checks ---
if [[ ! -f "$SOURCE_FLV" ]]; then
echo "Error: test source not found: $SOURCE_FLV" >&2
exit 1
fi
if [[ ! -f "$ORIGIN_CONF" ]]; then
echo "Error: origin conf not found: $ORIGIN_CONF" >&2
exit 1
fi
if [[ ! -f "$EDGE_CONF" ]]; then
echo "Error: edge conf not found: $EDGE_CONF" >&2
exit 1
fi
for tool in ffmpeg ffprobe curl; do
if ! command -v "$tool" &>/dev/null; then
echo "Error: $tool not found in PATH" >&2
exit 1
fi
done
# --- Step 0: Clean up stale state ---
rm -f "$WORKSPACE/trunk/objs/origin-for-edge.pid" "$WORKSPACE/trunk/objs/edge-for-proxy.pid"
ALL_PORTS="$PROXY_RTMP_PORT $PROXY_HTTP_API_PORT $PROXY_HTTP_SERVER_PORT $PROXY_WEBRTC_PORT $PROXY_SRT_PORT $PROXY_SYSTEM_API_PORT"
ALL_PORTS="$ALL_PORTS $ORIGIN_RTMP_PORT $ORIGIN_API_PORT $EDGE_RTMP_PORT $EDGE_HTTP_PORT $EDGE_API_PORT"
for port in $ALL_PORTS; do
lsof -ti :"$port" 2>/dev/null | xargs kill 2>/dev/null || true
done
sleep 1
# --- Step 1: Build proxy ---
echo "=== Step 1: Building proxy ==="
cd "$WORKSPACE"
make -s 2>&1
echo "Proxy built: $WORKSPACE/bin/srs-proxy"
# --- Step 2: Build SRS (if not already built) ---
if [[ ! -f "$SRS_BINARY" ]]; then
echo "=== Step 2: Building SRS ==="
cd "$WORKSPACE/trunk"
./configure && make 2>&1 | tail -3
echo "SRS built: $SRS_BINARY"
else
echo "=== Step 2: SRS already built ==="
fi
# --- Step 3: Start proxy ---
echo "=== Step 3: Starting proxy (RTMP :$PROXY_RTMP_PORT, System API :$PROXY_SYSTEM_API_PORT) ==="
cd "$WORKSPACE"
env PROXY_RTMP_SERVER=$PROXY_RTMP_PORT \
PROXY_HTTP_API=$PROXY_HTTP_API_PORT \
PROXY_HTTP_SERVER=$PROXY_HTTP_SERVER_PORT \
PROXY_WEBRTC_SERVER=$PROXY_WEBRTC_PORT \
PROXY_SRT_SERVER=$PROXY_SRT_PORT \
PROXY_SYSTEM_API=$PROXY_SYSTEM_API_PORT \
PROXY_LOAD_BALANCER_TYPE=memory \
./bin/srs-proxy >/tmp/srs-proxy-edge-e2e.log 2>&1 &
PROXY_PID=$!
echo "Proxy PID: $PROXY_PID"
wait_for_http "http://127.0.0.1:$PROXY_SYSTEM_API_PORT/api/v1/versions" "Proxy System API"
if ! kill -0 "$PROXY_PID" 2>/dev/null; then
echo "Error: proxy failed to start. Logs:" >&2
cat /tmp/srs-proxy-edge-e2e.log >&2
exit 1
fi
echo "Proxy started."
# --- Step 4: Start upstream origin (no proxy heartbeat) ---
echo "=== Step 4: Starting upstream SRS origin (RTMP :$ORIGIN_RTMP_PORT) ==="
ulimit -n 10000 2>/dev/null || true
cd "$WORKSPACE/trunk"
./objs/srs -c conf/origin-for-edge.conf >/tmp/srs-origin-edge-e2e.log 2>&1 &
ORIGIN_PID=$!
echo "Origin PID: $ORIGIN_PID"
wait_for_http "http://127.0.0.1:$ORIGIN_API_PORT/api/v1/versions" "Origin HTTP API"
if ! kill -0 "$ORIGIN_PID" 2>/dev/null; then
echo "Error: origin failed to start. Logs:" >&2
cat /tmp/srs-origin-edge-e2e.log >&2
exit 1
fi
echo "Origin started."
# --- Step 5: Start edge (mode remote, registered with proxy) ---
echo "=== Step 5: Starting SRS edge (RTMP :$EDGE_RTMP_PORT, upstream :$ORIGIN_RTMP_PORT) ==="
./objs/srs -c conf/edge-for-proxy.conf >/tmp/srs-edge-e2e.log 2>&1 &
EDGE_PID=$!
echo "Edge PID: $EDGE_PID"
wait_for_http "http://127.0.0.1:$EDGE_API_PORT/api/v1/versions" "Edge HTTP API"
# Wait for the edge to register with the proxy (heartbeat interval is 9s).
echo "Waiting for edge to register with proxy (up to 20s)..."
for i in $(seq 1 20); do
if grep -q "Register SRS media server" /tmp/srs-proxy-edge-e2e.log 2>/dev/null; then
echo "Edge registered with proxy."
break
fi
sleep 1
done
if ! grep -q "Register SRS media server" /tmp/srs-proxy-edge-e2e.log 2>/dev/null; then
echo "Error: edge did not register with proxy after 20s. Proxy logs:" >&2
cat /tmp/srs-proxy-edge-e2e.log >&2
exit 1
fi
if ! kill -0 "$EDGE_PID" 2>/dev/null; then
echo "Error: edge failed to start. Logs:" >&2
cat /tmp/srs-edge-e2e.log >&2
exit 1
fi
echo "Edge started and registered."
# --- Step 6: Publish RTMP stream to proxy ---
# Path: ffmpeg -> proxy (:$PROXY_RTMP_PORT) -> edge (:$EDGE_RTMP_PORT, mode remote forwards
# publish) -> origin (:$ORIGIN_RTMP_PORT). Verify the publish reached the
# upstream origin via the origin's HTTP API.
echo "=== Step 6: Publishing $STREAM_PATH through proxy -> edge -> origin ==="
ffmpeg -stream_loop -1 -re -i "$SOURCE_FLV" -c copy -f flv \
"rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH" >/tmp/srs-ffmpeg-edge-e2e.log 2>&1 &
PUBLISH_PID=$!
echo "Publisher PID: $PUBLISH_PID"
echo "Waiting for stream to reach origin (up to 15s)..."
reached_origin=0
for i in $(seq 1 15); do
if api_has_stream "$ORIGIN_API_PORT" "$STREAM_NAME"; then
reached_origin=1
echo "Stream visible on origin after ${i}s."
break
fi
sleep 1
done
if [[ $reached_origin -ne 1 ]]; then
echo "FAIL: stream did not reach upstream origin via edge." >&2
echo "Publisher logs:" >&2
cat /tmp/srs-ffmpeg-edge-e2e.log >&2
echo "Edge logs:" >&2
cat /tmp/srs-edge-e2e.log >&2
exit 1
fi
if ! kill -0 "$PUBLISH_PID" 2>/dev/null; then
echo "Error: publisher exited unexpectedly. Logs:" >&2
cat /tmp/srs-ffmpeg-edge-e2e.log >&2
exit 1
fi
echo "PASS: publish path proxy -> edge -> origin works."
# --- Step 7: Two concurrent players on the same stream ---
# Player 1 attaches first and triggers the edge-pull from origin. Player 2
# joins a few seconds later as a late joiner on the already-active edge-pull.
# Both must succeed — this is the proxy-side analogue of the C++ edge late-
# join fix.
echo "=== Step 7: Two concurrent RTMP players via proxy ==="
PLAY_DURATION=8
PLAYER_URL="rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH"
echo "Starting player 1 (immediate)..."
ffmpeg -rw_timeout 5000000 -i "$PLAYER_URL" -t $PLAY_DURATION -c copy -f flv -y /dev/null \
>/tmp/srs-player1-edge-e2e.log 2>&1 &
PLAYER1_PID=$!
sleep 3
echo "Starting player 2 (late joiner, +3s)..."
ffmpeg -rw_timeout 5000000 -i "$PLAYER_URL" -t $PLAY_DURATION -c copy -f flv -y /dev/null \
>/tmp/srs-player2-edge-e2e.log 2>&1 &
PLAYER2_PID=$!
# Wait for both players to finish.
player1_rc=0
player2_rc=0
wait "$PLAYER1_PID" || player1_rc=$?
wait "$PLAYER2_PID" || player2_rc=$?
# Clear PIDs so cleanup() doesn't try to re-kill exited processes.
PLAYER1_PID=""
PLAYER2_PID=""
check_player() {
local label=$1
local rc=$2
local log=$3
if [[ $rc -ne 0 ]]; then
echo "FAIL: $label exited with code $rc. Logs:" >&2
cat "$log" >&2
exit 1
fi
# Frame-count check: ffmpeg's progress lines show a nonzero `frame=` count once
# it has started writing output (the players use -c copy, so nothing is decoded).
# Catches "dimensions not set"-style failures where ffmpeg returns 0 but never
# produced output.
if ! grep -qE 'frame= *[1-9]' "$log"; then
echo "FAIL: $label produced no frames. Logs:" >&2
cat "$log" >&2
exit 1
fi
echo "PASS: $label played successfully."
}
check_player "player 1" "$player1_rc" /tmp/srs-player1-edge-e2e.log
check_player "player 2 (late joiner)" "$player2_rc" /tmp/srs-player2-edge-e2e.log
# --- Step 8: Final probe via proxy confirms A/V is still queryable ---
echo "=== Step 8: Final ffprobe verification via proxy ==="
verify_probe_has_av "rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH" "proxy $STREAM_PATH"
echo ""
echo "=== E2E Proxy + Edge + Origin Test PASSED ==="
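
The script above repeats the same poll-and-sleep loop for edge registration (Step 5) and for stream visibility on the origin (Step 6). A minimal sketch of how those loops could share one helper is shown below. `retry_until` is a hypothetical name and is not part of this PR; the log path and `api_has_stream` in the usage comments are the ones the script already uses.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the PR): run a command repeatedly until it
# succeeds or the attempt budget is exhausted.
# Usage: retry_until <attempts> <delay_seconds> <command> [args...]
retry_until() {
    local attempts=$1
    local delay=$2
    shift 2
    local i
    for ((i = 1; i <= attempts; i++)); do
        # Run the command in the current shell; stop at the first success.
        if "$@"; then
            return 0
        fi
        sleep "$delay"
    done
    return 1
}

# Possible usage, mirroring Step 5 and Step 6 of the script:
#   retry_until 20 1 grep -q "Register SRS media server" /tmp/srs-proxy-edge-e2e.log
#   retry_until 15 1 api_has_stream "$ORIGIN_API_PORT" "$STREAM_NAME"
```

Because the command is passed as arguments rather than as a string, no `eval` is needed and quoting of arguments such as the log path is preserved.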


@@ -0,0 +1,39 @@
# Edge for the proxy+edge E2E test (skills/srs-develop).
# Registers with the proxy via heartbeat, and in mode remote forwards publishes
# to and pulls plays from origin-for-edge.conf (RTMP :19360).
max_connections 1000;
pid objs/edge-for-proxy.pid;
daemon off;
srs_log_tank console;
rtmp {
listen 19361;
}
http_server {
enabled on;
listen 8091;
dir ./objs/nginx/html;
}
http_api {
enabled on;
listen 19861;
}
heartbeat {
enabled on;
interval 9;
url http://127.0.0.1:12025/api/v1/srs/register;
device_id edge-for-proxy;
ports on;
}
vhost __defaultVhost__ {
cluster {
mode remote;
origin 127.0.0.1:19360;
}
http_remux {
enabled on;
mount [vhost]/[app]/[stream].flv;
}
}
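
The commit message says HTTP-FLV play on the edge returned 404 before this fix, because the FLV mount is created lazily by the dynamic matcher. The E2E script shown here only plays over RTMP, so the following is a rough sketch of how the HTTP-FLV path could be checked against this conf. `flv_url` and `flv_status` are hypothetical helpers, and the `live/livestream` stream path is an assumption; port 8091 and the `[vhost]/[app]/[stream].flv` mount come from the conf above.

```shell
#!/usr/bin/env bash
# Hypothetical helpers, not part of the PR.

# Build the HTTP-FLV URL served by the edge's http_remux mount
# ([vhost]/[app]/[stream].flv) for the default vhost.
flv_url() {
    local host=$1 port=$2 app=$3 stream=$4
    printf 'http://%s:%s/%s/%s.flv\n' "$host" "$port" "$app" "$stream"
}

# Print only the HTTP status code. A live FLV response never ends, so the
# transfer is cut off after 3s; curl's non-zero exit on timeout is ignored.
flv_status() {
    curl -sS -o /dev/null --max-time 3 -w '%{http_code}' "$1" 2>/dev/null || true
}

# Example (assumes the edge from this conf is running and a stream is being
# published at live/livestream):
#   status=$(flv_status "$(flv_url 127.0.0.1 8091 live livestream)")
#   [[ "$status" == "200" ]] || echo "FAIL: HTTP-FLV returned $status" >&2
```

With the inverted guard in `SrsHttpStreamServer::assemble()` the expected status would be 404; with the fix applied it should be 200 once the stream is published.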


@@ -0,0 +1,19 @@
# Upstream origin for the proxy+edge E2E test (skills/srs-develop).
# Not registered with the proxy; the edge in front of it is. The edge pulls
# from this origin on play and pushes publishes here on publish.
max_connections 1000;
pid objs/origin-for-edge.pid;
daemon off;
srs_log_tank console;
rtmp {
listen 19360;
}
http_api {
enabled on;
listen 19860;
}
vhost __defaultVhost__ {
}


@@ -7,6 +7,8 @@ The changelog for SRS.
 <a name="v7-changes"></a>
 ## SRS 7.0 Changelog
+* v7.0, 2026-05-19, Merge [#4678](https://github.com/ossrs/srs/pull/4678): Edge: Fix HTTP-FLV 404 and RTMP late-join missing sequence headers. v7.0.150 (#4678)
+* v7.0, 2026-05-17, Merge [#4676](https://github.com/ossrs/srs/pull/4676): Proxy: Fix RTC/SRT reader goroutine leak; unwrap legacy WHEP JSON envelope; add WHEP pprof guide. v7.0.149 (#4676)
 * v7.0, 2026-05-17, Merge [#4675](https://github.com/ossrs/srs/pull/4675): Proxy: Refactor for testability; add SRT/WHIP E2E and unit tests. v7.0.148 (#4675)
 * v7.0, 2026-05-02, Merge [#4672](https://github.com/ossrs/srs/pull/4672): Proxy: Refactor server APIs and expand RTMP test coverage. v7.0.147 (#4672)
 * v7.0, 2026-04-28, Merge [#4670](https://github.com/ossrs/srs/pull/4670): Proxy: Refine logger and environment APIs. v7.0.146 (#4670)


@@ -1076,7 +1076,7 @@ SrsHttpStreamServer::SrsHttpStreamServer()
 void SrsHttpStreamServer::assemble()
 {
     SrsHttpServeMux *mux = dynamic_cast<SrsHttpServeMux *>(mux_);
-    if (!mux) {
+    if (mux) {
         mux->add_dynamic_matcher(this);
     }
 }


@@ -2559,7 +2559,10 @@ srs_error_t SrsLiveSource::consumer_dumps(ISrsLiveConsumer *consumer, bool ds, b
     }
     // If stream is publishing, dumps the sequence header and gop cache.
-    bool hub_active = hub_ ? hub_->active() : false;
+    // On edge, hub_ is NULL; the source is "publishing" once the edge-pull has
+    // populated the meta cache. Late-joining consumers must still receive the
+    // cached metadata + sequence headers + GOP via this path.
+    bool hub_active = hub_ ? hub_->active() : (meta_->data() || meta_->vsh() || meta_->ash());
     if (hub_active) {
         // Copy metadata and sequence header to consumer.
         if ((err = meta_->dumps(consumer, atc_, jitter_algorithm_, dm, ds)) != srs_success) {


@@ -9,6 +9,6 @@
 #define VERSION_MAJOR 7
 #define VERSION_MINOR 0
-#define VERSION_REVISION 148
+#define VERSION_REVISION 150
 #endif