Compare commits

4 Commits

Author SHA1 Message Date
Winlin
0f980d49a6
RTMP: Fix chunk timestamp/basic-header decoding and harden packet unmarshal. v8.0.3 (#4680)
Fixes three RTMP chunk-stream decoding bugs in the proxy and hardens AMF0 command-packet unmarshalling against malformed input, backed by a new protocol unit-test suite.

All changes are confined to the `internal/rtmp` package. No public API, log format, or emitted wire format changes — these are decode-correctness and robustness fixes only.

**3-byte chunk basic header decode (`readBasicHeader`)**

The 3-byte basic-header form (cid 64–65599) was selected by testing `cid == 1` *after* `cid` had already been overwritten with `64 + t`, so it was never detected. Capture the original marker before overwriting and test that instead.
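
A minimal sketch of the corrected decode order, assuming a `bufio.Reader` as input. The names `decodeBasicHeader` and `decodeBasicHeaderBytes` are hypothetical and are not the proxy's `readBasicHeader`; only the byte layout (2-bit fmt, 6-bit cid, with cid 0 and 1 acting as markers for the 2-byte and 3-byte forms) is taken from the RTMP chunk format.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// decodeBasicHeader is a hypothetical stand-in for the proxy's readBasicHeader.
// It returns the 2-bit chunk format and the chunk stream id (cid).
func decodeBasicHeader(r *bufio.Reader) (uint8, uint32, error) {
	first, err := r.ReadByte()
	if err != nil {
		return 0, 0, fmt.Errorf("read basic header: %w", err)
	}
	format := first >> 6
	cid := uint32(first & 0x3f)

	// cid 2-63 is the 1-byte form: the low 6 bits are the real cid.
	if cid > 1 {
		return format, cid, nil
	}

	// Here cid is 0 or 1, a marker for the 2-byte or 3-byte form. Keep it
	// before cid is overwritten with the decoded value.
	marker := cid

	second, err := r.ReadByte()
	if err != nil {
		return format, cid, fmt.Errorf("read basic header byte 2: %w", err)
	}
	cid = 64 + uint32(second)

	// Testing cid == 1 here would never match, because cid is now >= 64.
	if marker == 1 {
		third, err := r.ReadByte()
		if err != nil {
			return format, cid, fmt.Errorf("read basic header byte 3: %w", err)
		}
		cid += uint32(third) * 256
	}
	return format, cid, nil
}

// decodeBasicHeaderBytes is a convenience wrapper for trying the decoder on a byte slice.
func decodeBasicHeaderBytes(b []byte) (uint8, uint32, error) {
	return decodeBasicHeader(bufio.NewReader(bytes.NewReader(b)))
}
```

With this sketch, the bytes `0x41 0x01 0x02` decode to fmt=1 and cid = 64 + 1 + 2*256 = 577, which the pre-fix ordering would have read as a 2-byte header with cid 65.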

**Extended-timestamp handling (`chunkStream`, `readMessageHeader`)**

- Use the extended timestamp as a delta for fmt=1/2 chunks (and a fmt=3 first chunk continuing them), required when the delta is ≥ `0xffffff`. Timestamp computation is unified into a single post-step: extended timestamp when present, otherwise the 3-byte header delta; fmt=0 absolute, fmt=1/2 accumulated.
- Detect Type-3 chunks that omit the extended timestamp. FMLE/FMS/Flash follow the RTMP 2012 spec and always send it on Type-3 chunks; librtmp/ffmpeg may not. Switched from an unconditional 4-byte read to `Peek` + conditional `Discard`: if the peeked value differs from the stored one on a non-first chunk, those 4 bytes are payload and are left in the reader.
- Split the single `extendedTimestamp` bool into `hasExtendedTimestamp` (bool) and `extendedTimestamp` (the last raw value, used for the detection above).
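
The `Peek` + conditional `Discard` detection and the unified timestamp post-step described above can be sketched as follows. This is an illustration under stated assumptions, not the code in `readMessageHeader`: `chunkState`, `readExtendedTimestamp`, `applyTimestamp`, and `newChunkReader` are hypothetical names, and the chunk format is passed as a plain `uint8`.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
)

// chunkState is a hypothetical, trimmed-down stand-in for chunkStream.
type chunkState struct {
	hasExtendedTimestamp bool   // the 3-byte header field was 0xffffff
	extendedTimestamp    uint32 // last raw extended timestamp consumed
	timestamp            uint64 // accumulated message timestamp
}

// readExtendedTimestamp peeks 4 bytes and consumes them only when they look
// like an extended timestamp. It reports whether the bytes were consumed.
func readExtendedTimestamp(r *bufio.Reader, st *chunkState, isFirstChunkOfMsg bool) (bool, error) {
	if !st.hasExtendedTimestamp {
		return false, nil
	}
	b, err := r.Peek(4)
	if err != nil {
		return false, fmt.Errorf("peek ext-ts: %w", err)
	}
	ts := binary.BigEndian.Uint32(b) & 0x7fffffff

	// On a continuation chunk, a value that differs from the stored one means
	// the sender omitted the field: the 4 bytes are payload, so leave them.
	if !isFirstChunkOfMsg && st.extendedTimestamp > 0 && st.extendedTimestamp != ts {
		return false, nil
	}
	if _, err := r.Discard(4); err != nil {
		return false, fmt.Errorf("discard ext-ts: %w", err)
	}
	st.extendedTimestamp = ts
	return true, nil
}

// applyTimestamp is the single post-step: pick the source value, then treat it
// as absolute for fmt=0 and as a delta for the first chunk of other formats.
func applyTimestamp(st *chunkState, format uint8, headerDelta uint32, isFirstChunkOfMsg bool) {
	value := headerDelta
	if st.hasExtendedTimestamp {
		value = st.extendedTimestamp
	}
	if format == 0 {
		st.timestamp = uint64(value)
	} else if isFirstChunkOfMsg {
		st.timestamp += uint64(value)
	}
}

// newChunkReader wraps a byte slice so the sketch can be tried without a socket.
func newChunkReader(b []byte) *bufio.Reader {
	return bufio.NewReader(bytes.NewReader(b))
}
```

The heuristic cannot tell an omitted field from payload that happens to start with the same 4 bytes as the stored value; the sketch inherits that limitation from the approach described above.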

**Packet unmarshal hardening**
- Add an `advanceBytes(p, n)` helper that bounds-checks each `p = p[field.Size():]` advance, turning a slice-out-of-range panic into a clean error on truncated/untrusted input. Applied in `CallPacket`, `CreateStreamResPacket`, `PublishPacket`, and `PlayPacket`.
- Reset the optional `CommandObject` / `Args` to nil before probing for their presence, so a stale constructor default (e.g. Null) isn't counted by `Size()` and can't overflow a later advance.
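
A bounds-checked advance of the kind described above might look like the sketch below. The commit only names `advanceBytes(p, n)`; the return types, the error text, and the `skipFields` helper are assumptions made for illustration.

```go
package main

import "fmt"

// advanceBytes returns p[n:] after checking that n is within bounds, so a
// truncated buffer yields an error instead of a slice-out-of-range panic.
// The signature is an assumption; the commit only names advanceBytes(p, n).
func advanceBytes(p []byte, n int) ([]byte, error) {
	if n < 0 || n > len(p) {
		return nil, fmt.Errorf("advance %d bytes, only %d left", n, len(p))
	}
	return p[n:], nil
}

// skipFields is a hypothetical caller that advances past several decoded
// fields in a row, stopping at the first one that overruns the buffer.
func skipFields(p []byte, sizes ...int) ([]byte, error) {
	var err error
	for i, size := range sizes {
		if p, err = advanceBytes(p, size); err != nil {
			return nil, fmt.Errorf("field %d: %w", i, err)
		}
	}
	return p, nil
}
```

Advancing by exactly `len(p)` is allowed and yields an empty slice, which matches how `p[len(p):]` behaves in Go.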

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-29 07:17:32 -04:00
Winlin
8df9410880
Edge: Fix HTTP-FLV 404 and RTMP late-join missing sequence headers. v8.0.2 (#4678)
Two edge-cluster regressions surfaced when validating an RTMP
origin/edge setup. Each is a small, surgical fix in its own commit.

- **HTTP-FLV play on edge always 404'd.**
`SrsHttpStreamServer::assemble()` registered the dynamic matcher only
when the mux cast was `NULL` (inverted guard), so the matcher was never
wired up. On edge the FLV mount is created lazily by the dynamic
matcher, so every HTTP-FLV client got 404. Invert the guard to register
when the mux is valid, mirroring the destructor.
(`trunk/src/app/srs_app_http_stream.cpp`)

- **RTMP players that join an edge stream after the first player fail to
decode.** After v7.0.94 (#4513) stopped creating `SrsOriginHub` on edge,
the `hub_active` gate in `SrsLiveSource::consumer_dumps()` always
evaluated false on edge. That gate guards the dump of cached
`onMetaData` + AVC sequence header + AAC sequence header + GOP cache to
a new consumer. Result: the first player attaches before the edge-pull
starts and gets headers via the live fan-out, but every subsequent
player gets coded payload with no codec config and ffmpeg aborts with
`dimensions not set` / `Could not write header`. Fall back to the meta
cache state when `hub_` is `NULL`, so the dump path runs once the
edge-pull has populated the cache.
(`trunk/src/app/srs_app_rtmp_source.cpp`)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 10:53:30 -04:00
Winlin
913b773282
Proxy: Fix RTC/SRT reader leak, legacy WHEP unwrap, WHEP perf guide. v8.0.1 (#4676)
- Fix a goroutine leak on the WHEP path: the backend→client reader was
being spawned on every inbound client packet (STUN keepalives + RTCP
feedback), leaking tens of thousands of goroutines under steady-state
load. Now spawned exactly once per connection via `sync.Once` on both
the RTC and SRT proxies. Listener and reader receive buffers are also
reused across iterations.
- Make the legacy SRS `/rtc/v1/play/` and `/rtc/v1/publish/` APIs work
end-to-end through the proxy. Those endpoints wrap the SDP in a JSON
envelope (`{"sdp":"v=0\r\n..."}` where `\r\n` is the literal 2-byte JSON
escape, not real CRLF), so ICE parsing previously absorbed the rest of
the body into the ufrag. Added `unwrapSDPEnvelope` for ICE extraction
and tightened `ParseIceUfragPwd`'s value class to stop at `\`. The bytes
forwarded to the client and the in-body candidate-port rewrite still
operate on the raw envelope.
- Enable `net/http/pprof` endpoints when `GO_PPROF` is set (blank import
in `internal/debug/pprof.go`) and add `docs/perf/proxy-whep.md` walking
through CPU/alloc/heap/goroutine/trace collection and `pprof -base`
before/after diffs for the WHEP workload (1 publisher + N players).
- Tighten `SRTHandshakePacket.UnmarshalBinary` to
`bytes.Clone(ExtraData)` so decoded handshakes kept on the connection
(`handshake0`, `handshake2`) stay valid once the receive buffer is
reused.
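
The spawn-once pattern from the first item above can be sketched in isolation. `connection`, `handlePacket`, and `simulatePackets` are hypothetical names used only for this illustration; the real fix lives in `rtcConnection.HandlePacket` and `SRTConnection.handleHandshake`.

```go
package main

import "sync"

// connection is a hypothetical, trimmed-down stand-in for rtcConnection.
type connection struct {
	startReader sync.Once
	readers     int            // how many reader goroutines were spawned
	wg          sync.WaitGroup // lets the demo wait for the reader to exit
}

// handlePacket runs for every inbound client packet, but the reader goroutine
// is started only on the first call because of the sync.Once guard.
func (c *connection) handlePacket(data []byte) {
	c.startReader.Do(func() {
		c.readers++
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			// A real reader would loop on the backend connection here.
		}()
	})
	_ = data // A real proxy would forward data to the backend here.
}

// simulatePackets feeds n packets to one connection and reports how many
// reader goroutines were spawned.
func simulatePackets(n int) int {
	c := &connection{}
	for i := 0; i < n; i++ {
		c.handlePacket([]byte{0x00})
	}
	c.wg.Wait()
	return c.readers
}
```

Without the guard, the spawn count would grow with the packet count, which is the leak the commit describes.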

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 21:13:21 -04:00
winlin
57d1062e91 Code name: Free. v8.0.0
"Free" represents the new era of open source development empowered by AI. Both freedom and free — the AI agent is not just free labor, it is like a copy of myself, or even tens of copies, all deeply understanding this project, how to deliver high-quality code, and how to serve the community. AI handles all the dirty work — the boring tasks, the documentation, the coding — and often does it better than I could, with ten times the power. We built an AI robot for the community to answer questions and help users learn this project, and we used AI to almost entirely rewrite the SRS Proxy server — its structure, its workflow — so that AI agents can comprehensively manage and maintain it. With AI I have power, and I have choice: no longer waiting for other developers to contribute, I am free to manage this project, freed from the labor of maintaining it alone. This is a fantastic, amazing new era for building and sustaining open source projects and communities.
2026-05-17 12:34:04 -04:00
24 changed files with 2223 additions and 103 deletions

@ -10,7 +10,7 @@
 [![](https://img.shields.io/docker/pulls/ossrs/srs)](https://hub.docker.com/r/ossrs/srs/tags)
 [![](https://codecov.io/gh/ossrs/srs/graph/badge.svg?token=Zx2LhdtA39)](https://codecov.io/gh/ossrs/srs)
-SRS/7.0 ([Kai](https://ossrs.io/lts/en-us/product#release-70)) is a simple, high-efficiency, and real-time video server,
+SRS/8.0 ([Free](https://ossrs.io/lts/en-us/product#release-80)) is a simple, high-efficiency, and real-time video server,
 supporting RTMP/WebRTC/HLS/HTTP-FLV/SRT/MPEG-DASH/GB28181, Linux/macOS, X86_64/ARMv7/AARCH64/M1/RISCV/LOONGARCH/MIPS,
 with codec support for H.264, H.265, AV1, VP9, AAC, Opus, and G.711,
 and essential [features](trunk/doc/Features.md#features).

docs/perf/proxy-whep.md (new file, 149 lines)

@ -0,0 +1,149 @@
# How to Analyze WHEP Performance for the Proxy Server
This guide walks through profiling the Go proxy under a WHEP (WebRTC play) load.
The workload of interest is **one RTMP publisher + N WHEP players**, where N is
large enough to stress the proxy's UDP forwarding path (typically 300+).
When analyzing WHEP performance for the proxy, you should:
1. Set up the topology: proxy + SRS origin + publisher + WHEP load
2. Enable Go pprof on the proxy
3. Run the load and let it warm up
4. Collect CPU, allocation, heap, goroutine, and trace profiles
5. Read the profiles and identify hot spots
6. Save profiles to compare before and after a change
## Step 1: Build and Start the Proxy with pprof
The proxy reads `GO_PPROF` from the environment and, when set, exposes
`net/http/pprof` endpoints at that address. Use the same standard ports SRS
uses by default so the publisher and player commands stay unchanged.
```bash
cd ~/git/srs
make && env GO_PPROF=:6060 \
PROXY_RTMP_SERVER=1935 PROXY_HTTP_SERVER=8080 \
PROXY_HTTP_API=1985 PROXY_WEBRTC_SERVER=8000 PROXY_SRT_SERVER=10080 \
PROXY_SYSTEM_API=12025 PROXY_LOAD_BALANCER_TYPE=memory \
./bin/srs-proxy
```
> The pprof endpoints live under `http://localhost:6060/debug/pprof/`. The
> proxy registers them only because `internal/debug/pprof.go` blank-imports
> `net/http/pprof`. Without that import the endpoints return 404.
## Step 2: Start the SRS Origin on Alt Ports
`origin1-for-proxy.conf` runs SRS on non-standard ports (RTMP 19351, HTTP 8081,
API 19851, RTC 8001/udp, SRT 10081) so the proxy can sit on the defaults. SRS
auto-registers with the proxy's system API on startup.
Set `CANDIDATE` to a LAN-reachable IP so the SDP answer the proxy returns
points clients at an address they can route to. The proxy only rewrites the
candidate **port**; the IP comes from the origin's SDP.
```bash
ulimit -n 10000 && bash -c "cd ~/git/srs/trunk && \
CANDIDATE=192.168.3.187 ./objs/srs -c conf/origin1-for-proxy.conf"
```
## Step 3: Run the WHEP Workload
In separate terminals, start the publisher and the WHEP load generator.
**Publisher (RTMP):**
```bash
cd ~/git/srs/trunk
ffmpeg -stream_loop -1 -re -i doc/source.200kbps.768x320.flv \
-c copy -f flv -y rtmp://localhost/live/livestream
```
**WHEP players (use the LAN IP that matches `CANDIDATE`):**
```bash
cd ~/git/srs/trunk/3rdparty/srs-bench
./objs/srs_bench -sr webrtc://192.168.3.187/live/livestream -nn 300
```
Let the workload run for at least 30 seconds before sampling. Connection
setup churn dominates the first few seconds and will skew profiles taken
too early.
> Sanity-check with `-nn 1` first. If a single WHEP session does not play,
> the 300-player run is testing something other than steady-state forwarding.
## Step 4: Collect Profiles
Profiles must be collected **while the workload is steady**, not before or
after. The CPU profile is the single most useful starting point.
```bash
# CPU profile (30s sample) — interactive web UI on :8123
# Use :8123 (or any free port) because :8080 is the proxy's HTTP-FLV/HLS port.
go tool pprof -http=:8123 'http://localhost:6060/debug/pprof/profile?seconds=30'
# Allocation profile — GC pressure / per-packet allocations
go tool pprof -http=:8124 http://localhost:6060/debug/pprof/allocs
# Heap (live memory snapshot)
go tool pprof -http=:8125 http://localhost:6060/debug/pprof/heap
# Goroutine count + stack dump — look for goroutine explosion under load
curl -s 'http://localhost:6060/debug/pprof/goroutine?debug=1' | head -50
# Runtime trace (10s) — GC pauses, scheduler latency, syscall behavior
curl -s -o trace.out 'http://localhost:6060/debug/pprof/trace?seconds=10'
go tool trace trace.out
```
The web UI requires Graphviz for the Flame Graph and Graph views:
```bash
brew install graphviz # macOS
```
If you cannot install Graphviz, the **Top** view in the web UI is HTML-only
and works without it. The CLI form is also unaffected:
```bash
go tool pprof 'http://localhost:6060/debug/pprof/profile?seconds=30'
(pprof) top20
(pprof) top20 -cum
(pprof) list <FunctionName>
```
## Step 5: Read the Profiles
Open the web UI and use the views in this order:
1. **Flame Graph** — visual hot path. Wide bars near the top are where time
is spent. For 300-player WHEP the path should be dominated by
`webRTCProxyServer.Run` and its UDP read/write children.
2. **Top** — sorted list by `flat` (self time) and `cum` (cumulative). The
top 5 to 10 functions usually tell the whole story.
3. **Graph** — call graph with edge weights. Good for tracing "who calls this
hot function".
4. **Source** — line-level cost inside a single function. Use after Top has
pointed you at a function worth dissecting.
## Step 6: Save Profiles for Before/After Comparison
When you change code to fix a hot spot, comparing profiles is the only
reliable way to confirm the fix moved the needle (and didn't just shift cost
elsewhere).
```bash
# Save the raw profile from a baseline run
curl -s -o cpu-before.pb.gz 'http://localhost:6060/debug/pprof/profile?seconds=30'
# After the code change, sample again under the same workload
curl -s -o cpu-after.pb.gz 'http://localhost:6060/debug/pprof/profile?seconds=30'
# Diff the two
go tool pprof -http=:8123 -base cpu-before.pb.gz cpu-after.pb.gz
```
In the diff view, red bars are functions that got more expensive, green
bars are functions that got cheaper. The total should shrink overall if
the change is a net win.

@ -6,6 +6,7 @@ package debug
 import (
 	"context"
 	"net/http"
+	_ "net/http/pprof"
 	"srsx/internal/env"
 	"srsx/internal/logger"

@ -6,6 +6,7 @@ package proxy
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -241,13 +242,17 @@ func (v *webRTCProxyServer) proxyApiToBackend(
 		localSDPAnswer = strings.Replace(localSDPAnswer, from, to, -1)
 	}
-	// Fetch the ice-ufrag and ice-pwd from local SDP answer.
-	remoteICEUfrag, remoteICEPwd, err := utils.ParseIceUfragPwd(remoteSDPOffer)
+	// Fetch the ice-ufrag and ice-pwd from local SDP answer. The legacy SRS
+	// /rtc/v1/play/ and /rtc/v1/publish/ APIs wrap the SDP in a JSON envelope
+	// like {"sdp":"v=0\r\n..."}, so unwrap it before parsing ICE attributes.
+	// The forwarded bytes and the in-body candidate port rewrite still operate
+	// on the raw envelope, which is what the client expects to see back.
+	remoteICEUfrag, remoteICEPwd, err := utils.ParseIceUfragPwd(unwrapSDPEnvelope(remoteSDPOffer))
 	if err != nil {
 		return errors.Wrapf(err, "parse remote sdp offer")
 	}
-	localICEUfrag, localICEPwd, err := utils.ParseIceUfragPwd(localSDPAnswer)
+	localICEUfrag, localICEPwd, err := utils.ParseIceUfragPwd(unwrapSDPEnvelope(localSDPAnswer))
 	if err != nil {
 		return errors.Wrapf(err, "parse local sdp answer")
 	}
@ -297,8 +302,12 @@ func (v *webRTCProxyServer) Run(ctx context.Context) error {
 	go func() {
 		defer v.wg.Done()
-		for ctx.Err() == nil {
-			buf := make([]byte, 4096)
+		// Reuse a single receive buffer across iterations. handleClientUDP and the
+		// downstream HandlePacket consume the slice synchronously (kernel sendto
+		// copies bytes; STUN parsing copies the username via string()), so no caller
+		// retains the slice past the call.
+		buf := make([]byte, 4096)
+		for ctx.Err() == nil {
 			n, addr, err := listener.ReadFrom(buf)
 			if err != nil {
 				// If context is canceled or connection is closed, exit gracefully without logging error.
@ -414,6 +423,11 @@ type rtcConnection struct {
 	// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
 	// UDP dial; tests may override via a functional option to supply a fake connection.
 	dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
+
+	// Guards the spawn of the backend->client reader goroutine. HandlePacket is
+	// called on every inbound client packet (STUN keepalives + RTCP feedback at
+	// steady state) but the reader must only start once per connection.
+	startReader stdSync.Once
 }

 func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection {
@ -462,24 +476,31 @@ func (v *rtcConnection) HandlePacket(addr net.Addr, data []byte) error {
 		return nil
 	}

-	// Proxy all messages from backend to client.
-	go func() {
-		for ctx.Err() == nil {
-			buf := make([]byte, 4096)
-			n, err := v.backendUDP.Read(buf)
-			if err != nil {
-				// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
-				logger.Warn(ctx, "read from backend failed, err=%v", err)
-				break
-			}
-			if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
-				// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
-				logger.Warn(ctx, "write to client failed, err=%v", err)
-				break
-			}
-		}
-	}()
+	// Spawn the backend->client reader exactly once per connection. Previously
+	// this goroutine was launched unconditionally here on every inbound client
+	// packet, which leaked tens of thousands of goroutines under steady-state
+	// WHEP load (STUN keepalives + RTCP feedback). The buffer is reused across
+	// iterations: WriteTo copies into the kernel before returning, so the next
+	// Read can safely overwrite.
+	v.startReader.Do(func() {
+		go func() {
+			buf := make([]byte, 4096)
+			for ctx.Err() == nil {
+				n, err := v.backendUDP.Read(buf)
+				if err != nil {
+					// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
+					logger.Warn(ctx, "read from backend failed, err=%v", err)
+					return
+				}
+				if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
+					// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
+					logger.Warn(ctx, "write to client failed, err=%v", err)
+					return
+				}
+			}
+		}()
+	})

 	if _, err := v.backendUDP.Write(data); err != nil {
 		return errors.Wrapf(err, "write to backend %v", v.StreamURL)
@ -520,6 +541,25 @@ func (v *rtcConnection) connectBackend(ctx context.Context) error {
 	return nil
 }

+// unwrapSDPEnvelope returns the SDP string carried inside the legacy SRS RTC
+// JSON envelope used by /rtc/v1/play/ and /rtc/v1/publish/, e.g. body of the
+// form {"sdp":"v=0\r\n...", ...}. For standards-based WHIP/WHEP bodies (raw
+// SDP), or any input we can't recognise, the original body is returned
+// unchanged so the caller can parse it as raw SDP.
+func unwrapSDPEnvelope(body string) string {
+	trimmed := strings.TrimLeft(body, " \t\r\n")
+	if !strings.HasPrefix(trimmed, "{") {
+		return body
+	}
+	var env struct {
+		SDP string `json:"sdp"`
+	}
+	if err := json.Unmarshal([]byte(trimmed), &env); err != nil || env.SDP == "" {
+		return body
+	}
+	return env.SDP
+}
+
 type rtcICEPair struct {
 	// The remote ufrag, used for ICE username and session id.
 	RemoteICEUfrag string `json:"remote_ufrag"`

@ -896,6 +896,95 @@ func TestWebRTCProxyServer_HandleApiForWHEP_HappyPath(t *testing.T) {
 	}
 }

+// Legacy /rtc/v1/play/ (used by srs_bench) wraps the SDP in a JSON envelope
+// like {"sdp":"v=0\r\n..."} where \r\n is the literal 2-byte JSON escape, not
+// real CRLF. The proxy must unwrap the envelope before parsing ICE attributes;
+// otherwise the stored ufrag is contaminated with the next attributes and the
+// STUN binding from the client cannot be matched to the connection.
+func TestWebRTCProxyServer_HandleApiForWHEP_LegacyJSONEnvelope(t *testing.T) {
+	f := newWebRTCFixture()
+	f.env.WebRTCServerReturns("19000")
+	const backendRTCPort = "18000"
+	answerJSON := `{"code":0,"sessionid":"sid","sdp":"v=0\r\na=ice-ufrag:local-ufrag\r\na=ice-pwd:local-pwd-very-long-value-32xxxx\r\na=candidate:1 1 udp 1 1.2.3.4 ` + backendRTCPort + ` typ host\r\n"}`
+	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_, _ = io.ReadAll(r.Body)
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte(answerJSON))
+	}))
+	defer backend.Close()
+	f.server.backendURL = func(b *lb.OriginServer, r *http.Request) (string, error) {
+		return backend.URL + r.URL.Path, nil
+	}
+	f.lb.PickReturns(&lb.OriginServer{IP: "10.0.0.1", API: []string{"1985"}, RTC: []string{backendRTCPort}}, nil)
+	offerJSON := `{"api":"http://10.0.0.1:1985/rtc/v1/play/","clientip":"","sdp":"v=0\r\na=ice-ufrag:remote-ufrag\r\na=ice-pwd:remote-pwd-very-long-value-32xx\r\n","streamurl":"webrtc://example.com/live/demo"}`
+	req := httptest.NewRequest(http.MethodPost, "http://example.com/rtc/v1/play/", strings.NewReader(offerJSON))
+	rec := httptest.NewRecorder()
+	if err := f.server.HandleApiForWHEP(context.Background(), rec, req); err != nil {
+		t.Fatalf("WHEP: %v", err)
+	}
+	if f.lb.StoreWebRTCCallCount() != 1 {
+		t.Fatalf("StoreWebRTC called %d times, want 1", f.lb.StoreWebRTCCallCount())
+	}
+	_, _, stored := f.lb.StoreWebRTCArgsForCall(0)
+	if got, want := stored.GetUfrag(), "local-ufrag:remote-ufrag"; got != want {
+		t.Fatalf("stored ufrag=%q, want %q", got, want)
+	}
+	// The response forwarded to the client should still be the JSON envelope
+	// with the backend port rewritten to the proxy's WebRTC port.
+	body := rec.Body.String()
+	if !strings.Contains(body, " 19000 typ host") {
+		t.Fatalf("answer did not rewrite backend port; got %q", body)
+	}
+	if strings.Contains(body, " "+backendRTCPort+" typ host") {
+		t.Fatalf("answer still contains original backend port; got %q", body)
+	}
+}
+
+func TestUnwrapSDPEnvelope(t *testing.T) {
+	cases := []struct {
+		name string
+		in string
+		want string
+	}{
+		{
+			name: "raw sdp passthrough",
+			in: "v=0\r\na=ice-ufrag:abc\r\n",
+			want: "v=0\r\na=ice-ufrag:abc\r\n",
+		},
+		{
+			name: "json envelope unwrapped",
+			in: `{"code":0,"sdp":"v=0\r\na=ice-ufrag:abc\r\n"}`,
+			want: "v=0\r\na=ice-ufrag:abc\r\n",
+		},
+		{
+			name: "json envelope with leading whitespace",
+			in: "\n\t " + `{"sdp":"v=0\r\n"}`,
+			want: "v=0\r\n",
+		},
+		{
+			name: "malformed json falls back to body",
+			in: `{not json}`,
+			want: `{not json}`,
+		},
+		{
+			name: "json without sdp falls back to body",
+			in: `{"code":0}`,
+			want: `{"code":0}`,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			if got := unwrapSDPEnvelope(tc.in); got != tc.want {
+				t.Fatalf("unwrapSDPEnvelope(%q)=%q, want %q", tc.in, got, tc.want)
+			}
+		})
+	}
+}
+
 // ---------------------------------------------------------------------------
 // webRTCProxyServer.proxyApiToBackend: error paths
 // ---------------------------------------------------------------------------

@ -100,8 +100,11 @@ func (v *srsSRTProxyServer) Run(ctx context.Context) error {
 	go func() {
 		defer v.wg.Done()
-		for ctx.Err() == nil {
-			buf := make([]byte, 4096)
+		// Reuse a single receive buffer across iterations. SRTHandshakePacket.UnmarshalBinary
+		// clones ExtraData, and backendUDP.Write copies into the kernel before returning, so
+		// no caller retains a reference to this slice past the call.
+		buf := make([]byte, 4096)
+		for ctx.Err() == nil {
 			n, caddr, err := v.listener.ReadFrom(buf)
 			if err != nil {
 				// If context is canceled or connection is closed, exit gracefully without logging error.
@ -196,6 +199,11 @@ type SRTConnection struct {
 	handshake2 *SRTHandshakePacket
 	handshake3 *SRTHandshakePacket

+	// Guards the spawn of the backend->client reader goroutine. Keeps the spawn
+	// idempotent if the client retries handshake 2 (e.g. because our handshake 3
+	// was dropped) and re-enters the handshake path.
+	startReader stdSync.Once
+
 	// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
 	// UDP dial; tests may override via a functional option to supply a fake connection.
 	dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
@ -349,23 +357,29 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
 		return errors.Wrapf(err, "write handshake 3")
 	}

-	// Start a goroutine to proxy message from backend to client.
+	// Start a goroutine to proxy message from backend to client. The Once guard
+	// keeps the spawn idempotent if the client retries handshake 2 (because our
+	// handshake 3 was lost); the existing reader keeps running and we don't want
+	// a second one racing it on backendUDP.Read.
 	// TODO: FIXME: Support close the connection when timeout or client disconnected.
-	go func() {
-		for ctx.Err() == nil {
-			nn, err := v.backendUDP.Read(b)
-			if err != nil {
-				// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
-				logger.Warn(ctx, "read from backend failed, err=%v", err)
-				return
-			}
-			if _, err = v.listenerUDP.WriteTo(b[:nn], addr); err != nil {
-				// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
-				logger.Warn(ctx, "write to client failed, err=%v", err)
-				return
-			}
-		}
-	}()
+	v.startReader.Do(func() {
+		go func() {
+			buf := make([]byte, 4096)
+			for ctx.Err() == nil {
+				nn, err := v.backendUDP.Read(buf)
+				if err != nil {
+					// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
+					logger.Warn(ctx, "read from backend failed, err=%v", err)
+					return
+				}
+				if _, err = v.listenerUDP.WriteTo(buf[:nn], addr); err != nil {
+					// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
+					logger.Warn(ctx, "write to client failed, err=%v", err)
+					return
+				}
+			}
+		}()
+	})

 	return nil
 }
@ -583,7 +597,10 @@ func (v *SRTHandshakePacket) UnmarshalBinary(b []byte) error {
 	// Only support IPv4.
 	v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48])

-	v.ExtraData = b[64:]
+	// Clone so ExtraData owns its bytes independently of the caller's buffer.
+	// Without this, callers that reuse the receive buffer would silently corrupt
+	// any decoded packet they keep alive (e.g. v.handshake0 / v.handshake2).
+	v.ExtraData = bytes.Clone(b[64:])

 	return nil
 }

@ -141,7 +141,16 @@ type chunkStream struct {
 	header messageHeader
 	message *message
 	count uint64
-	extendedTimestamp bool
+	// Whether the chunk carries an extended timestamp, set when the (delta) timestamp in
+	// the message header equals 0xffffff. Type-3 continuation chunks inherit this from the
+	// preceding Type-0/1/2 chunk.
+	hasExtendedTimestamp bool
+	// The raw value last read from the extended timestamp field. Kept separately from
+	// header.Timestamp (the accumulated message timestamp) so we can both detect Type-3
+	// chunks that omit the extended timestamp and use it as a delta for fmt=1/2 chunks.
+	// See readMessageHeader.
+	extendedTimestamp uint32
 }

 func newChunkStream() *chunkStream {
@ -540,29 +549,7 @@ func (v *protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, fo
 		// 0x00ffffff), this value MUST be 16777215, and the 'extended
 		// timestamp header' MUST be present. Otherwise, this value SHOULD be
 		// the entire delta.
-		chunk.extendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp
-		if !chunk.extendedTimestamp {
-			// Extended timestamp: 0 or 4 bytes
-			// This field MUST be sent when the normal timsestamp is set to
-			// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
-			// anything else. So for values less than 0xffffff the normal
-			// timestamp field SHOULD be used in which case the extended timestamp
-			// MUST NOT be present. For values greater than or equal to 0xffffff
-			// the normal timestamp field MUST NOT be used and MUST be set to
-			// 0xffffff and the extended timestamp MUST be sent.
-			if format == formatType0 {
-				// 6.1.2.1. Type 0
-				// For a type-0 chunk, the absolute timestamp of the message is sent
-				// here.
-				chunk.header.Timestamp = uint64(chunk.header.timestampDelta)
-			} else {
-				// 6.1.2.2. Type 1
-				// 6.1.2.3. Type 2
-				// For a type-1 or type-2 chunk, the difference between the previous
-				// chunk's timestamp and the current chunk's timestamp is sent here.
-				chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
-			}
-		}
+		chunk.hasExtendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp

 		if format <= formatType1 {
 			payloadLength := uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
@ -585,27 +572,58 @@ func (v *protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, fo
 				p = p[4:]
 			}
 		}
-	} else {
-		// Update the timestamp even fmt=3 for first chunk packet
-		if isFirstChunkOfMsg && !chunk.extendedTimestamp {
-			chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
-		}
 	}

-	// Read extended-timestamp
-	if chunk.extendedTimestamp {
-		var timestamp uint32
-		if err = binary.Read(v.r, binary.BigEndian, &timestamp); err != nil {
+	// Read extended-timestamp, present when the (delta) timestamp in the message header is
+	// 0xffffff. Type-3 chunks inherit hasExtendedTimestamp from the preceding chunk.
+	if chunk.hasExtendedTimestamp {
+		// Peek instead of read, so the 4 bytes can be left in place when a sender omits the
+		// extended timestamp on a Type-3 chunk (see the detection below).
+		var b []byte
+		if b, err = v.r.Peek(4); err != nil {
 			return errors.Wrapf(err, "read ext-ts, pkt-ts=%v", chunk.header.Timestamp)
 		}

 		// We always use 31bits timestamp, for some server may use 32bits extended timestamp.
 		// @see https://github.com/ossrs/srs/issues/111
-		timestamp &= 0x7fffffff
+		timestamp := binary.BigEndian.Uint32(b) & 0x7fffffff

-		// TODO: FIXME: Support detect the extended timestamp.
+		// For the RTMP v1 2009 version (6.1.3. Extended Timestamp), Type 3 chunks MUST NOT
+		// have this field. For the RTMP v1 2012 version (5.3.1.3. Extended Timestamp), it is
+		// present in Type 3 chunks when the most recent Type 0/1/2 chunk indicated one.
+		//
+		// FMLE/FMS/Flash Player follow the 2012 version and always send the extended
+		// timestamp in Type 3 chunks; librtmp/ffmpeg may not. So detect it: if this is not
+		// the first chunk of the message and the peeked value differs from the previously
+		// stored extended timestamp, the sender omitted it and these 4 bytes are payload, so
+		// leave them in the reader. Otherwise consume and store them.
 		// @see http://blog.csdn.net/win_lin/article/details/13363699
+		// @see https://github.com/veovera/enhanced-rtmp/issues/42
+		if !isFirstChunkOfMsg && chunk.extendedTimestamp > 0 && chunk.extendedTimestamp != timestamp {
+			// No extended timestamp on this Type-3 chunk; the 4 bytes belong to the payload.
+		} else {
+			if _, err = v.r.Discard(4); err != nil {
+				return errors.Wrapf(err, "discard ext-ts, pkt-ts=%v", chunk.header.Timestamp)
+			}
+			chunk.extendedTimestamp = timestamp
+		}
+	}
+
+	// Compute the message timestamp. The source is the extended timestamp when present,
+	// otherwise the 3-byte (delta) timestamp from the message header.
+	//
+	// fmt=0: the value is the absolute timestamp of the message.
+	// fmt=1/2 (and a fmt=3 first chunk continuing them): the value is a delta and is
+	// accumulated onto the previous timestamp. This is required when the delta is >= 0xffffff
+	// and is therefore carried in the extended timestamp.
+	timestamp := chunk.header.timestampDelta
+	if chunk.hasExtendedTimestamp {
+		timestamp = chunk.extendedTimestamp
+	}
+	if format == formatType0 {
 		chunk.header.Timestamp = uint64(timestamp)
-	}
+	} else if isFirstChunkOfMsg {
+		chunk.header.Timestamp += uint64(timestamp)
+	}

 	// The extended-timestamp must be unsigned-int,
@ -696,6 +714,11 @@ func (v *protocol) readBasicHeader(ctx context.Context) (format formatType, cid
return return
} }
// Here cid is 0 or 1: a marker selecting the 2B or 3B form, not the real cid. Keep it,
// because cid is overwritten below and the marker decides whether a third byte (the
// high-order part of the cid) follows. Do not test the overwritten cid for this.
marker := cid
// 64-319, 2B chunk header // 64-319, 2B chunk header
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil { if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid) return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
@ -703,7 +726,7 @@ func (v *protocol) readBasicHeader(ctx context.Context) (format formatType, cid
cid = chunkID(64 + uint32(t)) cid = chunkID(64 + uint32(t))
// 64-65599, 3B chunk header // 64-65599, 3B chunk header
if cid == 1 { if marker == 1 {
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil { if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid) return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
} }
@ -1283,6 +1306,12 @@ func (v *variantCallPacket) UnmarshalBinary(data []byte) (err error) {
} }
p = p[v.TransactionID.Size():] p = p[v.TransactionID.Size():]
// Reset the optional command object before deciding whether it is present.
// A New*Packet constructor may have pre-set it to a default (e.g. Null), but
// when the wire data is exhausted here the object is absent. Leaving the stale
// default would make Size() count bytes that were never parsed, overflowing the
// caller's p = p[Size():] advance on truncated, untrusted input.
v.CommandObject = nil
if len(p) > 0 { if len(p) > 0 {
if v.CommandObject, err = Amf0Discovery(p); err != nil { if v.CommandObject, err = Amf0Discovery(p); err != nil {
return errors.WithMessage(err, "discovery command object") return errors.WithMessage(err, "discovery command object")
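
Why the reset matters can be shown with a toy packet. This is a sketch under assumptions: `toyPacket` and `newToyPacket` are hypothetical and stand in for a command packet whose constructor pre-sets an optional field. They are not types from `internal/rtmp`.

```go
package main

import (
	"errors"
	"fmt"
)

// toyPacket is a hypothetical stand-in for a command packet: one mandatory
// byte followed by an optional trailing byte.
type toyPacket struct {
	id  byte
	opt *byte
}

// newToyPacket mimics a constructor that pre-sets the optional field.
func newToyPacket() *toyPacket {
	def := byte(0x05)
	return &toyPacket{opt: &def}
}

// Size reports how many wire bytes the decoded fields occupy.
func (p *toyPacket) Size() int {
	if p.opt != nil {
		return 2
	}
	return 1
}

// UnmarshalBinary decodes the mandatory byte and, if present, the optional one.
func (p *toyPacket) UnmarshalBinary(data []byte) error {
	if len(data) < 1 {
		return errors.New("empty packet")
	}
	p.id = data[0]

	// Reset before probing, so a constructor default is not counted by Size().
	p.opt = nil
	if rest := data[1:]; len(rest) > 0 {
		v := rest[0]
		p.opt = &v
	}
	return nil
}

func main() {
	data := []byte{0x07} // the optional byte is absent on the wire
	p := newToyPacket()
	if err := p.UnmarshalBinary(data); err != nil {
		fmt.Println(err)
		return
	}
	// Size() now matches the input length, so data[p.Size():] stays in range.
	fmt.Println(p.Size(), len(data[p.Size():]))
}
```

Without the `p.opt = nil` line, `Size()` would report 2 for a 1-byte input and the caller's `data[p.Size():]` would panic with a slice-bounds error.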
@ -1353,14 +1382,32 @@ func (v *CallPacket) Size() int {
return size return size
} }
// advanceBytes returns p[n:] after verifying n lies within p. Packet
// UnmarshalBinary advances its cursor by each embedded field's decoded Size();
// on untrusted wire input a malformed length can make Size() exceed the bytes
// actually present, so this guard turns a slice-out-of-range panic into a clean
// error. See the RTMP test plan, P8 (adversarial resource-safety).
func advanceBytes(p []byte, n int) ([]byte, error) {
if n < 0 || n > len(p) {
return nil, errors.Errorf("advance %v exceeds remaining %v bytes", n, len(p))
}
return p[n:], nil
}
func (v *CallPacket) UnmarshalBinary(data []byte) (err error) { func (v *CallPacket) UnmarshalBinary(data []byte) (err error) {
p := data p := data
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil { if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call") return errors.WithMessage(err, "unmarshal call")
} }
p = p[v.variantCallPacket.Size():] if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
// Reset the optional args before deciding whether they are present, for the
// same reason as variantCallPacket.CommandObject: a stale constructor default
// would be counted by Size() and overflow a later advance.
v.Args = nil
if len(p) > 0 { if len(p) > 0 {
if v.Args, err = Amf0Discovery(p); err != nil { if v.Args, err = Amf0Discovery(p); err != nil {
return errors.WithMessage(err, "discovery args") return errors.WithMessage(err, "discovery args")
@ -1436,7 +1483,9 @@ func (v *CreateStreamResPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil { if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call") return errors.WithMessage(err, "unmarshal call")
} }
p = p[v.variantCallPacket.Size():] if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
if err = v.StreamID.UnmarshalBinary(p); err != nil { if err = v.StreamID.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal sid") return errors.WithMessage(err, "unmarshal sid")
@ -1486,7 +1535,9 @@ func (v *PublishPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil { if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call") return errors.WithMessage(err, "unmarshal call")
} }
p = p[v.variantCallPacket.Size():] if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
v.StreamName = newAmf0String("") v.StreamName = newAmf0String("")
if err = v.StreamName.UnmarshalBinary(p); err != nil { if err = v.StreamName.UnmarshalBinary(p); err != nil {
@ -1546,7 +1597,9 @@ func (v *PlayPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil { if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call") return errors.WithMessage(err, "unmarshal call")
} }
p = p[v.variantCallPacket.Size():] if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
v.StreamName = newAmf0String("") v.StreamName = newAmf0String("")
if err = v.StreamName.UnmarshalBinary(p); err != nil { if err = v.StreamName.UnmarshalBinary(p); err != nil {
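
The effect of the `advanceBytes` guard used in the four `UnmarshalBinary` methods above can be checked in a standalone program. This sketch copies the helper's logic from the diff but swaps the repository's `errors.Errorf` for the standard library's `fmt.Errorf`, so it is an approximation of the committed helper rather than the same code.

```go
package main

import "fmt"

// advanceBytes returns p[n:] after verifying that n lies within p. The error
// is built with fmt.Errorf here, an assumption made to keep the sketch
// self-contained.
func advanceBytes(p []byte, n int) ([]byte, error) {
	if n < 0 || n > len(p) {
		return nil, fmt.Errorf("advance %v exceeds remaining %v bytes", n, len(p))
	}
	return p[n:], nil
}

func main() {
	p := []byte{0x02, 0x00, 0x01}

	// Advancing by exactly len(p) is valid and yields an empty slice.
	rest, err := advanceBytes(p, 3)
	fmt.Println(len(rest), err)

	// A decoded size larger than the input becomes an error, not a panic.
	_, err = advanceBytes(p, 8)
	fmt.Println(err)
}
```

The bound is `n > len(p)` rather than `n >= len(p)` because consuming every remaining byte is a normal outcome when the last field ends the packet.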

File diff suppressed because it is too large.

@ -211,10 +211,13 @@ func SrtParseSocketID(data []byte) uint32 {
return 0 return 0
} }
// ParseIceUfragPwd parse the ice-ufrag and ice-pwd from the SDP. // ParseIceUfragPwd parse the ice-ufrag and ice-pwd from the SDP. The value class
// stops at any whitespace (real CRLF in raw SDP) or at a backslash, so the parser
// is also safe against JSON-escaped SDP bodies where line breaks appear as the
// 2-byte sequence "\r" / "\n" rather than real control characters.
func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) { func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
if true { if true {
ufragRe := regexp.MustCompile(`a=ice-ufrag:([^\s]+)`) ufragRe := regexp.MustCompile(`a=ice-ufrag:([^\s\\]+)`)
ufragMatch := ufragRe.FindStringSubmatch(sdp) ufragMatch := ufragRe.FindStringSubmatch(sdp)
if len(ufragMatch) <= 1 { if len(ufragMatch) <= 1 {
return "", "", errors.Errorf("no ice-ufrag in sdp %v", sdp) return "", "", errors.Errorf("no ice-ufrag in sdp %v", sdp)
@ -223,7 +226,7 @@ func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
} }
if true { if true {
pwdRe := regexp.MustCompile(`a=ice-pwd:([^\s]+)`) pwdRe := regexp.MustCompile(`a=ice-pwd:([^\s\\]+)`)
pwdMatch := pwdRe.FindStringSubmatch(sdp) pwdMatch := pwdRe.FindStringSubmatch(sdp)
if len(pwdMatch) <= 1 { if len(pwdMatch) <= 1 {
return "", "", errors.Errorf("no ice-pwd in sdp %v", sdp) return "", "", errors.Errorf("no ice-pwd in sdp %v", sdp)
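
The difference between the old and new character classes shows up when both are run against the same JSON-escaped SDP. This is a small sketch using the two ufrag patterns from the diff; `captureFirst` is a hypothetical helper and the SDP string is made up for illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// Pattern before the change: the value stops only at whitespace.
	oldUfragRe = regexp.MustCompile(`a=ice-ufrag:([^\s]+)`)
	// Pattern after the change: the value also stops at a backslash.
	newUfragRe = regexp.MustCompile(`a=ice-ufrag:([^\s\\]+)`)
)

// captureFirst (hypothetical helper) returns the first capture group, or ""
// when the pattern does not match.
func captureFirst(re *regexp.Regexp, s string) string {
	m := re.FindStringSubmatch(s)
	if len(m) <= 1 {
		return ""
	}
	return m[1]
}

func main() {
	// Raw string literal: each \r and \n below is a backslash followed by a
	// letter, as in a JSON-escaped body, not a real control character.
	sdp := `v=0\r\na=ice-ufrag:1f1n4272\r\na=ice-pwd:abc\r\n`

	fmt.Printf("old: %q\n", captureFirst(oldUfragRe, sdp))
	fmt.Printf("new: %q\n", captureFirst(newUfragRe, sdp))
}
```

Because the sample contains no real whitespace, the old pattern captures everything after `a=ice-ufrag:` to the end of the string, while the new pattern stops at the first backslash and yields only `1f1n4272`.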

@ -338,6 +338,24 @@ func TestParseIceUfragPwd_MissingPwd(t *testing.T) {
} }
} }
// SDP embedded in the legacy /rtc/v1/play/ JSON envelope arrives with "\r\n" as
// the literal 2-byte sequence (backslash + r/n), not real CRLF. The value
// charset must stop at the backslash, otherwise the ufrag would absorb the rest
// of the SDP up to the next real whitespace.
func TestParseIceUfragPwd_JSONEscapedSDP(t *testing.T) {
sdp := `v=0\r\na=ice-ufrag:1f1n4272\r\na=ice-pwd:5f6y69408x2h55232i080mj894901b8n\r\na=fingerprint:sha-256 2D:1D\r\n`
ufrag, pwd, err := ParseIceUfragPwd(sdp)
if err != nil {
t.Fatalf("err = %v", err)
}
if ufrag != "1f1n4272" {
t.Fatalf("ufrag=%q, want 1f1n4272", ufrag)
}
if pwd != "5f6y69408x2h55232i080mj894901b8n" {
t.Fatalf("pwd=%q, want 5f6y69408x2h55232i080mj894901b8n", pwd)
}
}
func TestParseSRTStreamID_WithHost(t *testing.T) { func TestParseSRTStreamID_WithHost(t *testing.T) {
host, resource, err := ParseSRTStreamID("h=example.com,r=live/stream") host, resource, err := ParseSRTStreamID("h=example.com,r=live/stream")
if err != nil { if err != nil {

@ -6,7 +6,7 @@ package version
import "fmt" import "fmt"
func VersionMajor() int { func VersionMajor() int {
return 7 return 8
} }
// VersionMinor specifies the typical version of SRS we adapt to. // VersionMinor specifies the typical version of SRS we adapt to.
@ -15,7 +15,7 @@ func VersionMinor() int {
} }
func VersionRevision() int { func VersionRevision() int {
return 148 return 3
} }
func Version() string { func Version() string {

@ -9,14 +9,14 @@ import (
) )
func TestVersionComponents(t *testing.T) { func TestVersionComponents(t *testing.T) {
if got := VersionMajor(); got != 7 { if got := VersionMajor(); got != 8 {
t.Fatalf("VersionMajor = %d, want 7", got) t.Fatalf("VersionMajor = %d, want 8", got)
} }
if got := VersionMinor(); got != 0 { if got := VersionMinor(); got != 0 {
t.Fatalf("VersionMinor = %d, want 0", got) t.Fatalf("VersionMinor = %d, want 0", got)
} }
if got := VersionRevision(); got <= 0 { if got := VersionRevision(); got < 0 {
t.Fatalf("VersionRevision = %d, want > 0", got) t.Fatalf("VersionRevision = %d, want >= 0", got)
} }
} }

@ -17,7 +17,7 @@ The C++ media server (`trunk/src/`) serves as both origin server and edge server
`core/` — Foundational definitions: `core/` — Foundational definitions:
- `core` — Core includes, macros, config generated by configure - `core` — Core includes, macros, config generated by configure
- `core_version` through `core_version7` — Version definitions per major release - `core_version` through `core_version8` — Version definitions per major release
- `core_autofree` — SrsUniquePtr smart pointer (RAII) - `core_autofree` — SrsUniquePtr smart pointer (RAII)
- `core_deprecated` — Deprecated SrsAutoFree, kept for compat - `core_deprecated` — Deprecated SrsAutoFree, kept for compat
- `core_performance` — Performance tuning constants (merge-read, etc.) - `core_performance` — Performance tuning constants (merge-read, etc.)
@ -227,7 +227,7 @@ The next-generation server (`cmd/` + `internal/`) is written in Go and maintaine
`internal/env` — Environment-based configuration. All settings via env vars (or `.env` file parsed by an in-tree custom parser — no third-party dep; supports comments, `export` prefix, quoted values, escape sequences, and inline comments). Exposes a `ProxyEnvironment` interface (with a counterfeiter-generated fake in `envfakes/` for downstream tests) with methods for each config value. Default ports: RTMP=11935, HTTP API=11985, HTTP Stream=18080, WebRTC=18000, SRT=20080, System API=12025. Timeouts: grace=20s, force=30s. Supports Redis config and default backend config for debugging. `internal/env` — Environment-based configuration. All settings via env vars (or `.env` file parsed by an in-tree custom parser — no third-party dep; supports comments, `export` prefix, quoted values, escape sequences, and inline comments). Exposes a `ProxyEnvironment` interface (with a counterfeiter-generated fake in `envfakes/` for downstream tests) with methods for each config value. Default ports: RTMP=11935, HTTP API=11985, HTTP Stream=18080, WebRTC=18000, SRT=20080, System API=12025. Timeouts: grace=20s, force=30s. Supports Redis config and default backend config for debugging.
`internal/version` — Version constants. Signature `SRSX`, version tracks the SRS project version (currently 7.0.x). Used in HTTP API responses and startup logging. `internal/version` — Version constants. Signature `SRSX`, version tracks the SRS project version (currently 8.0.x). Used in HTTP API responses and startup logging.
`internal/errors` — Error handling with stack traces, thin wrapper over stdlib `errors`. Provides `New`, `Errorf`, `Wrap`, `Wrapf`, `WithMessage`, `WithStack`, `Cause`, and re-exports `Is`/`As`/`Unwrap`/`Join`. Every error captures a stack trace at creation; `%+v` prints the full trace. `Cause()` walks the error chain to find the root error. `internal/errors` — Error handling with stack traces, thin wrapper over stdlib `errors`. Provides `New`, `Errorf`, `Wrap`, `Wrapf`, `WithMessage`, `WithStack`, `Cause`, and re-exports `Is`/`As`/`Unwrap`/`Join`. Every error captures a stack trace at creation; `%+v` prints the full trace. `Cause()` walks the error chain to find the root error.
@ -303,6 +303,9 @@ The knowledge base (`memory/srs-*.md`) captures William's knowledge about SRS
- `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state - `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state
- `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification - `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification
**Next-Generation Server Performance Docs** (`docs/perf/`) — Performance analysis guides for the Go server:
- `proxy-whep.md` — WHEP perf analysis: enable GO_PPROF, run publisher + N WHEP players via srs_bench, collect CPU/alloc/heap/goroutine/trace profiles, read hot spots, diff before/after with `pprof -base`
**Next-Generation Server API Examples** — Executable API documentation: **Next-Generation Server API Examples** — Executable API documentation:
- `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow - `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow

@ -77,13 +77,13 @@ Do NOT attempt unsupported tasks.
1. Ask the user for the PR number if they haven't given it. 1. Ask the user for the PR number if they haven't given it.
2. Bump revision by one in **both** version files, keeping them in sync: 2. Bump revision by one in **both** version files, keeping them in sync:
- `internal/version/version.go``VersionRevision()` - `internal/version/version.go``VersionRevision()`
- `trunk/src/core/srs_core_version7.hpp` — `VERSION_REVISION` - `trunk/src/core/srs_core_version8.hpp` — `VERSION_REVISION`
3. Add a new top entry to `trunk/doc/CHANGELOG.md` under `## SRS 7.0 Changelog`, matching the existing format: 3. Add a new top entry to `trunk/doc/CHANGELOG.md` under `## SRS 8.0 Changelog`, matching the existing format:
``` ```
* v7.0, YYYY-MM-DD, Merge [#PR](URL): <Prefix>: <one-line summary>. v7.0.<rev> (#PR) * v8.0, YYYY-MM-DD, Merge [#PR](URL): <Prefix>: <one-line summary>. v8.0.<rev> (#PR)
``` ```
Propose the summary to the user; don't invent one unilaterally. Propose the summary to the user; don't invent one unilaterally.
4. Stop. Let the user review. When they `git add` the version files and changelog, commit with a short message like `Proxy: Bump to v7.0.<rev> for #<PR>.`. 4. Stop. Let the user review. When they `git add` the version files and changelog, commit with a short message like `Proxy: Bump to v8.0.<rev> for #<PR>.`.
--- ---
@ -143,7 +143,7 @@ Only after the user confirms the routing do you proceed to Step 2.
``` ```
bash scripts/proxy-utest.sh --coverage bash scripts/proxy-utest.sh --coverage
``` ```
4. Run the proxy E2E tests: 4. Run **all** of the proxy E2E tests below — every one, not just the first. Run them one at a time (they bind fixed ports, so they cannot run in parallel), and do not stop early: a later test can fail even when the earlier ones pass.
- Single-origin RTMP proxy test (starts proxy + one SRS origin, publishes RTMP, verifies playback): - Single-origin RTMP proxy test (starts proxy + one SRS origin, publishes RTMP, verifies playback):
``` ```
bash scripts/proxy-e2e-test.sh bash scripts/proxy-e2e-test.sh
@ -152,6 +152,10 @@ Only after the user confirms the routing do you proceed to Step 2.
``` ```
bash scripts/proxy-e2e-cluster-test.sh bash scripts/proxy-e2e-cluster-test.sh
``` ```
- Proxy + SRS edge + SRS origin three-tier topology (starts proxy + one SRS edge in `mode remote` registered with the proxy + one upstream SRS origin, publishes RTMP via proxy→edge→origin, then plays the same stream with two concurrent RTMP players where the second joins after a delay as a late joiner on the active edge-pull):
```
bash scripts/proxy-e2e-edge-test.sh
```
- Redis multi-proxy routing test (requires local Redis; starts two proxy instances with Redis LB, publishes through one proxy, verifies playback through the other): - Redis multi-proxy routing test (requires local Redis; starts two proxy instances with Redis LB, publishes through one proxy, verifies playback through the other):
``` ```
bash scripts/proxy-e2e-redis-test.sh bash scripts/proxy-e2e-redis-test.sh

@ -0,0 +1,342 @@
#!/bin/bash
# E2E test for proxy + edge + origin: starts proxy + one SRS edge (registered
# with proxy) + one SRS upstream origin (not registered). Publishes a single
# RTMP stream through proxy -> edge -> origin, then plays the stream twice
# concurrently from the proxy -> edge with a delay so the second player is a
# late joiner on an already-active edge-pull. Both players must succeed.
set -e
SCRIPT_DIR="$(cd -P "$(dirname "$0")" && pwd)"
# Walk up from SCRIPT_DIR looking for go.mod. This avoids brittle "../../../.."
# counting when the skills directory is reached via a symlink (which changes
# the symbolic vs. physical depth).
WORKSPACE="$SCRIPT_DIR"
while [[ "$WORKSPACE" != "/" && ! -f "$WORKSPACE/go.mod" ]]; do
WORKSPACE="$(dirname "$WORKSPACE")"
done
if [[ ! -f "$WORKSPACE/go.mod" ]]; then
echo "Error: go.mod not found walking up from: $SCRIPT_DIR" >&2
exit 1
fi
# Proxy ports — high range, avoids the SRS port range.
PROXY_RTMP_PORT=11935
PROXY_HTTP_API_PORT=11985
PROXY_HTTP_SERVER_PORT=18080
PROXY_WEBRTC_PORT=18000
PROXY_SRT_PORT=20080
PROXY_SYSTEM_API_PORT=12025
# Origin ports (from origin-for-edge.conf) — upstream of the edge, NOT
# registered with the proxy. Distinct from origin1/2/3 to avoid collisions
# when running this test alongside the other proxy E2E tests.
ORIGIN_RTMP_PORT=19360
ORIGIN_API_PORT=19860
# Edge ports (from edge-for-proxy.conf) — what the proxy treats as its backend.
EDGE_RTMP_PORT=19361
EDGE_HTTP_PORT=8091
EDGE_API_PORT=19861
SOURCE_FLV="$WORKSPACE/trunk/doc/source.flv"
SRS_BINARY="$WORKSPACE/trunk/objs/srs"
ORIGIN_CONF="$WORKSPACE/trunk/conf/origin-for-edge.conf"
EDGE_CONF="$WORKSPACE/trunk/conf/edge-for-proxy.conf"
# Randomize per run so each invocation starts from clean state and never
# shares state with sibling E2E tests publishing to live/livestream.
STREAM_NAME="edge$(date +%s)"
STREAM_PATH="live/$STREAM_NAME"
# PIDs to clean up on exit.
PROXY_PID=""
ORIGIN_PID=""
EDGE_PID=""
PUBLISH_PID=""
PLAYER1_PID=""
PLAYER2_PID=""
cleanup() {
echo ""
echo "=== Cleaning up ==="
for pid in $PUBLISH_PID $PLAYER1_PID $PLAYER2_PID $EDGE_PID $ORIGIN_PID $PROXY_PID; do
if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then
kill "$pid" 2>/dev/null || true
fi
done
sleep 1
for pid in $PUBLISH_PID $PLAYER1_PID $PLAYER2_PID $EDGE_PID $ORIGIN_PID $PROXY_PID; do
if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then
kill -9 "$pid" 2>/dev/null || true
fi
done
echo "Cleanup done."
}
trap cleanup EXIT
wait_for_http() {
local url=$1
local name=$2
local i
for i in $(seq 1 30); do
if curl -fsS --max-time 2 "$url" >/dev/null 2>&1; then
echo "$name is ready."
return 0
fi
sleep 1
done
echo "Error: $name is not ready after 30s: $url" >&2
return 1
}
api_has_stream() {
local api_port=$1
local stream=$2
curl -fsS --max-time 3 "http://127.0.0.1:$api_port/api/v1/streams/" 2>/dev/null | grep -q "$stream"
}
verify_probe_has_av() {
local url=$1
local label=$2
local probe_output
probe_output=$(ffprobe -v error -rw_timeout 5000000 -show_streams "$url" 2>&1 || true)
if ! echo "$probe_output" | grep -q "codec_type=video"; then
echo "FAIL: No video stream detected for $label." >&2
echo "ffprobe output:" >&2
echo "$probe_output" >&2
exit 1
fi
if ! echo "$probe_output" | grep -q "codec_type=audio"; then
echo "FAIL: No audio stream detected for $label." >&2
echo "ffprobe output:" >&2
echo "$probe_output" >&2
exit 1
fi
echo "PASS: Audio/video detected for $label."
}
echo "=== E2E Proxy + Edge + Origin Test ==="
echo "Workspace: $WORKSPACE"
echo "Stream: $STREAM_PATH"
echo ""
# --- Pre-checks ---
if [[ ! -f "$SOURCE_FLV" ]]; then
echo "Error: test source not found: $SOURCE_FLV" >&2
exit 1
fi
if [[ ! -f "$ORIGIN_CONF" ]]; then
echo "Error: origin conf not found: $ORIGIN_CONF" >&2
exit 1
fi
if [[ ! -f "$EDGE_CONF" ]]; then
echo "Error: edge conf not found: $EDGE_CONF" >&2
exit 1
fi
for tool in ffmpeg ffprobe curl; do
if ! command -v "$tool" &>/dev/null; then
echo "Error: $tool not found in PATH" >&2
exit 1
fi
done
# --- Step 0: Clean up stale state ---
rm -f "$WORKSPACE/trunk/objs/origin-for-edge.pid" "$WORKSPACE/trunk/objs/edge-for-proxy.pid"
ALL_PORTS="$PROXY_RTMP_PORT $PROXY_HTTP_API_PORT $PROXY_HTTP_SERVER_PORT $PROXY_WEBRTC_PORT $PROXY_SRT_PORT $PROXY_SYSTEM_API_PORT"
ALL_PORTS="$ALL_PORTS $ORIGIN_RTMP_PORT $ORIGIN_API_PORT $EDGE_RTMP_PORT $EDGE_HTTP_PORT $EDGE_API_PORT"
for port in $ALL_PORTS; do
lsof -ti :"$port" 2>/dev/null | xargs kill 2>/dev/null || true
done
sleep 1
# --- Step 1: Build proxy ---
echo "=== Step 1: Building proxy ==="
cd "$WORKSPACE"
make -s 2>&1
echo "Proxy built: $WORKSPACE/bin/srs-proxy"
# --- Step 2: Build SRS (if not already built) ---
if [[ ! -f "$SRS_BINARY" ]]; then
echo "=== Step 2: Building SRS ==="
cd "$WORKSPACE/trunk"
./configure && make 2>&1 | tail -3
echo "SRS built: $SRS_BINARY"
else
echo "=== Step 2: SRS already built ==="
fi
# --- Step 3: Start proxy ---
echo "=== Step 3: Starting proxy (RTMP :$PROXY_RTMP_PORT, System API :$PROXY_SYSTEM_API_PORT) ==="
cd "$WORKSPACE"
env PROXY_RTMP_SERVER=$PROXY_RTMP_PORT \
PROXY_HTTP_API=$PROXY_HTTP_API_PORT \
PROXY_HTTP_SERVER=$PROXY_HTTP_SERVER_PORT \
PROXY_WEBRTC_SERVER=$PROXY_WEBRTC_PORT \
PROXY_SRT_SERVER=$PROXY_SRT_PORT \
PROXY_SYSTEM_API=$PROXY_SYSTEM_API_PORT \
PROXY_LOAD_BALANCER_TYPE=memory \
./bin/srs-proxy >/tmp/srs-proxy-edge-e2e.log 2>&1 &
PROXY_PID=$!
echo "Proxy PID: $PROXY_PID"
wait_for_http "http://127.0.0.1:$PROXY_SYSTEM_API_PORT/api/v1/versions" "Proxy System API"
if ! kill -0 "$PROXY_PID" 2>/dev/null; then
echo "Error: proxy failed to start. Logs:" >&2
cat /tmp/srs-proxy-edge-e2e.log >&2
exit 1
fi
echo "Proxy started."
# --- Step 4: Start upstream origin (no proxy heartbeat) ---
echo "=== Step 4: Starting upstream SRS origin (RTMP :$ORIGIN_RTMP_PORT) ==="
ulimit -n 10000 2>/dev/null || true
cd "$WORKSPACE/trunk"
./objs/srs -c conf/origin-for-edge.conf >/tmp/srs-origin-edge-e2e.log 2>&1 &
ORIGIN_PID=$!
echo "Origin PID: $ORIGIN_PID"
wait_for_http "http://127.0.0.1:$ORIGIN_API_PORT/api/v1/versions" "Origin HTTP API"
if ! kill -0 "$ORIGIN_PID" 2>/dev/null; then
echo "Error: origin failed to start. Logs:" >&2
cat /tmp/srs-origin-edge-e2e.log >&2
exit 1
fi
echo "Origin started."
# --- Step 5: Start edge (mode remote, registered with proxy) ---
echo "=== Step 5: Starting SRS edge (RTMP :$EDGE_RTMP_PORT, upstream :$ORIGIN_RTMP_PORT) ==="
./objs/srs -c conf/edge-for-proxy.conf >/tmp/srs-edge-e2e.log 2>&1 &
EDGE_PID=$!
echo "Edge PID: $EDGE_PID"
wait_for_http "http://127.0.0.1:$EDGE_API_PORT/api/v1/versions" "Edge HTTP API"
# Wait for the edge to register with the proxy (heartbeat interval is 9s).
echo "Waiting for edge to register with proxy (up to 20s)..."
for i in $(seq 1 20); do
if grep -q "Register SRS media server" /tmp/srs-proxy-edge-e2e.log 2>/dev/null; then
echo "Edge registered with proxy."
break
fi
sleep 1
done
if ! grep -q "Register SRS media server" /tmp/srs-proxy-edge-e2e.log 2>/dev/null; then
echo "Error: edge did not register with proxy after 20s. Proxy logs:" >&2
cat /tmp/srs-proxy-edge-e2e.log >&2
exit 1
fi
if ! kill -0 "$EDGE_PID" 2>/dev/null; then
echo "Error: edge failed to start. Logs:" >&2
cat /tmp/srs-edge-e2e.log >&2
exit 1
fi
echo "Edge started and registered."
# --- Step 6: Publish RTMP stream to proxy ---
# Path: ffmpeg -> proxy (:$PROXY_RTMP_PORT) -> edge (:$EDGE_RTMP_PORT, mode remote forwards
# publish) -> origin (:$ORIGIN_RTMP_PORT). Verify the publish reached the
# upstream origin via the origin's HTTP API.
echo "=== Step 6: Publishing $STREAM_PATH through proxy -> edge -> origin ==="
ffmpeg -stream_loop -1 -re -i "$SOURCE_FLV" -c copy -f flv \
"rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH" >/tmp/srs-ffmpeg-edge-e2e.log 2>&1 &
PUBLISH_PID=$!
echo "Publisher PID: $PUBLISH_PID"
echo "Waiting for stream to reach origin (up to 15s)..."
reached_origin=0
for i in $(seq 1 15); do
if api_has_stream "$ORIGIN_API_PORT" "$STREAM_NAME"; then
reached_origin=1
echo "Stream visible on origin after ${i}s."
break
fi
sleep 1
done
if [[ $reached_origin -ne 1 ]]; then
echo "FAIL: stream did not reach upstream origin via edge." >&2
echo "Publisher logs:" >&2
cat /tmp/srs-ffmpeg-edge-e2e.log >&2
echo "Edge logs:" >&2
cat /tmp/srs-edge-e2e.log >&2
exit 1
fi
if ! kill -0 "$PUBLISH_PID" 2>/dev/null; then
echo "Error: publisher exited unexpectedly. Logs:" >&2
cat /tmp/srs-ffmpeg-edge-e2e.log >&2
exit 1
fi
echo "PASS: publish path proxy -> edge -> origin works."
# --- Step 7: Two concurrent players on the same stream ---
# Player 1 attaches first and triggers the edge-pull from origin. Player 2
# joins a few seconds later as a late joiner on the already-active edge-pull.
# Both must succeed — this is the proxy-side analogue of the C++ edge late-
# join fix.
echo "=== Step 7: Two concurrent RTMP players via proxy ==="
PLAY_DURATION=8
PLAYER_URL="rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH"
echo "Starting player 1 (immediate)..."
ffmpeg -rw_timeout 5000000 -i "$PLAYER_URL" -t $PLAY_DURATION -c copy -f flv -y /dev/null \
>/tmp/srs-player1-edge-e2e.log 2>&1 &
PLAYER1_PID=$!
sleep 3
echo "Starting player 2 (late joiner, +3s)..."
ffmpeg -rw_timeout 5000000 -i "$PLAYER_URL" -t $PLAY_DURATION -c copy -f flv -y /dev/null \
>/tmp/srs-player2-edge-e2e.log 2>&1 &
PLAYER2_PID=$!
# Wait for both players to finish.
player1_rc=0
player2_rc=0
wait "$PLAYER1_PID" || player1_rc=$?
wait "$PLAYER2_PID" || player2_rc=$?
# Clear PIDs so cleanup() doesn't try to re-kill exited processes.
PLAYER1_PID=""
PLAYER2_PID=""
check_player() {
local label=$1
local rc=$2
local log=$3
if [[ $rc -ne 0 ]]; then
echo "FAIL: $label exited with code $rc. Logs:" >&2
cat "$log" >&2
exit 1
fi
# Decoded-frames check — ffmpeg's progress lines contain `frame=` once it has
# successfully started decoding. Catches "dimensions not set"-style failures
# where ffmpeg returns 0 but never produced output.
if ! grep -qE 'frame= *[1-9]' "$log"; then
echo "FAIL: $label produced no frames. Logs:" >&2
cat "$log" >&2
exit 1
fi
echo "PASS: $label played successfully."
}
check_player "player 1" "$player1_rc" /tmp/srs-player1-edge-e2e.log
check_player "player 2 (late joiner)" "$player2_rc" /tmp/srs-player2-edge-e2e.log
# --- Step 8: Final probe via proxy confirms A/V is still queryable ---
echo "=== Step 8: Final ffprobe verification via proxy ==="
verify_probe_has_av "rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH" "proxy $STREAM_PATH"
echo ""
echo "=== E2E Proxy + Edge + Origin Test PASSED ==="

@ -0,0 +1,39 @@
# Edge for the proxy+edge E2E test (skills/srs-develop).
# Registers with the proxy via heartbeat, and in mode remote forwards publishes
# to and pulls plays from origin-for-edge.conf (RTMP :19360).
max_connections 1000;
pid objs/edge-for-proxy.pid;
daemon off;
srs_log_tank console;
rtmp {
listen 19361;
}
http_server {
enabled on;
listen 8091;
dir ./objs/nginx/html;
}
http_api {
enabled on;
listen 19861;
}
heartbeat {
enabled on;
interval 9;
url http://127.0.0.1:12025/api/v1/srs/register;
device_id edge-for-proxy;
ports on;
}
vhost __defaultVhost__ {
cluster {
mode remote;
origin 127.0.0.1:19360;
}
http_remux {
enabled on;
mount [vhost]/[app]/[stream].flv;
}
}

@ -0,0 +1,19 @@
# Upstream origin for the proxy+edge E2E test (skills/srs-develop).
# Not registered with the proxy; the edge in front of it is. The edge pulls
# from this origin on play and pushes publishes here on publish.
max_connections 1000;
pid objs/origin-for-edge.pid;
daemon off;
srs_log_tank console;
rtmp {
listen 19360;
}
http_api {
enabled on;
listen 19860;
}
vhost __defaultVhost__ {
}

trunk/configure

@ -237,7 +237,7 @@ fi
MODULE_ID="CORE" MODULE_ID="CORE"
MODULE_DEPENDS=() MODULE_DEPENDS=()
ModuleLibIncs=(${SRS_OBJS}) ModuleLibIncs=(${SRS_OBJS})
MODULE_FILES=("srs_core" "srs_core_version" "srs_core_version7" "srs_core_autofree" MODULE_FILES=("srs_core" "srs_core_version" "srs_core_version8" "srs_core_autofree"
"srs_core_time" "srs_core_platform" "srs_core_deprecated" "srs_core_performance") "srs_core_time" "srs_core_platform" "srs_core_deprecated" "srs_core_performance")
CORE_INCS="src/core"; MODULE_DIR=${CORE_INCS} . $SRS_WORKDIR/auto/modules.sh CORE_INCS="src/core"; MODULE_DIR=${CORE_INCS} . $SRS_WORKDIR/auto/modules.sh
CORE_OBJS="${MODULE_OBJS[@]}" CORE_OBJS="${MODULE_OBJS[@]}"

@ -4,6 +4,14 @@
The changelog for SRS. The changelog for SRS.
<a name="v8-changes"></a>
## SRS 8.0 Changelog
* v8.0, 2026-05-28, Merge [#4680](https://github.com/ossrs/srs/pull/4680): RTMP: Fix chunk timestamp/basic-header decoding and harden packet unmarshal. v8.0.3 (#4680)
* v8.0, 2026-05-19, Merge [#4678](https://github.com/ossrs/srs/pull/4678): Edge: Fix HTTP-FLV 404 and RTMP late-join missing sequence headers. v8.0.2 (#4678)
* v8.0, 2026-05-17, Merge [#4676](https://github.com/ossrs/srs/pull/4676): Proxy: Fix RTC/SRT reader goroutine leak; unwrap legacy WHEP JSON envelope; add WHEP pprof guide. v8.0.1 (#4676)
* v8.0, 2026-05-17, Init SRS 8.0, code Free. v8.0.0
<a name="v7-changes"></a> <a name="v7-changes"></a>
## SRS 7.0 Changelog ## SRS 7.0 Changelog

@ -1076,7 +1076,7 @@ SrsHttpStreamServer::SrsHttpStreamServer()
void SrsHttpStreamServer::assemble() void SrsHttpStreamServer::assemble()
{ {
SrsHttpServeMux *mux = dynamic_cast<SrsHttpServeMux *>(mux_); SrsHttpServeMux *mux = dynamic_cast<SrsHttpServeMux *>(mux_);
if (!mux) { if (mux) {
mux->add_dynamic_matcher(this); mux->add_dynamic_matcher(this);
} }
} }

@ -2559,7 +2559,10 @@ srs_error_t SrsLiveSource::consumer_dumps(ISrsLiveConsumer *consumer, bool ds, b
} }
// If stream is publishing, dumps the sequence header and gop cache. // If stream is publishing, dumps the sequence header and gop cache.
bool hub_active = hub_ ? hub_->active() : false; // On edge, hub_ is NULL; the source is "publishing" once the edge-pull has
// populated the meta cache. Late-joining consumers must still receive the
// cached metadata + sequence headers + GOP via this path.
bool hub_active = hub_ ? hub_->active() : (meta_->data() || meta_->vsh() || meta_->ash());
if (hub_active) { if (hub_active) {
// Copy metadata and sequence header to consumer. // Copy metadata and sequence header to consumer.
if ((err = meta_->dumps(consumer, atc_, jitter_algorithm_, dm, ds)) != srs_success) { if ((err = meta_->dumps(consumer, atc_, jitter_algorithm_, dm, ds)) != srs_success) {

@ -7,6 +7,6 @@
#ifndef SRS_CORE_VERSION_HPP #ifndef SRS_CORE_VERSION_HPP
#define SRS_CORE_VERSION_HPP #define SRS_CORE_VERSION_HPP
#include <srs_core_version7.hpp> #include <srs_core_version8.hpp>
#endif #endif

@ -0,0 +1,7 @@
//
// Copyright (c) 2013-2026 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#include <srs_core_version8.hpp>

@ -0,0 +1,14 @@
//
// Copyright (c) 2013-2026 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_CORE_VERSION8_HPP
#define SRS_CORE_VERSION8_HPP
#define VERSION_MAJOR 8
#define VERSION_MINOR 0
#define VERSION_REVISION 3
#endif