Compare commits
2 Commits
develop
...
7.0release
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
133f66afba | ||
|
|
386a3768df |
149
docs/perf/proxy-whep.md
Normal file
149
docs/perf/proxy-whep.md
Normal file
|
|
@ -0,0 +1,149 @@
|
||||||
|
# How to Analyze WHEP Performance for the Proxy Server
|
||||||
|
|
||||||
|
This guide walks through profiling the Go proxy under a WHEP (WebRTC play) load.
|
||||||
|
The workload of interest is **one RTMP publisher + N WHEP players**, where N is
|
||||||
|
large enough to stress the proxy's UDP forwarding path (typically 300+).
|
||||||
|
|
||||||
|
When analyzing WHEP performance for the proxy, you should:
|
||||||
|
|
||||||
|
1. Set up the topology: proxy + SRS origin + publisher + WHEP load
|
||||||
|
2. Enable Go pprof on the proxy
|
||||||
|
3. Run the load and let it warm up
|
||||||
|
4. Collect CPU, allocation, heap, goroutine, and trace profiles
|
||||||
|
5. Read the profiles and identify hot spots
|
||||||
|
6. Save profiles to compare before and after a change
|
||||||
|
|
||||||
|
## Step 1: Build and Start the Proxy with pprof
|
||||||
|
|
||||||
|
The proxy reads `GO_PPROF` from the environment and, when set, exposes
|
||||||
|
`net/http/pprof` endpoints at that address. Use the same standard ports SRS
|
||||||
|
uses by default so the publisher and player commands stay unchanged.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd ~/git/srs
|
||||||
|
make && env GO_PPROF=:6060 \
|
||||||
|
PROXY_RTMP_SERVER=1935 PROXY_HTTP_SERVER=8080 \
|
||||||
|
PROXY_HTTP_API=1985 PROXY_WEBRTC_SERVER=8000 PROXY_SRT_SERVER=10080 \
|
||||||
|
PROXY_SYSTEM_API=12025 PROXY_LOAD_BALANCER_TYPE=memory \
|
||||||
|
./bin/srs-proxy
|
||||||
|
```
|
||||||
|
|
||||||
|
> The pprof endpoints live under `http://localhost:6060/debug/pprof/`. The
|
||||||
|
> proxy registers them only because `internal/debug/pprof.go` blank-imports
|
||||||
|
> `net/http/pprof`. Without that import the endpoints return 404.
|
||||||
|
|
||||||
|
## Step 2: Start the SRS Origin on Alt Ports
|
||||||
|
|
||||||
|
`origin1-for-proxy.conf` runs SRS on non-standard ports (RTMP 19351, HTTP 8081,
|
||||||
|
API 19851, RTC 8001/udp, SRT 10081) so the proxy can sit on the defaults. SRS
|
||||||
|
auto-registers with the proxy's system API on startup.
|
||||||
|
|
||||||
|
Set `CANDIDATE` to a LAN-reachable IP so the SDP answer the proxy returns
|
||||||
|
points clients at an address they can route to. The proxy only rewrites the
|
||||||
|
candidate **port**; the IP comes from the origin's SDP.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ulimit -n 10000 && bash -c "cd ~/git/srs/trunk && \
|
||||||
|
CANDIDATE=192.168.3.187 ./objs/srs -c conf/origin1-for-proxy.conf"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 3: Run the WHEP Workload
|
||||||
|
|
||||||
|
In separate terminals, start the publisher and the WHEP load generator.
|
||||||
|
|
||||||
|
**Publisher (RTMP):**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd ~/git/srs/trunk
|
||||||
|
ffmpeg -stream_loop -1 -re -i doc/source.200kbps.768x320.flv \
|
||||||
|
-c copy -f flv -y rtmp://localhost/live/livestream
|
||||||
|
```
|
||||||
|
|
||||||
|
**WHEP players (use the LAN IP that matches `CANDIDATE`):**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd ~/git/srs/trunk/3rdparty/srs-bench
|
||||||
|
./objs/srs_bench -sr webrtc://192.168.3.187/live/livestream -nn 300
|
||||||
|
```
|
||||||
|
|
||||||
|
Let the workload run for at least 30 seconds before sampling. Connection
|
||||||
|
setup churn dominates the first few seconds and will skew profiles taken
|
||||||
|
too early.
|
||||||
|
|
||||||
|
> Sanity-check with `-nn 1` first. If a single WHEP session does not play,
|
||||||
|
> the 300-player run is testing something other than steady-state forwarding.
|
||||||
|
|
||||||
|
## Step 4: Collect Profiles
|
||||||
|
|
||||||
|
Profiles must be collected **while the workload is steady**, not before or
|
||||||
|
after. The CPU profile is the single most useful starting point.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# CPU profile (30s sample) — interactive web UI on :8123
|
||||||
|
# Use :8123 (or any free port) because :8080 is the proxy's HTTP-FLV/HLS port.
|
||||||
|
go tool pprof -http=:8123 'http://localhost:6060/debug/pprof/profile?seconds=30'
|
||||||
|
|
||||||
|
# Allocation profile — GC pressure / per-packet allocations
|
||||||
|
go tool pprof -http=:8124 http://localhost:6060/debug/pprof/allocs
|
||||||
|
|
||||||
|
# Heap (live memory snapshot)
|
||||||
|
go tool pprof -http=:8125 http://localhost:6060/debug/pprof/heap
|
||||||
|
|
||||||
|
# Goroutine count + stack dump — look for goroutine explosion under load
|
||||||
|
curl -s 'http://localhost:6060/debug/pprof/goroutine?debug=1' | head -50
|
||||||
|
|
||||||
|
# Runtime trace (10s) — GC pauses, scheduler latency, syscall behavior
|
||||||
|
curl -s -o trace.out 'http://localhost:6060/debug/pprof/trace?seconds=10'
|
||||||
|
go tool trace trace.out
|
||||||
|
```
|
||||||
|
|
||||||
|
The web UI requires Graphviz for the Flame Graph and Graph views:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
brew install graphviz # macOS
|
||||||
|
```
|
||||||
|
|
||||||
|
If you cannot install Graphviz, the **Top** view in the web UI is HTML-only
|
||||||
|
and works without it. The CLI form is also unaffected:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go tool pprof 'http://localhost:6060/debug/pprof/profile?seconds=30'
|
||||||
|
(pprof) top20
|
||||||
|
(pprof) top20 -cum
|
||||||
|
(pprof) list <FunctionName>
|
||||||
|
```
|
||||||
|
|
||||||
|
## Step 5: Read the Profiles
|
||||||
|
|
||||||
|
Open the web UI and use the views in this order:
|
||||||
|
|
||||||
|
1. **Flame Graph** — visual hot path. Wide bars near the top are where time
|
||||||
|
is spent. For 300-player WHEP the path should be dominated by
|
||||||
|
`webRTCProxyServer.Run` and its UDP read/write children.
|
||||||
|
2. **Top** — sorted list by `flat` (self time) and `cum` (cumulative). The
|
||||||
|
top 5–10 functions usually tell the whole story.
|
||||||
|
3. **Graph** — call graph with edge weights. Good for tracing "who calls this
|
||||||
|
hot function".
|
||||||
|
4. **Source** — line-level cost inside a single function. Use after Top has
|
||||||
|
pointed you at a function worth dissecting.
|
||||||
|
|
||||||
|
## Step 6: Save Profiles for Before/After Comparison
|
||||||
|
|
||||||
|
When you change code to fix a hot spot, comparing profiles is the only
|
||||||
|
reliable way to confirm the fix moved the needle (and didn't just shift cost
|
||||||
|
elsewhere).
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Save the raw profile from a baseline run
|
||||||
|
curl -s -o cpu-before.pb.gz 'http://localhost:6060/debug/pprof/profile?seconds=30'
|
||||||
|
|
||||||
|
# After the code change, sample again under the same workload
|
||||||
|
curl -s -o cpu-after.pb.gz 'http://localhost:6060/debug/pprof/profile?seconds=30'
|
||||||
|
|
||||||
|
# Diff the two
|
||||||
|
go tool pprof -http=:8123 -base cpu-before.pb.gz cpu-after.pb.gz
|
||||||
|
```
|
||||||
|
|
||||||
|
In the diff view, red bars are functions that got more expensive, green
|
||||||
|
bars are functions that got cheaper. The total should shrink overall if
|
||||||
|
the change is a net win.
|
||||||
|
|
@ -6,6 +6,7 @@ package debug
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
|
|
||||||
"srsx/internal/env"
|
"srsx/internal/env"
|
||||||
"srsx/internal/logger"
|
"srsx/internal/logger"
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package proxy
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
|
@ -241,13 +242,17 @@ func (v *webRTCProxyServer) proxyApiToBackend(
|
||||||
localSDPAnswer = strings.Replace(localSDPAnswer, from, to, -1)
|
localSDPAnswer = strings.Replace(localSDPAnswer, from, to, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the ice-ufrag and ice-pwd from local SDP answer.
|
// Fetch the ice-ufrag and ice-pwd from local SDP answer. The legacy SRS
|
||||||
remoteICEUfrag, remoteICEPwd, err := utils.ParseIceUfragPwd(remoteSDPOffer)
|
// /rtc/v1/play/ and /rtc/v1/publish/ APIs wrap the SDP in a JSON envelope
|
||||||
|
// like {"sdp":"v=0\r\n..."}, so unwrap it before parsing ICE attributes.
|
||||||
|
// The forwarded bytes and the in-body candidate port rewrite still operate
|
||||||
|
// on the raw envelope, which is what the client expects to see back.
|
||||||
|
remoteICEUfrag, remoteICEPwd, err := utils.ParseIceUfragPwd(unwrapSDPEnvelope(remoteSDPOffer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "parse remote sdp offer")
|
return errors.Wrapf(err, "parse remote sdp offer")
|
||||||
}
|
}
|
||||||
|
|
||||||
localICEUfrag, localICEPwd, err := utils.ParseIceUfragPwd(localSDPAnswer)
|
localICEUfrag, localICEPwd, err := utils.ParseIceUfragPwd(unwrapSDPEnvelope(localSDPAnswer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "parse local sdp answer")
|
return errors.Wrapf(err, "parse local sdp answer")
|
||||||
}
|
}
|
||||||
|
|
@ -297,8 +302,12 @@ func (v *webRTCProxyServer) Run(ctx context.Context) error {
|
||||||
go func() {
|
go func() {
|
||||||
defer v.wg.Done()
|
defer v.wg.Done()
|
||||||
|
|
||||||
|
// Reuse a single receive buffer across iterations. handleClientUDP and the
|
||||||
|
// downstream HandlePacket consume the slice synchronously (kernel sendto
|
||||||
|
// copies bytes; STUN parsing copies the username via string()), so no caller
|
||||||
|
// retains the slice past the call.
|
||||||
|
buf := make([]byte, 4096)
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
buf := make([]byte, 4096)
|
|
||||||
n, addr, err := listener.ReadFrom(buf)
|
n, addr, err := listener.ReadFrom(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If context is canceled or connection is closed, exit gracefully without logging error.
|
// If context is canceled or connection is closed, exit gracefully without logging error.
|
||||||
|
|
@ -414,6 +423,11 @@ type rtcConnection struct {
|
||||||
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
||||||
// UDP dial; tests may override via a functional option to supply a fake connection.
|
// UDP dial; tests may override via a functional option to supply a fake connection.
|
||||||
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
||||||
|
|
||||||
|
// Guards the spawn of the backend->client reader goroutine. HandlePacket is
|
||||||
|
// called on every inbound client packet (STUN keepalives + RTCP feedback at
|
||||||
|
// steady state) but the reader must only start once per connection.
|
||||||
|
startReader stdSync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection {
|
func newRTCConnection(opts ...func(*rtcConnection)) *rtcConnection {
|
||||||
|
|
@ -462,24 +476,31 @@ func (v *rtcConnection) HandlePacket(addr net.Addr, data []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proxy all messages from backend to client.
|
// Spawn the backend->client reader exactly once per connection. Previously
|
||||||
go func() {
|
// this goroutine was launched unconditionally here on every inbound client
|
||||||
for ctx.Err() == nil {
|
// packet, which leaked tens of thousands of goroutines under steady-state
|
||||||
|
// WHEP load (STUN keepalives + RTCP feedback). The buffer is reused across
|
||||||
|
// iterations: WriteTo copies into the kernel before returning, so the next
|
||||||
|
// Read can safely overwrite.
|
||||||
|
v.startReader.Do(func() {
|
||||||
|
go func() {
|
||||||
buf := make([]byte, 4096)
|
buf := make([]byte, 4096)
|
||||||
n, err := v.backendUDP.Read(buf)
|
for ctx.Err() == nil {
|
||||||
if err != nil {
|
n, err := v.backendUDP.Read(buf)
|
||||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
if err != nil {
|
||||||
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||||
break
|
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
||||||
}
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
|
if _, err = v.listenerUDP.WriteTo(buf[:n], v.clientUDP); err != nil {
|
||||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||||
logger.Warn(ctx, "write to client failed, err=%v", err)
|
logger.Warn(ctx, "write to client failed, err=%v", err)
|
||||||
break
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}()
|
||||||
}()
|
})
|
||||||
|
|
||||||
if _, err := v.backendUDP.Write(data); err != nil {
|
if _, err := v.backendUDP.Write(data); err != nil {
|
||||||
return errors.Wrapf(err, "write to backend %v", v.StreamURL)
|
return errors.Wrapf(err, "write to backend %v", v.StreamURL)
|
||||||
|
|
@ -520,6 +541,25 @@ func (v *rtcConnection) connectBackend(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// unwrapSDPEnvelope returns the SDP string carried inside the legacy SRS RTC
|
||||||
|
// JSON envelope used by /rtc/v1/play/ and /rtc/v1/publish/, e.g. body of the
|
||||||
|
// form {"sdp":"v=0\r\n...", ...}. For standards-based WHIP/WHEP bodies (raw
|
||||||
|
// SDP), or any input we can't recognise, the original body is returned
|
||||||
|
// unchanged so the caller can parse it as raw SDP.
|
||||||
|
func unwrapSDPEnvelope(body string) string {
|
||||||
|
trimmed := strings.TrimLeft(body, " \t\r\n")
|
||||||
|
if !strings.HasPrefix(trimmed, "{") {
|
||||||
|
return body
|
||||||
|
}
|
||||||
|
var env struct {
|
||||||
|
SDP string `json:"sdp"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal([]byte(trimmed), &env); err != nil || env.SDP == "" {
|
||||||
|
return body
|
||||||
|
}
|
||||||
|
return env.SDP
|
||||||
|
}
|
||||||
|
|
||||||
type rtcICEPair struct {
|
type rtcICEPair struct {
|
||||||
// The remote ufrag, used for ICE username and session id.
|
// The remote ufrag, used for ICE username and session id.
|
||||||
RemoteICEUfrag string `json:"remote_ufrag"`
|
RemoteICEUfrag string `json:"remote_ufrag"`
|
||||||
|
|
|
||||||
|
|
@ -896,6 +896,95 @@ func TestWebRTCProxyServer_HandleApiForWHEP_HappyPath(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Legacy /rtc/v1/play/ (used by srs_bench) wraps the SDP in a JSON envelope
|
||||||
|
// like {"sdp":"v=0\r\n..."} where \r\n is the literal 2-byte JSON escape, not
|
||||||
|
// real CRLF. The proxy must unwrap the envelope before parsing ICE attributes;
|
||||||
|
// otherwise the stored ufrag is contaminated with the next attributes and the
|
||||||
|
// STUN binding from the client cannot be matched to the connection.
|
||||||
|
func TestWebRTCProxyServer_HandleApiForWHEP_LegacyJSONEnvelope(t *testing.T) {
|
||||||
|
f := newWebRTCFixture()
|
||||||
|
f.env.WebRTCServerReturns("19000")
|
||||||
|
|
||||||
|
const backendRTCPort = "18000"
|
||||||
|
answerJSON := `{"code":0,"sessionid":"sid","sdp":"v=0\r\na=ice-ufrag:local-ufrag\r\na=ice-pwd:local-pwd-very-long-value-32xxxx\r\na=candidate:1 1 udp 1 1.2.3.4 ` + backendRTCPort + ` typ host\r\n"}`
|
||||||
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, _ = io.ReadAll(r.Body)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, _ = w.Write([]byte(answerJSON))
|
||||||
|
}))
|
||||||
|
defer backend.Close()
|
||||||
|
|
||||||
|
f.server.backendURL = func(b *lb.OriginServer, r *http.Request) (string, error) {
|
||||||
|
return backend.URL + r.URL.Path, nil
|
||||||
|
}
|
||||||
|
f.lb.PickReturns(&lb.OriginServer{IP: "10.0.0.1", API: []string{"1985"}, RTC: []string{backendRTCPort}}, nil)
|
||||||
|
|
||||||
|
offerJSON := `{"api":"http://10.0.0.1:1985/rtc/v1/play/","clientip":"","sdp":"v=0\r\na=ice-ufrag:remote-ufrag\r\na=ice-pwd:remote-pwd-very-long-value-32xx\r\n","streamurl":"webrtc://example.com/live/demo"}`
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "http://example.com/rtc/v1/play/", strings.NewReader(offerJSON))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
if err := f.server.HandleApiForWHEP(context.Background(), rec, req); err != nil {
|
||||||
|
t.Fatalf("WHEP: %v", err)
|
||||||
|
}
|
||||||
|
if f.lb.StoreWebRTCCallCount() != 1 {
|
||||||
|
t.Fatalf("StoreWebRTC called %d times, want 1", f.lb.StoreWebRTCCallCount())
|
||||||
|
}
|
||||||
|
_, _, stored := f.lb.StoreWebRTCArgsForCall(0)
|
||||||
|
if got, want := stored.GetUfrag(), "local-ufrag:remote-ufrag"; got != want {
|
||||||
|
t.Fatalf("stored ufrag=%q, want %q", got, want)
|
||||||
|
}
|
||||||
|
// The response forwarded to the client should still be the JSON envelope
|
||||||
|
// with the backend port rewritten to the proxy's WebRTC port.
|
||||||
|
body := rec.Body.String()
|
||||||
|
if !strings.Contains(body, " 19000 typ host") {
|
||||||
|
t.Fatalf("answer did not rewrite backend port; got %q", body)
|
||||||
|
}
|
||||||
|
if strings.Contains(body, " "+backendRTCPort+" typ host") {
|
||||||
|
t.Fatalf("answer still contains original backend port; got %q", body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnwrapSDPEnvelope(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
in string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "raw sdp passthrough",
|
||||||
|
in: "v=0\r\na=ice-ufrag:abc\r\n",
|
||||||
|
want: "v=0\r\na=ice-ufrag:abc\r\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "json envelope unwrapped",
|
||||||
|
in: `{"code":0,"sdp":"v=0\r\na=ice-ufrag:abc\r\n"}`,
|
||||||
|
want: "v=0\r\na=ice-ufrag:abc\r\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "json envelope with leading whitespace",
|
||||||
|
in: "\n\t " + `{"sdp":"v=0\r\n"}`,
|
||||||
|
want: "v=0\r\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "malformed json falls back to body",
|
||||||
|
in: `{not json}`,
|
||||||
|
want: `{not json}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "json without sdp falls back to body",
|
||||||
|
in: `{"code":0}`,
|
||||||
|
want: `{"code":0}`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
if got := unwrapSDPEnvelope(tc.in); got != tc.want {
|
||||||
|
t.Fatalf("unwrapSDPEnvelope(%q)=%q, want %q", tc.in, got, tc.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// webRTCProxyServer.proxyApiToBackend: error paths
|
// webRTCProxyServer.proxyApiToBackend: error paths
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -100,8 +100,11 @@ func (v *srsSRTProxyServer) Run(ctx context.Context) error {
|
||||||
go func() {
|
go func() {
|
||||||
defer v.wg.Done()
|
defer v.wg.Done()
|
||||||
|
|
||||||
|
// Reuse a single receive buffer across iterations. SRTHandshakePacket.UnmarshalBinary
|
||||||
|
// clones ExtraData, and backendUDP.Write copies into the kernel before returning, so
|
||||||
|
// no caller retains a reference to this slice past the call.
|
||||||
|
buf := make([]byte, 4096)
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
buf := make([]byte, 4096)
|
|
||||||
n, caddr, err := v.listener.ReadFrom(buf)
|
n, caddr, err := v.listener.ReadFrom(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If context is canceled or connection is closed, exit gracefully without logging error.
|
// If context is canceled or connection is closed, exit gracefully without logging error.
|
||||||
|
|
@ -196,6 +199,11 @@ type SRTConnection struct {
|
||||||
handshake2 *SRTHandshakePacket
|
handshake2 *SRTHandshakePacket
|
||||||
handshake3 *SRTHandshakePacket
|
handshake3 *SRTHandshakePacket
|
||||||
|
|
||||||
|
// Guards the spawn of the backend->client reader goroutine. Keeps the spawn
|
||||||
|
// idempotent if the client retries handshake 2 (e.g. because our handshake 3
|
||||||
|
// was dropped) and re-enters the handshake path.
|
||||||
|
startReader stdSync.Once
|
||||||
|
|
||||||
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
// dialBackendUDP opens a UDP connection to a backend SRS server. Defaults to a real
|
||||||
// UDP dial; tests may override via a functional option to supply a fake connection.
|
// UDP dial; tests may override via a functional option to supply a fake connection.
|
||||||
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
dialBackendUDP func(ctx context.Context, ip string, port int) (io.ReadWriteCloser, error)
|
||||||
|
|
@ -349,23 +357,29 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
|
||||||
return errors.Wrapf(err, "write handshake 3")
|
return errors.Wrapf(err, "write handshake 3")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a goroutine to proxy message from backend to client.
|
// Start a goroutine to proxy message from backend to client. The Once guard
|
||||||
|
// keeps the spawn idempotent if the client retries handshake 2 (because our
|
||||||
|
// handshake 3 was lost); the existing reader keeps running and we don't want
|
||||||
|
// a second one racing it on backendUDP.Read.
|
||||||
// TODO: FIXME: Support close the connection when timeout or client disconnected.
|
// TODO: FIXME: Support close the connection when timeout or client disconnected.
|
||||||
go func() {
|
v.startReader.Do(func() {
|
||||||
for ctx.Err() == nil {
|
go func() {
|
||||||
nn, err := v.backendUDP.Read(b)
|
buf := make([]byte, 4096)
|
||||||
if err != nil {
|
for ctx.Err() == nil {
|
||||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
nn, err := v.backendUDP.Read(buf)
|
||||||
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
if err != nil {
|
||||||
return
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||||
|
logger.Warn(ctx, "read from backend failed, err=%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err = v.listenerUDP.WriteTo(buf[:nn], addr); err != nil {
|
||||||
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||||
|
logger.Warn(ctx, "write to client failed, err=%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if _, err = v.listenerUDP.WriteTo(b[:nn], addr); err != nil {
|
}()
|
||||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
})
|
||||||
logger.Warn(ctx, "write to client failed, err=%v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -583,7 +597,10 @@ func (v *SRTHandshakePacket) UnmarshalBinary(b []byte) error {
|
||||||
// Only support IPv4.
|
// Only support IPv4.
|
||||||
v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48])
|
v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48])
|
||||||
|
|
||||||
v.ExtraData = b[64:]
|
// Clone so ExtraData owns its bytes independently of the caller's buffer.
|
||||||
|
// Without this, callers that reuse the receive buffer would silently corrupt
|
||||||
|
// any decoded packet they keep alive (e.g. v.handshake0 / v.handshake2).
|
||||||
|
v.ExtraData = bytes.Clone(b[64:])
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -211,10 +211,13 @@ func SrtParseSocketID(data []byte) uint32 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseIceUfragPwd parse the ice-ufrag and ice-pwd from the SDP.
|
// ParseIceUfragPwd parse the ice-ufrag and ice-pwd from the SDP. The value class
|
||||||
|
// stops at any whitespace (real CRLF in raw SDP) or at a backslash, so the parser
|
||||||
|
// is also safe against JSON-escaped SDP bodies where line breaks appear as the
|
||||||
|
// 2-byte sequence "\r" / "\n" rather than real control characters.
|
||||||
func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
|
func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
|
||||||
if true {
|
if true {
|
||||||
ufragRe := regexp.MustCompile(`a=ice-ufrag:([^\s]+)`)
|
ufragRe := regexp.MustCompile(`a=ice-ufrag:([^\s\\]+)`)
|
||||||
ufragMatch := ufragRe.FindStringSubmatch(sdp)
|
ufragMatch := ufragRe.FindStringSubmatch(sdp)
|
||||||
if len(ufragMatch) <= 1 {
|
if len(ufragMatch) <= 1 {
|
||||||
return "", "", errors.Errorf("no ice-ufrag in sdp %v", sdp)
|
return "", "", errors.Errorf("no ice-ufrag in sdp %v", sdp)
|
||||||
|
|
@ -223,7 +226,7 @@ func ParseIceUfragPwd(sdp string) (ufrag, pwd string, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if true {
|
if true {
|
||||||
pwdRe := regexp.MustCompile(`a=ice-pwd:([^\s]+)`)
|
pwdRe := regexp.MustCompile(`a=ice-pwd:([^\s\\]+)`)
|
||||||
pwdMatch := pwdRe.FindStringSubmatch(sdp)
|
pwdMatch := pwdRe.FindStringSubmatch(sdp)
|
||||||
if len(pwdMatch) <= 1 {
|
if len(pwdMatch) <= 1 {
|
||||||
return "", "", errors.Errorf("no ice-pwd in sdp %v", sdp)
|
return "", "", errors.Errorf("no ice-pwd in sdp %v", sdp)
|
||||||
|
|
|
||||||
|
|
@ -338,6 +338,24 @@ func TestParseIceUfragPwd_MissingPwd(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SDP embedded in the legacy /rtc/v1/play/ JSON envelope arrives with "\r\n" as
|
||||||
|
// the literal 2-byte sequence (backslash + r/n), not real CRLF. The value
|
||||||
|
// charset must stop at the backslash, otherwise the ufrag would absorb the rest
|
||||||
|
// of the SDP up to the next real whitespace.
|
||||||
|
func TestParseIceUfragPwd_JSONEscapedSDP(t *testing.T) {
|
||||||
|
sdp := `v=0\r\na=ice-ufrag:1f1n4272\r\na=ice-pwd:5f6y69408x2h55232i080mj894901b8n\r\na=fingerprint:sha-256 2D:1D\r\n`
|
||||||
|
ufrag, pwd, err := ParseIceUfragPwd(sdp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err = %v", err)
|
||||||
|
}
|
||||||
|
if ufrag != "1f1n4272" {
|
||||||
|
t.Fatalf("ufrag=%q, want 1f1n4272", ufrag)
|
||||||
|
}
|
||||||
|
if pwd != "5f6y69408x2h55232i080mj894901b8n" {
|
||||||
|
t.Fatalf("pwd=%q, want 5f6y69408x2h55232i080mj894901b8n", pwd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestParseSRTStreamID_WithHost(t *testing.T) {
|
func TestParseSRTStreamID_WithHost(t *testing.T) {
|
||||||
host, resource, err := ParseSRTStreamID("h=example.com,r=live/stream")
|
host, resource, err := ParseSRTStreamID("h=example.com,r=live/stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ func VersionMinor() int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func VersionRevision() int {
|
func VersionRevision() int {
|
||||||
return 148
|
return 150
|
||||||
}
|
}
|
||||||
|
|
||||||
func Version() string {
|
func Version() string {
|
||||||
|
|
|
||||||
|
|
@ -303,6 +303,9 @@ The knowledge base (`memory/srs-*.md`) captures William's knowledge about SRS
|
||||||
- `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state
|
- `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state
|
||||||
- `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification
|
- `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification
|
||||||
|
|
||||||
|
**Next-Generation Server Performance Docs** (`docs/perf/`) — Performance analysis guides for the Go server:
|
||||||
|
- `proxy-whep.md` — WHEP perf analysis: enable GO_PPROF, run publisher + N WHEP players via srs_bench, collect CPU/alloc/heap/goroutine/trace profiles, read hot spots, diff before/after with `pprof -base`
|
||||||
|
|
||||||
**Next-Generation Server API Examples** — Executable API documentation:
|
**Next-Generation Server API Examples** — Executable API documentation:
|
||||||
- `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow
|
- `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -152,6 +152,10 @@ Only after the user confirms the routing do you proceed to Step 2.
|
||||||
```
|
```
|
||||||
bash scripts/proxy-e2e-cluster-test.sh
|
bash scripts/proxy-e2e-cluster-test.sh
|
||||||
```
|
```
|
||||||
|
- Proxy + SRS edge + SRS origin three-tier topology (starts proxy + one SRS edge in `mode remote` registered with the proxy + one upstream SRS origin, publishes RTMP via proxy→edge→origin, then plays the same stream with two concurrent RTMP players where the second joins after a delay as a late joiner on the active edge-pull):
|
||||||
|
```
|
||||||
|
bash scripts/proxy-e2e-edge-test.sh
|
||||||
|
```
|
||||||
- Redis multi-proxy routing test (requires local Redis; starts two proxy instances with Redis LB, publishes through one proxy, verifies playback through the other):
|
- Redis multi-proxy routing test (requires local Redis; starts two proxy instances with Redis LB, publishes through one proxy, verifies playback through the other):
|
||||||
```
|
```
|
||||||
bash scripts/proxy-e2e-redis-test.sh
|
bash scripts/proxy-e2e-redis-test.sh
|
||||||
|
|
|
||||||
342
skills/srs-develop/scripts/proxy-e2e-edge-test.sh
Executable file
342
skills/srs-develop/scripts/proxy-e2e-edge-test.sh
Executable file
|
|
@ -0,0 +1,342 @@
|
||||||
|
#!/bin/bash
|
||||||
|
# E2E test for proxy + edge + origin: starts proxy + one SRS edge (registered
|
||||||
|
# with proxy) + one SRS upstream origin (not registered). Publishes a single
|
||||||
|
# RTMP stream through proxy -> edge -> origin, then plays the stream twice
|
||||||
|
# concurrently from the proxy -> edge with a delay so the second player is a
|
||||||
|
# late joiner on an already-active edge-pull. Both players must succeed.
|
||||||
|
set -e
|
||||||
|
|
||||||
|
SCRIPT_DIR="$(cd -P "$(dirname "$0")" && pwd)"
|
||||||
|
# Walk up from SCRIPT_DIR looking for go.mod. This avoids brittle "../../../.."
|
||||||
|
# counting when the skills directory is reached via a symlink (which changes
|
||||||
|
# the symbolic vs. physical depth).
|
||||||
|
WORKSPACE="$SCRIPT_DIR"
|
||||||
|
while [[ "$WORKSPACE" != "/" && ! -f "$WORKSPACE/go.mod" ]]; do
|
||||||
|
WORKSPACE="$(dirname "$WORKSPACE")"
|
||||||
|
done
|
||||||
|
|
||||||
|
if [[ ! -f "$WORKSPACE/go.mod" ]]; then
|
||||||
|
echo "Error: go.mod not found walking up from: $SCRIPT_DIR" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Proxy ports — high range, avoids the SRS port range.
|
||||||
|
PROXY_RTMP_PORT=11935
|
||||||
|
PROXY_HTTP_API_PORT=11985
|
||||||
|
PROXY_HTTP_SERVER_PORT=18080
|
||||||
|
PROXY_WEBRTC_PORT=18000
|
||||||
|
PROXY_SRT_PORT=20080
|
||||||
|
PROXY_SYSTEM_API_PORT=12025
|
||||||
|
|
||||||
|
# Origin ports (from origin-for-edge.conf) — upstream of the edge, NOT
|
||||||
|
# registered with the proxy. Distinct from origin1/2/3 to avoid collisions
|
||||||
|
# when running this test alongside the other proxy E2E tests.
|
||||||
|
ORIGIN_RTMP_PORT=19360
|
||||||
|
ORIGIN_API_PORT=19860
|
||||||
|
|
||||||
|
# Edge ports (from edge-for-proxy.conf) — what the proxy treats as its backend.
|
||||||
|
EDGE_RTMP_PORT=19361
|
||||||
|
EDGE_HTTP_PORT=8091
|
||||||
|
EDGE_API_PORT=19861
|
||||||
|
|
||||||
|
SOURCE_FLV="$WORKSPACE/trunk/doc/source.flv"
|
||||||
|
SRS_BINARY="$WORKSPACE/trunk/objs/srs"
|
||||||
|
ORIGIN_CONF="$WORKSPACE/trunk/conf/origin-for-edge.conf"
|
||||||
|
EDGE_CONF="$WORKSPACE/trunk/conf/edge-for-proxy.conf"
|
||||||
|
# Randomize per run so each invocation starts from clean state and never
|
||||||
|
# shares state with sibling E2E tests publishing to live/livestream.
|
||||||
|
STREAM_NAME="edge$(date +%s)"
|
||||||
|
STREAM_PATH="live/$STREAM_NAME"
|
||||||
|
|
||||||
|
# PIDs to clean up on exit.
|
||||||
|
PROXY_PID=""
|
||||||
|
ORIGIN_PID=""
|
||||||
|
EDGE_PID=""
|
||||||
|
PUBLISH_PID=""
|
||||||
|
PLAYER1_PID=""
|
||||||
|
PLAYER2_PID=""
|
||||||
|
|
||||||
|
cleanup() {
|
||||||
|
echo ""
|
||||||
|
echo "=== Cleaning up ==="
|
||||||
|
for pid in $PUBLISH_PID $PLAYER1_PID $PLAYER2_PID $EDGE_PID $ORIGIN_PID $PROXY_PID; do
|
||||||
|
if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then
|
||||||
|
kill "$pid" 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
sleep 1
|
||||||
|
for pid in $PUBLISH_PID $PLAYER1_PID $PLAYER2_PID $EDGE_PID $ORIGIN_PID $PROXY_PID; do
|
||||||
|
if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then
|
||||||
|
kill -9 "$pid" 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
echo "Cleanup done."
|
||||||
|
}
|
||||||
|
trap cleanup EXIT
|
||||||
|
|
||||||
|
wait_for_http() {
|
||||||
|
local url=$1
|
||||||
|
local name=$2
|
||||||
|
local i
|
||||||
|
|
||||||
|
for i in $(seq 1 30); do
|
||||||
|
if curl -fsS --max-time 2 "$url" >/dev/null 2>&1; then
|
||||||
|
echo "$name is ready."
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "Error: $name is not ready after 30s: $url" >&2
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
api_has_stream() {
|
||||||
|
local api_port=$1
|
||||||
|
local stream=$2
|
||||||
|
|
||||||
|
curl -fsS --max-time 3 "http://127.0.0.1:$api_port/api/v1/streams/" 2>/dev/null | grep -q "$stream"
|
||||||
|
}
|
||||||
|
|
||||||
|
verify_probe_has_av() {
|
||||||
|
local url=$1
|
||||||
|
local label=$2
|
||||||
|
local probe_output
|
||||||
|
|
||||||
|
probe_output=$(ffprobe -v error -rw_timeout 5000000 -show_streams "$url" 2>&1 || true)
|
||||||
|
|
||||||
|
if ! echo "$probe_output" | grep -q "codec_type=video"; then
|
||||||
|
echo "FAIL: No video stream detected for $label." >&2
|
||||||
|
echo "ffprobe output:" >&2
|
||||||
|
echo "$probe_output" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
if ! echo "$probe_output" | grep -q "codec_type=audio"; then
|
||||||
|
echo "FAIL: No audio stream detected for $label." >&2
|
||||||
|
echo "ffprobe output:" >&2
|
||||||
|
echo "$probe_output" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "PASS: Audio/video detected for $label."
|
||||||
|
}
|
||||||
|
|
||||||
|
echo "=== E2E Proxy + Edge + Origin Test ==="
|
||||||
|
echo "Workspace: $WORKSPACE"
|
||||||
|
echo "Stream: $STREAM_PATH"
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# --- Pre-checks ---
|
||||||
|
if [[ ! -f "$SOURCE_FLV" ]]; then
|
||||||
|
echo "Error: test source not found: $SOURCE_FLV" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
if [[ ! -f "$ORIGIN_CONF" ]]; then
|
||||||
|
echo "Error: origin conf not found: $ORIGIN_CONF" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
if [[ ! -f "$EDGE_CONF" ]]; then
|
||||||
|
echo "Error: edge conf not found: $EDGE_CONF" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
for tool in ffmpeg ffprobe curl; do
|
||||||
|
if ! command -v "$tool" &>/dev/null; then
|
||||||
|
echo "Error: $tool not found in PATH" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
# --- Step 0: Clean up stale state ---
|
||||||
|
rm -f "$WORKSPACE/trunk/objs/origin-for-edge.pid" "$WORKSPACE/trunk/objs/edge-for-proxy.pid"
|
||||||
|
ALL_PORTS="$PROXY_RTMP_PORT $PROXY_HTTP_API_PORT $PROXY_HTTP_SERVER_PORT $PROXY_WEBRTC_PORT $PROXY_SRT_PORT $PROXY_SYSTEM_API_PORT"
|
||||||
|
ALL_PORTS="$ALL_PORTS $ORIGIN_RTMP_PORT $ORIGIN_API_PORT $EDGE_RTMP_PORT $EDGE_HTTP_PORT $EDGE_API_PORT"
|
||||||
|
for port in $ALL_PORTS; do
|
||||||
|
lsof -ti :"$port" 2>/dev/null | xargs kill 2>/dev/null || true
|
||||||
|
done
|
||||||
|
sleep 1
|
||||||
|
|
||||||
|
# --- Step 1: Build proxy ---
|
||||||
|
echo "=== Step 1: Building proxy ==="
|
||||||
|
cd "$WORKSPACE"
|
||||||
|
make -s 2>&1
|
||||||
|
echo "Proxy built: $WORKSPACE/bin/srs-proxy"
|
||||||
|
|
||||||
|
# --- Step 2: Build SRS (if not already built) ---
|
||||||
|
if [[ ! -f "$SRS_BINARY" ]]; then
|
||||||
|
echo "=== Step 2: Building SRS ==="
|
||||||
|
cd "$WORKSPACE/trunk"
|
||||||
|
./configure && make 2>&1 | tail -3
|
||||||
|
echo "SRS built: $SRS_BINARY"
|
||||||
|
else
|
||||||
|
echo "=== Step 2: SRS already built ==="
|
||||||
|
fi
|
||||||
|
|
||||||
|
# --- Step 3: Start proxy ---
|
||||||
|
echo "=== Step 3: Starting proxy (RTMP :$PROXY_RTMP_PORT, System API :$PROXY_SYSTEM_API_PORT) ==="
|
||||||
|
cd "$WORKSPACE"
|
||||||
|
env PROXY_RTMP_SERVER=$PROXY_RTMP_PORT \
|
||||||
|
PROXY_HTTP_API=$PROXY_HTTP_API_PORT \
|
||||||
|
PROXY_HTTP_SERVER=$PROXY_HTTP_SERVER_PORT \
|
||||||
|
PROXY_WEBRTC_SERVER=$PROXY_WEBRTC_PORT \
|
||||||
|
PROXY_SRT_SERVER=$PROXY_SRT_PORT \
|
||||||
|
PROXY_SYSTEM_API=$PROXY_SYSTEM_API_PORT \
|
||||||
|
PROXY_LOAD_BALANCER_TYPE=memory \
|
||||||
|
./bin/srs-proxy >/tmp/srs-proxy-edge-e2e.log 2>&1 &
|
||||||
|
PROXY_PID=$!
|
||||||
|
echo "Proxy PID: $PROXY_PID"
|
||||||
|
|
||||||
|
wait_for_http "http://127.0.0.1:$PROXY_SYSTEM_API_PORT/api/v1/versions" "Proxy System API"
|
||||||
|
|
||||||
|
if ! kill -0 "$PROXY_PID" 2>/dev/null; then
|
||||||
|
echo "Error: proxy failed to start. Logs:" >&2
|
||||||
|
cat /tmp/srs-proxy-edge-e2e.log >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "Proxy started."
|
||||||
|
|
||||||
|
# --- Step 4: Start upstream origin (no proxy heartbeat) ---
|
||||||
|
echo "=== Step 4: Starting upstream SRS origin (RTMP :$ORIGIN_RTMP_PORT) ==="
|
||||||
|
ulimit -n 10000 2>/dev/null || true
|
||||||
|
cd "$WORKSPACE/trunk"
|
||||||
|
./objs/srs -c conf/origin-for-edge.conf >/tmp/srs-origin-edge-e2e.log 2>&1 &
|
||||||
|
ORIGIN_PID=$!
|
||||||
|
echo "Origin PID: $ORIGIN_PID"
|
||||||
|
|
||||||
|
wait_for_http "http://127.0.0.1:$ORIGIN_API_PORT/api/v1/versions" "Origin HTTP API"
|
||||||
|
|
||||||
|
if ! kill -0 "$ORIGIN_PID" 2>/dev/null; then
|
||||||
|
echo "Error: origin failed to start. Logs:" >&2
|
||||||
|
cat /tmp/srs-origin-edge-e2e.log >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "Origin started."
|
||||||
|
|
||||||
|
# --- Step 5: Start edge (mode remote, registered with proxy) ---
|
||||||
|
echo "=== Step 5: Starting SRS edge (RTMP :$EDGE_RTMP_PORT, upstream :$ORIGIN_RTMP_PORT) ==="
|
||||||
|
./objs/srs -c conf/edge-for-proxy.conf >/tmp/srs-edge-e2e.log 2>&1 &
|
||||||
|
EDGE_PID=$!
|
||||||
|
echo "Edge PID: $EDGE_PID"
|
||||||
|
|
||||||
|
wait_for_http "http://127.0.0.1:$EDGE_API_PORT/api/v1/versions" "Edge HTTP API"
|
||||||
|
|
||||||
|
# Wait for the edge to register with the proxy (heartbeat interval is 9s).
|
||||||
|
echo "Waiting for edge to register with proxy (up to 20s)..."
|
||||||
|
for i in $(seq 1 20); do
|
||||||
|
if grep -q "Register SRS media server" /tmp/srs-proxy-edge-e2e.log 2>/dev/null; then
|
||||||
|
echo "Edge registered with proxy."
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
if ! grep -q "Register SRS media server" /tmp/srs-proxy-edge-e2e.log 2>/dev/null; then
|
||||||
|
echo "Error: edge did not register with proxy after 20s. Proxy logs:" >&2
|
||||||
|
cat /tmp/srs-proxy-edge-e2e.log >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
if ! kill -0 "$EDGE_PID" 2>/dev/null; then
|
||||||
|
echo "Error: edge failed to start. Logs:" >&2
|
||||||
|
cat /tmp/srs-edge-e2e.log >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "Edge started and registered."
|
||||||
|
|
||||||
|
# --- Step 6: Publish RTMP stream to proxy ---
|
||||||
|
# Path: ffmpeg -> proxy (:$PROXY_RTMP_PORT) -> edge (:$EDGE_RTMP_PORT, mode remote forwards
|
||||||
|
# publish) -> origin (:$ORIGIN_RTMP_PORT). Verify the publish reached the
|
||||||
|
# upstream origin via the origin's HTTP API.
|
||||||
|
echo "=== Step 6: Publishing $STREAM_PATH through proxy -> edge -> origin ==="
|
||||||
|
ffmpeg -stream_loop -1 -re -i "$SOURCE_FLV" -c copy -f flv \
|
||||||
|
"rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH" >/tmp/srs-ffmpeg-edge-e2e.log 2>&1 &
|
||||||
|
PUBLISH_PID=$!
|
||||||
|
echo "Publisher PID: $PUBLISH_PID"
|
||||||
|
|
||||||
|
echo "Waiting for stream to reach origin (up to 15s)..."
|
||||||
|
reached_origin=0
|
||||||
|
for i in $(seq 1 15); do
|
||||||
|
if api_has_stream "$ORIGIN_API_PORT" "$STREAM_NAME"; then
|
||||||
|
reached_origin=1
|
||||||
|
echo "Stream visible on origin after ${i}s."
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
if [[ $reached_origin -ne 1 ]]; then
|
||||||
|
echo "FAIL: stream did not reach upstream origin via edge." >&2
|
||||||
|
echo "Publisher logs:" >&2
|
||||||
|
cat /tmp/srs-ffmpeg-edge-e2e.log >&2
|
||||||
|
echo "Edge logs:" >&2
|
||||||
|
cat /tmp/srs-edge-e2e.log >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
if ! kill -0 "$PUBLISH_PID" 2>/dev/null; then
|
||||||
|
echo "Error: publisher exited unexpectedly. Logs:" >&2
|
||||||
|
cat /tmp/srs-ffmpeg-edge-e2e.log >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "PASS: publish path proxy -> edge -> origin works."
|
||||||
|
|
||||||
|
# --- Step 7: Two concurrent players on the same stream ---
|
||||||
|
# Player 1 attaches first and triggers the edge-pull from origin. Player 2
|
||||||
|
# joins a few seconds later as a late joiner on the already-active edge-pull.
|
||||||
|
# Both must succeed — this is the proxy-side analogue of the C++ edge late-
|
||||||
|
# join fix.
|
||||||
|
echo "=== Step 7: Two concurrent RTMP players via proxy ==="
|
||||||
|
PLAY_DURATION=8
|
||||||
|
PLAYER_URL="rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH"
|
||||||
|
|
||||||
|
echo "Starting player 1 (immediate)..."
|
||||||
|
ffmpeg -rw_timeout 5000000 -i "$PLAYER_URL" -t $PLAY_DURATION -c copy -f flv -y /dev/null \
|
||||||
|
>/tmp/srs-player1-edge-e2e.log 2>&1 &
|
||||||
|
PLAYER1_PID=$!
|
||||||
|
|
||||||
|
sleep 3
|
||||||
|
|
||||||
|
echo "Starting player 2 (late joiner, +3s)..."
|
||||||
|
ffmpeg -rw_timeout 5000000 -i "$PLAYER_URL" -t $PLAY_DURATION -c copy -f flv -y /dev/null \
|
||||||
|
>/tmp/srs-player2-edge-e2e.log 2>&1 &
|
||||||
|
PLAYER2_PID=$!
|
||||||
|
|
||||||
|
# Wait for both players to finish.
|
||||||
|
player1_rc=0
|
||||||
|
player2_rc=0
|
||||||
|
wait "$PLAYER1_PID" || player1_rc=$?
|
||||||
|
wait "$PLAYER2_PID" || player2_rc=$?
|
||||||
|
# Clear PIDs so cleanup() doesn't try to re-kill exited processes.
|
||||||
|
PLAYER1_PID=""
|
||||||
|
PLAYER2_PID=""
|
||||||
|
|
||||||
|
check_player() {
|
||||||
|
local label=$1
|
||||||
|
local rc=$2
|
||||||
|
local log=$3
|
||||||
|
|
||||||
|
if [[ $rc -ne 0 ]]; then
|
||||||
|
echo "FAIL: $label exited with code $rc. Logs:" >&2
|
||||||
|
cat "$log" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
# Decoded-frames check — ffmpeg's progress lines contain `frame=` once it has
|
||||||
|
# successfully started decoding. Catches "dimensions not set"-style failures
|
||||||
|
# where ffmpeg returns 0 but never produced output.
|
||||||
|
if ! grep -qE 'frame= *[1-9]' "$log"; then
|
||||||
|
echo "FAIL: $label produced no frames. Logs:" >&2
|
||||||
|
cat "$log" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "PASS: $label played successfully."
|
||||||
|
}
|
||||||
|
|
||||||
|
check_player "player 1" "$player1_rc" /tmp/srs-player1-edge-e2e.log
|
||||||
|
check_player "player 2 (late joiner)" "$player2_rc" /tmp/srs-player2-edge-e2e.log
|
||||||
|
|
||||||
|
# --- Step 8: Final probe via proxy confirms A/V is still queryable ---
|
||||||
|
echo "=== Step 8: Final ffprobe verification via proxy ==="
|
||||||
|
verify_probe_has_av "rtmp://localhost:$PROXY_RTMP_PORT/$STREAM_PATH" "proxy $STREAM_PATH"
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "=== E2E Proxy + Edge + Origin Test PASSED ==="
|
||||||
39
trunk/conf/edge-for-proxy.conf
Normal file
39
trunk/conf/edge-for-proxy.conf
Normal file
|
|
@ -0,0 +1,39 @@
|
||||||
|
|
||||||
|
# Edge for the proxy+edge E2E test (skills/srs-develop).
|
||||||
|
# Registers with the proxy via heartbeat, and in mode remote forwards publishes
|
||||||
|
# to and pulls plays from origin-for-edge.conf (RTMP :19360).
|
||||||
|
|
||||||
|
max_connections 1000;
|
||||||
|
pid objs/edge-for-proxy.pid;
|
||||||
|
daemon off;
|
||||||
|
srs_log_tank console;
|
||||||
|
|
||||||
|
rtmp {
|
||||||
|
listen 19361;
|
||||||
|
}
|
||||||
|
http_server {
|
||||||
|
enabled on;
|
||||||
|
listen 8091;
|
||||||
|
dir ./objs/nginx/html;
|
||||||
|
}
|
||||||
|
http_api {
|
||||||
|
enabled on;
|
||||||
|
listen 19861;
|
||||||
|
}
|
||||||
|
heartbeat {
|
||||||
|
enabled on;
|
||||||
|
interval 9;
|
||||||
|
url http://127.0.0.1:12025/api/v1/srs/register;
|
||||||
|
device_id edge-for-proxy;
|
||||||
|
ports on;
|
||||||
|
}
|
||||||
|
vhost __defaultVhost__ {
|
||||||
|
cluster {
|
||||||
|
mode remote;
|
||||||
|
origin 127.0.0.1:19360;
|
||||||
|
}
|
||||||
|
http_remux {
|
||||||
|
enabled on;
|
||||||
|
mount [vhost]/[app]/[stream].flv;
|
||||||
|
}
|
||||||
|
}
|
||||||
19
trunk/conf/origin-for-edge.conf
Normal file
19
trunk/conf/origin-for-edge.conf
Normal file
|
|
@ -0,0 +1,19 @@
|
||||||
|
|
||||||
|
# Upstream origin for the proxy+edge E2E test (skills/srs-develop).
|
||||||
|
# Not registered with the proxy; the edge in front of it is. The edge pulls
|
||||||
|
# from this origin on play and pushes publishes here on publish.
|
||||||
|
|
||||||
|
max_connections 1000;
|
||||||
|
pid objs/origin-for-edge.pid;
|
||||||
|
daemon off;
|
||||||
|
srs_log_tank console;
|
||||||
|
|
||||||
|
rtmp {
|
||||||
|
listen 19360;
|
||||||
|
}
|
||||||
|
http_api {
|
||||||
|
enabled on;
|
||||||
|
listen 19860;
|
||||||
|
}
|
||||||
|
vhost __defaultVhost__ {
|
||||||
|
}
|
||||||
|
|
@ -7,6 +7,8 @@ The changelog for SRS.
|
||||||
<a name="v7-changes"></a>
|
<a name="v7-changes"></a>
|
||||||
|
|
||||||
## SRS 7.0 Changelog
|
## SRS 7.0 Changelog
|
||||||
|
* v7.0, 2026-05-19, Merge [#4678](https://github.com/ossrs/srs/pull/4678): Edge: Fix HTTP-FLV 404 and RTMP late-join missing sequence headers. v7.0.150 (#4678)
|
||||||
|
* v7.0, 2026-05-17, Merge [#4676](https://github.com/ossrs/srs/pull/4676): Proxy: Fix RTC/SRT reader goroutine leak; unwrap legacy WHEP JSON envelope; add WHEP pprof guide. v7.0.149 (#4676)
|
||||||
* v7.0, 2026-05-17, Merge [#4675](https://github.com/ossrs/srs/pull/4675): Proxy: Refactor for testability; add SRT/WHIP E2E and unit tests. v7.0.148 (#4675)
|
* v7.0, 2026-05-17, Merge [#4675](https://github.com/ossrs/srs/pull/4675): Proxy: Refactor for testability; add SRT/WHIP E2E and unit tests. v7.0.148 (#4675)
|
||||||
* v7.0, 2026-05-02, Merge [#4672](https://github.com/ossrs/srs/pull/4672): Proxy: Refactor server APIs and expand RTMP test coverage. v7.0.147 (#4672)
|
* v7.0, 2026-05-02, Merge [#4672](https://github.com/ossrs/srs/pull/4672): Proxy: Refactor server APIs and expand RTMP test coverage. v7.0.147 (#4672)
|
||||||
* v7.0, 2026-04-28, Merge [#4670](https://github.com/ossrs/srs/pull/4670): Proxy: Refine logger and environment APIs. v7.0.146 (#4670)
|
* v7.0, 2026-04-28, Merge [#4670](https://github.com/ossrs/srs/pull/4670): Proxy: Refine logger and environment APIs. v7.0.146 (#4670)
|
||||||
|
|
|
||||||
|
|
@ -1076,7 +1076,7 @@ SrsHttpStreamServer::SrsHttpStreamServer()
|
||||||
void SrsHttpStreamServer::assemble()
|
void SrsHttpStreamServer::assemble()
|
||||||
{
|
{
|
||||||
SrsHttpServeMux *mux = dynamic_cast<SrsHttpServeMux *>(mux_);
|
SrsHttpServeMux *mux = dynamic_cast<SrsHttpServeMux *>(mux_);
|
||||||
if (!mux) {
|
if (mux) {
|
||||||
mux->add_dynamic_matcher(this);
|
mux->add_dynamic_matcher(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2559,7 +2559,10 @@ srs_error_t SrsLiveSource::consumer_dumps(ISrsLiveConsumer *consumer, bool ds, b
|
||||||
}
|
}
|
||||||
|
|
||||||
// If stream is publishing, dumps the sequence header and gop cache.
|
// If stream is publishing, dumps the sequence header and gop cache.
|
||||||
bool hub_active = hub_ ? hub_->active() : false;
|
// On edge, hub_ is NULL; the source is "publishing" once the edge-pull has
|
||||||
|
// populated the meta cache. Late-joining consumers must still receive the
|
||||||
|
// cached metadata + sequence headers + GOP via this path.
|
||||||
|
bool hub_active = hub_ ? hub_->active() : (meta_->data() || meta_->vsh() || meta_->ash());
|
||||||
if (hub_active) {
|
if (hub_active) {
|
||||||
// Copy metadata and sequence header to consumer.
|
// Copy metadata and sequence header to consumer.
|
||||||
if ((err = meta_->dumps(consumer, atc_, jitter_algorithm_, dm, ds)) != srs_success) {
|
if ((err = meta_->dumps(consumer, atc_, jitter_algorithm_, dm, ds)) != srs_success) {
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,6 @@
|
||||||
|
|
||||||
#define VERSION_MAJOR 7
|
#define VERSION_MAJOR 7
|
||||||
#define VERSION_MINOR 0
|
#define VERSION_MINOR 0
|
||||||
#define VERSION_REVISION 148
|
#define VERSION_REVISION 150
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user