From 7ede26453ed22c9b5909dc967c073efaf2d75881 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 17 May 2026 10:39:07 -0400 Subject: [PATCH] Claude: Add HTTP API proxy seams, unit tests, and proxyfakes. --- internal/proxy/api.go | 74 +- internal/proxy/api_test.go | 892 ++++++++++++++++++ internal/proxy/gen.go | 9 + .../proxyfakes/fake_http_api_proxy_server.go | 172 ++++ .../fake_http_stream_proxy_server.go | 172 ++++ .../proxyfakes/fake_rtmp_proxy_server.go | 172 ++++ .../proxyfakes/fake_web_rtc_proxy_server.go | 325 +++++++ 7 files changed, 1804 insertions(+), 12 deletions(-) create mode 100644 internal/proxy/api_test.go create mode 100644 internal/proxy/gen.go create mode 100644 internal/proxy/proxyfakes/fake_http_api_proxy_server.go create mode 100644 internal/proxy/proxyfakes/fake_http_stream_proxy_server.go create mode 100644 internal/proxy/proxyfakes/fake_rtmp_proxy_server.go create mode 100644 internal/proxy/proxyfakes/fake_web_rtc_proxy_server.go diff --git a/internal/proxy/api.go b/internal/proxy/api.go index 30189d032..381173c61 100644 --- a/internal/proxy/api.go +++ b/internal/proxy/api.go @@ -31,28 +31,53 @@ type httpAPIProxyServer struct { // The environment interface. environment env.ProxyEnvironment // The underlayer HTTP server. - server *http.Server + server httpServer // The WebRTC server. rtc WebRTCProxyServer // The gracefully quit timeout, wait server to quit. gracefulQuitTimeout time.Duration // The wait group for all goroutines. wg sync.WaitGroup + // shutdown gracefully shuts down the underlying HTTP server. Defaults to + // v.server.Shutdown; tests may override via a functional option to verify + // the shutdown contract without binding a real socket. + shutdown func(ctx context.Context) error + // newServer constructs the underlying HTTP server bound to addr and the + // ServeMux that handlers are registered on. Defaults to a real http.Server + // and ServeMux; tests may override via a functional option to supply a fake + // server that does not bind a real port. + newServer func(addr string) (httpServer, *http.ServeMux) } -func NewHTTPAPIProxyServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc WebRTCProxyServer) HTTPAPIProxyServer { +func NewHTTPAPIProxyServer(environment env.ProxyEnvironment, gracefulQuitTimeout time.Duration, rtc WebRTCProxyServer, opts ...func(*httpAPIProxyServer)) HTTPAPIProxyServer { v := &httpAPIProxyServer{ environment: environment, gracefulQuitTimeout: gracefulQuitTimeout, rtc: rtc, } + + // Default shutdown: delegate to the underlying http.Server. The closure + // captures v rather than v.server so the dereference happens at call time, + // after Run() has assigned v.server. + v.shutdown = func(ctx context.Context) error { + return v.server.Shutdown(ctx) + } + // Default newServer: a real http.Server and ServeMux pair. + v.newServer = func(addr string) (httpServer, *http.ServeMux) { + mux := http.NewServeMux() + return &http.Server{Addr: addr, Handler: mux}, mux + } + + for _, opt := range opts { + opt(v) + } return v } func (v *httpAPIProxyServer) Close() error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() - v.server.Shutdown(ctx) + v.shutdown(ctx) v.wg.Wait() return nil @@ -66,8 +91,8 @@ func (v *httpAPIProxyServer) Run(ctx context.Context) error { } // Create server and handler. - mux := http.NewServeMux() - v.server = &http.Server{Addr: addr, Handler: mux} + server, mux := v.newServer(addr) + v.server = server logger.Debug(ctx, "HTTP API server listen at %v", addr) // Shutdown the server gracefully when quiting. @@ -78,7 +103,7 @@ func (v *httpAPIProxyServer) Run(ctx context.Context) error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() - v.server.Shutdown(ctx) + v.shutdown(ctx) }() // The basic version handler, also can be used as health check API. @@ -150,26 +175,51 @@ type systemAPI struct { // The load balancer for origin servers. loadBalancer lb.OriginLoadBalancer // The underlayer HTTP server. - server *http.Server + server httpServer // The gracefully quit timeout, wait server to quit. gracefulQuitTimeout time.Duration // The wait group for all goroutines. wg sync.WaitGroup + // shutdown gracefully shuts down the underlying HTTP server. Defaults to + // v.server.Shutdown; tests may override via a functional option to verify + // the shutdown contract without binding a real socket. + shutdown func(ctx context.Context) error + // newServer constructs the underlying HTTP server bound to addr and the + // ServeMux that handlers are registered on. Defaults to a real http.Server + // and ServeMux; tests may override via a functional option to supply a fake + // server that does not bind a real port. + newServer func(addr string) (httpServer, *http.ServeMux) } -func NewSystemAPI(environment env.ProxyEnvironment, loadBalancer lb.OriginLoadBalancer, gracefulQuitTimeout time.Duration) *systemAPI { +func NewSystemAPI(environment env.ProxyEnvironment, loadBalancer lb.OriginLoadBalancer, gracefulQuitTimeout time.Duration, opts ...func(*systemAPI)) *systemAPI { v := &systemAPI{ environment: environment, loadBalancer: loadBalancer, gracefulQuitTimeout: gracefulQuitTimeout, } + + // Default shutdown: delegate to the underlying http.Server. The closure + // captures v rather than v.server so the dereference happens at call time, + // after Run() has assigned v.server. + v.shutdown = func(ctx context.Context) error { + return v.server.Shutdown(ctx) + } + // Default newServer: a real http.Server and ServeMux pair. + v.newServer = func(addr string) (httpServer, *http.ServeMux) { + mux := http.NewServeMux() + return &http.Server{Addr: addr, Handler: mux}, mux + } + + for _, opt := range opts { + opt(v) + } return v } func (v *systemAPI) Close() error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() - v.server.Shutdown(ctx) + v.shutdown(ctx) v.wg.Wait() return nil @@ -183,8 +233,8 @@ func (v *systemAPI) Run(ctx context.Context) error { } // Create server and handler. - mux := http.NewServeMux() - v.server = &http.Server{Addr: addr, Handler: mux} + server, mux := v.newServer(addr) + v.server = server logger.Debug(ctx, "System API server listen at %v", addr) // Shutdown the server gracefully when quiting. @@ -195,7 +245,7 @@ func (v *systemAPI) Run(ctx context.Context) error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() - v.server.Shutdown(ctx) + v.shutdown(ctx) }() // The basic version handler, also can be used as health check API. diff --git a/internal/proxy/api_test.go b/internal/proxy/api_test.go new file mode 100644 index 000000000..c05941fb6 --- /dev/null +++ b/internal/proxy/api_test.go @@ -0,0 +1,892 @@ +// Copyright (c) 2026 Winlin +// +// SPDX-License-Identifier: MIT +package proxy + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "srsx/internal/env/envfakes" + "srsx/internal/lb/lbfakes" +) + +// fakeWebRTCProxyServer is a minimal in-package WebRTCProxyServer used by +// httpAPIProxyServer tests. Only the WHIP/WHEP handler methods are exercised. +// Run/Close are inert stubs so the type satisfies the interface. +type fakeWebRTCProxyServer struct { + whipCalls atomic.Int32 + whepCalls atomic.Int32 + whipReturn error + whepReturn error + whipResponseBody string + whepResponseBody string +} + +func (f *fakeWebRTCProxyServer) Run(ctx context.Context) error { return nil } +func (f *fakeWebRTCProxyServer) Close() error { return nil } +func (f *fakeWebRTCProxyServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + f.whipCalls.Add(1) + if f.whipResponseBody != "" { + w.WriteHeader(http.StatusOK) + io.WriteString(w, f.whipResponseBody) + } + return f.whipReturn +} +func (f *fakeWebRTCProxyServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + f.whepCalls.Add(1) + if f.whepResponseBody != "" { + w.WriteHeader(http.StatusOK) + io.WriteString(w, f.whepResponseBody) + } + return f.whepReturn +} + +// captureMuxFromHTTPAPIRun drives NewHTTPAPIProxyServer.Run with a fake server +// that captures the registered mux. Caller is responsible for cancelling ctx +// to trigger shutdown. +func captureMuxFromHTTPAPIRun(t *testing.T, env *envfakes.FakeProxyEnvironment, + rtc WebRTCProxyServer, ctx context.Context, + opts ...func(*httpAPIProxyServer)) (*http.ServeMux, *fakeHTTPProxyServer, *httpAPIProxyServer) { + t.Helper() + + fakeSrv := newFakeHTTPProxyServer() + var capturedMux *http.ServeMux + + baseOpts := []func(*httpAPIProxyServer){ + func(s *httpAPIProxyServer) { + s.newServer = func(addr string) (httpServer, *http.ServeMux) { + mux := http.NewServeMux() + capturedMux = mux + return fakeSrv, mux + } + }, + } + srvIface := NewHTTPAPIProxyServer(env, 50*time.Millisecond, rtc, append(baseOpts, opts...)...) + srv := srvIface.(*httpAPIProxyServer) + + if err := srv.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if capturedMux == nil { + t.Fatal("newServer was not called by Run") + } + return capturedMux, fakeSrv, srv +} + +// captureMuxFromSystemAPIRun drives NewSystemAPI.Run with a fake server that +// captures the registered mux. Caller cancels ctx to trigger shutdown. +func captureMuxFromSystemAPIRun(t *testing.T, env *envfakes.FakeProxyEnvironment, + lbFake *lbfakes.FakeOriginLoadBalancer, ctx context.Context, + opts ...func(*systemAPI)) (*http.ServeMux, *fakeHTTPProxyServer, *systemAPI) { + t.Helper() + + fakeSrv := newFakeHTTPProxyServer() + var capturedMux *http.ServeMux + + baseOpts := []func(*systemAPI){ + func(s *systemAPI) { + s.newServer = func(addr string) (httpServer, *http.ServeMux) { + mux := http.NewServeMux() + capturedMux = mux + return fakeSrv, mux + } + }, + } + srv := NewSystemAPI(env, lbFake, 50*time.Millisecond, append(baseOpts, opts...)...) + + if err := srv.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if capturedMux == nil { + t.Fatal("newServer was not called by Run") + } + return capturedMux, fakeSrv, srv +} + +// ============================================================================= +// NewHTTPAPIProxyServer +// ============================================================================= + +func TestHTTPAPIProxyServer_New_StoresFieldsAndDefaultsSeams(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + rtc := &fakeWebRTCProxyServer{} + timeout := 2 * time.Second + srv := NewHTTPAPIProxyServer(env, timeout, rtc).(*httpAPIProxyServer) + + if srv.environment != env { + t.Error("environment not stored") + } + if srv.rtc != rtc { + t.Error("rtc not stored") + } + if srv.gracefulQuitTimeout != timeout { + t.Errorf("gracefulQuitTimeout = %v, want %v", srv.gracefulQuitTimeout, timeout) + } + if srv.shutdown == nil { + t.Error("shutdown seam should default to non-nil") + } + if srv.newServer == nil { + t.Error("newServer seam should default to non-nil") + } +} + +func TestHTTPAPIProxyServer_New_AppliesOpts(t *testing.T) { + var called bool + srv := NewHTTPAPIProxyServer(&envfakes.FakeProxyEnvironment{}, time.Second, + &fakeWebRTCProxyServer{}, + func(s *httpAPIProxyServer) { called = true }).(*httpAPIProxyServer) + if !called { + t.Fatal("opt was not invoked") + } + if srv.shutdown == nil { + t.Error("default seams should still be set when opt doesn't override them") + } +} + +func TestHTTPAPIProxyServer_New_OptCanOverrideAllSeams(t *testing.T) { + customShutdown := func(context.Context) error { return errors.New("custom") } + customNewServer := func(string) (httpServer, *http.ServeMux) { return nil, nil } + + srv := NewHTTPAPIProxyServer(&envfakes.FakeProxyEnvironment{}, time.Second, + &fakeWebRTCProxyServer{}, + func(s *httpAPIProxyServer) { + s.shutdown = customShutdown + s.newServer = customNewServer + }).(*httpAPIProxyServer) + + if err := srv.shutdown(context.Background()); err == nil || err.Error() != "custom" { + t.Errorf("custom shutdown not applied: %v", err) + } + // Pointer comparison on func values isn't supported by ==; call the value + // and observe the override via behavior. + if got, _ := srv.newServer(""); got != nil { + t.Error("custom newServer not applied") + } +} + +// ============================================================================= +// httpAPIProxyServer — default factory behavior +// ============================================================================= + +func TestHTTPAPIProxyServer_DefaultNewServer_BuildsRealServerAndMux(t *testing.T) { + srv := NewHTTPAPIProxyServer(&envfakes.FakeProxyEnvironment{}, time.Second, + &fakeWebRTCProxyServer{}).(*httpAPIProxyServer) + + got, mux := srv.newServer(":12321") + if mux == nil { + t.Fatal("mux is nil") + } + real, ok := got.(*http.Server) + if !ok { + t.Fatalf("expected *http.Server, got %T", got) + } + if real.Addr != ":12321" { + t.Errorf("Addr = %q, want :12321", real.Addr) + } + if real.Handler != mux { + t.Error("Handler should be the returned mux") + } +} + +func TestHTTPAPIProxyServer_DefaultShutdown_DelegatesToServer(t *testing.T) { + fakeSrv := newFakeHTTPProxyServer() + srv := NewHTTPAPIProxyServer(&envfakes.FakeProxyEnvironment{}, time.Second, + &fakeWebRTCProxyServer{}).(*httpAPIProxyServer) + srv.server = fakeSrv // simulate what Run() would assign + + if err := srv.shutdown(context.Background()); err != nil { + t.Fatalf("shutdown: %v", err) + } + if fakeSrv.shutdownCalls.Load() != 1 { + t.Fatalf("shutdown was not delegated to server, calls=%d", fakeSrv.shutdownCalls.Load()) + } +} + +// ============================================================================= +// httpAPIProxyServer — Close +// ============================================================================= + +func TestHTTPAPIProxyServer_Close_InvokesShutdownWithDeadline(t *testing.T) { + var gotCtx context.Context + var calls int + srv := NewHTTPAPIProxyServer(&envfakes.FakeProxyEnvironment{}, 50*time.Millisecond, + &fakeWebRTCProxyServer{}, + func(s *httpAPIProxyServer) { + s.shutdown = func(ctx context.Context) error { + gotCtx = ctx + calls++ + return nil + } + }).(*httpAPIProxyServer) + + if err := srv.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + if calls != 1 { + t.Fatalf("shutdown calls = %d, want 1", calls) + } + if _, ok := gotCtx.Deadline(); !ok { + t.Error("Close should pass a deadline-bearing ctx to shutdown") + } +} + +// ============================================================================= +// httpAPIProxyServer — Run lifecycle +// ============================================================================= + +func TestHTTPAPIProxyServer_Run_AddrWithoutColonPrependsIt(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns("11985") + + var capturedAddr string + fakeSrv := newFakeHTTPProxyServer() + srvIface := NewHTTPAPIProxyServer(env, 50*time.Millisecond, &fakeWebRTCProxyServer{}, + func(s *httpAPIProxyServer) { + s.newServer = func(addr string) (httpServer, *http.ServeMux) { + capturedAddr = addr + return fakeSrv, http.NewServeMux() + } + }) + defer srvIface.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := srvIface.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if capturedAddr != ":11985" { + t.Fatalf("newServer addr = %q, want :11985", capturedAddr) + } +} + +func TestHTTPAPIProxyServer_Run_AddrWithColonUnchanged(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns("127.0.0.1:9999") + + var capturedAddr string + fakeSrv := newFakeHTTPProxyServer() + srvIface := NewHTTPAPIProxyServer(env, 50*time.Millisecond, &fakeWebRTCProxyServer{}, + func(s *httpAPIProxyServer) { + s.newServer = func(addr string) (httpServer, *http.ServeMux) { + capturedAddr = addr + return fakeSrv, http.NewServeMux() + } + }) + defer srvIface.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := srvIface.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if capturedAddr != "127.0.0.1:9999" { + t.Fatalf("newServer addr = %q", capturedAddr) + } +} + +func TestHTTPAPIProxyServer_Run_CtxCancelTriggersShutdown(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns(":0") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, fakeSrv, _ := captureMuxFromHTTPAPIRun(t, env, &fakeWebRTCProxyServer{}, ctx) + + deadline := time.Now().Add(time.Second) + for fakeSrv.listenCalls.Load() == 0 && time.Now().Before(deadline) { + time.Sleep(time.Millisecond) + } + if fakeSrv.listenCalls.Load() == 0 { + t.Fatal("ListenAndServe goroutine did not start") + } + + cancel() + + deadline = time.Now().Add(time.Second) + for fakeSrv.shutdownCalls.Load() == 0 && time.Now().Before(deadline) { + time.Sleep(time.Millisecond) + } + if fakeSrv.shutdownCalls.Load() == 0 { + t.Fatal("Shutdown was not invoked after ctx cancel") + } +} + +// ============================================================================= +// httpAPIProxyServer — handler dispatch +// ============================================================================= + +func TestHTTPAPIProxyServer_Run_HandlerVersionsReturnsJSON(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns(":0") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromHTTPAPIRun(t, env, &fakeWebRTCProxyServer{}, ctx) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/versions", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var body map[string]string + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("json: %v\nbody=%s", err, rec.Body.String()) + } + if body["signature"] == "" { + t.Error("signature should be populated") + } + if body["version"] == "" { + t.Error("version should be populated") + } +} + +func TestHTTPAPIProxyServer_Run_HandlerWHIPDelegatesToRTC(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns(":0") + rtc := &fakeWebRTCProxyServer{whipResponseBody: "ok-whip"} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromHTTPAPIRun(t, env, rtc, ctx) + + req := httptest.NewRequest(http.MethodPost, "/rtc/v1/whip/?app=live&stream=s", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rtc.whipCalls.Load() != 1 { + t.Fatalf("HandleApiForWHIP calls = %d, want 1", rtc.whipCalls.Load()) + } + if rtc.whepCalls.Load() != 0 { + t.Errorf("HandleApiForWHEP should not be invoked") + } + if !bytes.Equal(rec.Body.Bytes(), []byte("ok-whip")) { + t.Errorf("body = %q, want ok-whip", rec.Body.String()) + } +} + +func TestHTTPAPIProxyServer_Run_HandlerLegacyPublishRoutesToWHIP(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns(":0") + rtc := &fakeWebRTCProxyServer{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromHTTPAPIRun(t, env, rtc, ctx) + + req := httptest.NewRequest(http.MethodPost, "/rtc/v1/publish/", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rtc.whipCalls.Load() != 1 { + t.Fatalf("HandleApiForWHIP via /rtc/v1/publish/ calls = %d, want 1", rtc.whipCalls.Load()) + } + if rtc.whepCalls.Load() != 0 { + t.Errorf("HandleApiForWHEP should not be invoked via /rtc/v1/publish/") + } +} + +func TestHTTPAPIProxyServer_Run_HandlerWHIPErrorInvokesApiError(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns(":0") + rtc := &fakeWebRTCProxyServer{whipReturn: errors.New("boom-whip")} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromHTTPAPIRun(t, env, rtc, ctx) + + req := httptest.NewRequest(http.MethodPost, "/rtc/v1/whip/", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusInternalServerError { + t.Fatalf("status = %d, want 500", rec.Code) + } + if !bytes.Contains(rec.Body.Bytes(), []byte("boom-whip")) { + t.Errorf("body = %q, expected to contain error message", rec.Body.String()) + } +} + +func TestHTTPAPIProxyServer_Run_HandlerWHEPDelegatesToRTC(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns(":0") + rtc := &fakeWebRTCProxyServer{whepResponseBody: "ok-whep"} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromHTTPAPIRun(t, env, rtc, ctx) + + req := httptest.NewRequest(http.MethodPost, "/rtc/v1/whep/", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rtc.whepCalls.Load() != 1 { + t.Fatalf("HandleApiForWHEP calls = %d, want 1", rtc.whepCalls.Load()) + } + if rtc.whipCalls.Load() != 0 { + t.Errorf("HandleApiForWHIP should not be invoked") + } + if !bytes.Equal(rec.Body.Bytes(), []byte("ok-whep")) { + t.Errorf("body = %q, want ok-whep", rec.Body.String()) + } +} + +func TestHTTPAPIProxyServer_Run_HandlerLegacyPlayRoutesToWHEP(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns(":0") + rtc := &fakeWebRTCProxyServer{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromHTTPAPIRun(t, env, rtc, ctx) + + req := httptest.NewRequest(http.MethodPost, "/rtc/v1/play/", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rtc.whepCalls.Load() != 1 { + t.Fatalf("HandleApiForWHEP via /rtc/v1/play/ calls = %d, want 1", rtc.whepCalls.Load()) + } + if rtc.whipCalls.Load() != 0 { + t.Errorf("HandleApiForWHIP should not be invoked via /rtc/v1/play/") + } +} + +func TestHTTPAPIProxyServer_Run_HandlerWHEPErrorInvokesApiError(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.HttpAPIReturns(":0") + rtc := &fakeWebRTCProxyServer{whepReturn: errors.New("boom-whep")} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromHTTPAPIRun(t, env, rtc, ctx) + + req := httptest.NewRequest(http.MethodPost, "/rtc/v1/whep/", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusInternalServerError { + t.Fatalf("status = %d, want 500", rec.Code) + } + if !bytes.Contains(rec.Body.Bytes(), []byte("boom-whep")) { + t.Errorf("body = %q", rec.Body.String()) + } +} + +// ============================================================================= +// NewSystemAPI +// ============================================================================= + +func TestSystemAPI_New_StoresFieldsAndDefaultsSeams(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + lbFake := &lbfakes.FakeOriginLoadBalancer{} + timeout := 2 * time.Second + srv := NewSystemAPI(env, lbFake, timeout) + + if srv.environment != env { + t.Error("environment not stored") + } + if srv.loadBalancer != lbFake { + t.Error("loadBalancer not stored") + } + if srv.gracefulQuitTimeout != timeout { + t.Errorf("gracefulQuitTimeout = %v, want %v", srv.gracefulQuitTimeout, timeout) + } + if srv.shutdown == nil { + t.Error("shutdown seam should default to non-nil") + } + if srv.newServer == nil { + t.Error("newServer seam should default to non-nil") + } +} + +func TestSystemAPI_New_AppliesOpts(t *testing.T) { + var called bool + srv := NewSystemAPI(&envfakes.FakeProxyEnvironment{}, &lbfakes.FakeOriginLoadBalancer{}, + time.Second, func(s *systemAPI) { called = true }) + if !called { + t.Fatal("opt was not invoked") + } + if srv.shutdown == nil { + t.Error("default seams should still be set when opt doesn't override them") + } +} + +func TestSystemAPI_New_OptCanOverrideAllSeams(t *testing.T) { + customShutdown := func(context.Context) error { return errors.New("custom") } + customNewServer := func(string) (httpServer, *http.ServeMux) { return nil, nil } + + srv := NewSystemAPI(&envfakes.FakeProxyEnvironment{}, &lbfakes.FakeOriginLoadBalancer{}, + time.Second, func(s *systemAPI) { + s.shutdown = customShutdown + s.newServer = customNewServer + }) + + if err := srv.shutdown(context.Background()); err == nil || err.Error() != "custom" { + t.Errorf("custom shutdown not applied: %v", err) + } + if got, _ := srv.newServer(""); got != nil { + t.Error("custom newServer not applied") + } +} + +// ============================================================================= +// systemAPI — default factory behavior +// ============================================================================= + +func TestSystemAPI_DefaultNewServer_BuildsRealServerAndMux(t *testing.T) { + srv := NewSystemAPI(&envfakes.FakeProxyEnvironment{}, &lbfakes.FakeOriginLoadBalancer{}, time.Second) + + got, mux := srv.newServer(":12321") + if mux == nil { + t.Fatal("mux is nil") + } + real, ok := got.(*http.Server) + if !ok { + t.Fatalf("expected *http.Server, got %T", got) + } + if real.Addr != ":12321" { + t.Errorf("Addr = %q, want :12321", real.Addr) + } + if real.Handler != mux { + t.Error("Handler should be the returned mux") + } +} + +func TestSystemAPI_DefaultShutdown_DelegatesToServer(t *testing.T) { + fakeSrv := newFakeHTTPProxyServer() + srv := NewSystemAPI(&envfakes.FakeProxyEnvironment{}, &lbfakes.FakeOriginLoadBalancer{}, time.Second) + srv.server = fakeSrv + + if err := srv.shutdown(context.Background()); err != nil { + t.Fatalf("shutdown: %v", err) + } + if fakeSrv.shutdownCalls.Load() != 1 { + t.Fatalf("shutdown was not delegated, calls=%d", fakeSrv.shutdownCalls.Load()) + } +} + +// ============================================================================= +// systemAPI — Close +// ============================================================================= + +func TestSystemAPI_Close_InvokesShutdownWithDeadline(t *testing.T) { + var gotCtx context.Context + var calls int + srv := NewSystemAPI(&envfakes.FakeProxyEnvironment{}, &lbfakes.FakeOriginLoadBalancer{}, + 50*time.Millisecond, func(s *systemAPI) { + s.shutdown = func(ctx context.Context) error { + gotCtx = ctx + calls++ + return nil + } + }) + + if err := srv.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + if calls != 1 { + t.Fatalf("shutdown calls = %d, want 1", calls) + } + if _, ok := gotCtx.Deadline(); !ok { + t.Error("Close should pass a deadline-bearing ctx to shutdown") + } +} + +// ============================================================================= +// systemAPI — Run lifecycle +// ============================================================================= + +func TestSystemAPI_Run_AddrWithoutColonPrependsIt(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.SystemAPIReturns("12025") + + var capturedAddr string + fakeSrv := newFakeHTTPProxyServer() + srv := NewSystemAPI(env, &lbfakes.FakeOriginLoadBalancer{}, 50*time.Millisecond, + func(s *systemAPI) { + s.newServer = func(addr string) (httpServer, *http.ServeMux) { + capturedAddr = addr + return fakeSrv, http.NewServeMux() + } + }) + defer srv.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := srv.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if capturedAddr != ":12025" { + t.Fatalf("newServer addr = %q, want :12025", capturedAddr) + } +} + +func TestSystemAPI_Run_AddrWithColonUnchanged(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.SystemAPIReturns("127.0.0.1:9999") + + var capturedAddr string + fakeSrv := newFakeHTTPProxyServer() + srv := NewSystemAPI(env, &lbfakes.FakeOriginLoadBalancer{}, 50*time.Millisecond, + func(s *systemAPI) { + s.newServer = func(addr string) (httpServer, *http.ServeMux) { + capturedAddr = addr + return fakeSrv, http.NewServeMux() + } + }) + defer srv.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := srv.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if capturedAddr != "127.0.0.1:9999" { + t.Fatalf("newServer addr = %q", capturedAddr) + } +} + +func TestSystemAPI_Run_CtxCancelTriggersShutdown(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.SystemAPIReturns(":0") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, fakeSrv, _ := captureMuxFromSystemAPIRun(t, env, &lbfakes.FakeOriginLoadBalancer{}, ctx) + + deadline := time.Now().Add(time.Second) + for fakeSrv.listenCalls.Load() == 0 && time.Now().Before(deadline) { + time.Sleep(time.Millisecond) + } + if fakeSrv.listenCalls.Load() == 0 { + t.Fatal("ListenAndServe goroutine did not start") + } + + cancel() + + deadline = time.Now().Add(time.Second) + for fakeSrv.shutdownCalls.Load() == 0 && time.Now().Before(deadline) { + time.Sleep(time.Millisecond) + } + if fakeSrv.shutdownCalls.Load() == 0 { + t.Fatal("Shutdown was not invoked after ctx cancel") + } +} + +// ============================================================================= +// systemAPI — handler dispatch +// ============================================================================= + +func TestSystemAPI_Run_HandlerVersionsReturnsJSON(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.SystemAPIReturns(":0") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromSystemAPIRun(t, env, &lbfakes.FakeOriginLoadBalancer{}, ctx) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/versions", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var body map[string]string + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("json: %v\nbody=%s", err, rec.Body.String()) + } + if body["signature"] == "" { + t.Error("signature should be populated") + } + if body["version"] == "" { + t.Error("version should be populated") + } +} + +// validRegisterBody returns the JSON body for a happy-path /api/v1/srs/register call. +func validRegisterBody(t *testing.T) io.Reader { + t.Helper() + b, err := json.Marshal(map[string]any{ + "ip": "1.2.3.4", + "server": "srv-abc", + "service": "svc-1", + "pid": "12345", + "rtmp": []string{"1935"}, + "http": []string{"8080"}, + "api": []string{"1985"}, + "srt": []string{"10080"}, + "rtc": []string{"8000"}, + "device_id": "dev-x", + }) + if err != nil { + t.Fatalf("marshal: %v", err) + } + return bytes.NewReader(b) +} + +func TestSystemAPI_Run_HandlerRegisterHappyPathCallsUpdate(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.SystemAPIReturns(":0") + lbFake := &lbfakes.FakeOriginLoadBalancer{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromSystemAPIRun(t, env, lbFake, ctx) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/srs/register", validRegisterBody(t)) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if lbFake.UpdateCallCount() != 1 { + t.Fatalf("Update calls = %d, want 1", lbFake.UpdateCallCount()) + } + _, server := lbFake.UpdateArgsForCall(0) + if server.IP != "1.2.3.4" { + t.Errorf("IP = %q", server.IP) + } + if server.ServerID != "srv-abc" { + t.Errorf("ServerID = %q", server.ServerID) + } + if server.ServiceID != "svc-1" { + t.Errorf("ServiceID = %q", server.ServiceID) + } + if server.PID != "12345" { + t.Errorf("PID = %q", server.PID) + } + if got := server.RTMP; len(got) != 1 || got[0] != "1935" { + t.Errorf("RTMP = %v", got) + } + if got := server.HTTP; len(got) != 1 || got[0] != "8080" { + t.Errorf("HTTP = %v", got) + } + if got := server.API; len(got) != 1 || got[0] != "1985" { + t.Errorf("API = %v", got) + } + if got := server.SRT; len(got) != 1 || got[0] != "10080" { + t.Errorf("SRT = %v", got) + } + if got := server.RTC; len(got) != 1 || got[0] != "8000" { + t.Errorf("RTC = %v", got) + } + if server.DeviceID != "dev-x" { + t.Errorf("DeviceID = %q", server.DeviceID) + } + if server.UpdatedAt.IsZero() { + t.Error("UpdatedAt should be set") + } +} + +func TestSystemAPI_Run_HandlerRegisterParseBodyError(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.SystemAPIReturns(":0") + lbFake := &lbfakes.FakeOriginLoadBalancer{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromSystemAPIRun(t, env, lbFake, ctx) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/srs/register", + bytes.NewReader([]byte("not json"))) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if lbFake.UpdateCallCount() != 0 { + t.Fatalf("Update should not be called on parse body err, calls = %d", lbFake.UpdateCallCount()) + } + if !bytes.Contains(rec.Body.Bytes(), []byte("parse body")) { + t.Errorf("body = %q, expected parse body error", rec.Body.String()) + } +} + +// registerWithField returns a body with one field replaced. Other mandatory +// fields default to valid values so only the field under test triggers an +// error. +func registerWithField(t *testing.T, field string, value any) io.Reader { + t.Helper() + m := map[string]any{ + "ip": "1.2.3.4", + "server": "srv-abc", + "service": "svc-1", + "pid": "12345", + "rtmp": []string{"1935"}, + } + m[field] = value + b, err := json.Marshal(m) + if err != nil { + t.Fatalf("marshal: %v", err) + } + return bytes.NewReader(b) +} + +func TestSystemAPI_Run_HandlerRegisterValidationErrors(t *testing.T) { + cases := []struct { + name string + field string + value any + wantErrText string + }{ + {"empty-ip", "ip", "", "empty ip"}, + {"empty-server", "server", "", "empty server"}, + {"empty-service", "service", "", "empty service"}, + {"empty-pid", "pid", "", "empty pid"}, + {"empty-rtmp", "rtmp", []string{}, "empty rtmp"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.SystemAPIReturns(":0") + lbFake := &lbfakes.FakeOriginLoadBalancer{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromSystemAPIRun(t, env, lbFake, ctx) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/srs/register", + registerWithField(t, tc.field, tc.value)) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if lbFake.UpdateCallCount() != 0 { + t.Errorf("Update should not be called when %s is invalid, calls = %d", + tc.field, lbFake.UpdateCallCount()) + } + if !bytes.Contains(rec.Body.Bytes(), []byte(tc.wantErrText)) { + t.Errorf("body = %q, expected to contain %q", rec.Body.String(), tc.wantErrText) + } + }) + } +} + +func TestSystemAPI_Run_HandlerRegisterLoadBalancerUpdateError(t *testing.T) { + env := &envfakes.FakeProxyEnvironment{} + env.SystemAPIReturns(":0") + lbFake := &lbfakes.FakeOriginLoadBalancer{} + lbFake.UpdateReturns(errors.New("lb-update-fail")) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux, _, _ := captureMuxFromSystemAPIRun(t, env, lbFake, ctx) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/srs/register", validRegisterBody(t)) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if lbFake.UpdateCallCount() != 1 { + t.Fatalf("Update calls = %d, want 1", lbFake.UpdateCallCount()) + } + if !bytes.Contains(rec.Body.Bytes(), []byte("lb-update-fail")) { + t.Errorf("body = %q, expected lb error", rec.Body.String()) + } +} diff --git a/internal/proxy/gen.go b/internal/proxy/gen.go new file mode 100644 index 000000000..dc2013c1c --- /dev/null +++ b/internal/proxy/gen.go @@ -0,0 +1,9 @@ +// Copyright (c) 2026 Winlin +// +// SPDX-License-Identifier: MIT +package proxy + +//go:generate go tool counterfeiter -o proxyfakes/fake_rtmp_proxy_server.go . RTMPProxyServer +//go:generate go tool counterfeiter -o proxyfakes/fake_http_stream_proxy_server.go . HTTPStreamProxyServer +//go:generate go tool counterfeiter -o proxyfakes/fake_http_api_proxy_server.go . HTTPAPIProxyServer +//go:generate go tool counterfeiter -o proxyfakes/fake_web_rtc_proxy_server.go . WebRTCProxyServer diff --git a/internal/proxy/proxyfakes/fake_http_api_proxy_server.go b/internal/proxy/proxyfakes/fake_http_api_proxy_server.go new file mode 100644 index 000000000..a16710ff9 --- /dev/null +++ b/internal/proxy/proxyfakes/fake_http_api_proxy_server.go @@ -0,0 +1,172 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package proxyfakes + +import ( + "context" + "srsx/internal/proxy" + "sync" +) + +type FakeHTTPAPIProxyServer struct { + CloseStub func() error + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + closeReturns struct { + result1 error + } + closeReturnsOnCall map[int]struct { + result1 error + } + RunStub func(context.Context) error + runMutex sync.RWMutex + runArgsForCall []struct { + arg1 context.Context + } + runReturns struct { + result1 error + } + runReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeHTTPAPIProxyServer) Close() error { + fake.closeMutex.Lock() + ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fakeReturns := fake.closeReturns + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeHTTPAPIProxyServer) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeHTTPAPIProxyServer) CloseCalls(stub func() error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *FakeHTTPAPIProxyServer) CloseReturns(result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + fake.closeReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeHTTPAPIProxyServer) CloseReturnsOnCall(i int, result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + if fake.closeReturnsOnCall == nil { + fake.closeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.closeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeHTTPAPIProxyServer) Run(arg1 context.Context) error { + fake.runMutex.Lock() + ret, specificReturn := fake.runReturnsOnCall[len(fake.runArgsForCall)] + fake.runArgsForCall = append(fake.runArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.RunStub + fakeReturns := fake.runReturns + fake.recordInvocation("Run", []interface{}{arg1}) + fake.runMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeHTTPAPIProxyServer) RunCallCount() int { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + return len(fake.runArgsForCall) +} + +func (fake *FakeHTTPAPIProxyServer) RunCalls(stub func(context.Context) error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = stub +} + +func (fake *FakeHTTPAPIProxyServer) RunArgsForCall(i int) context.Context { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + argsForCall := fake.runArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeHTTPAPIProxyServer) RunReturns(result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + fake.runReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeHTTPAPIProxyServer) RunReturnsOnCall(i int, result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + if fake.runReturnsOnCall == nil { + fake.runReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.runReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeHTTPAPIProxyServer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeHTTPAPIProxyServer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ proxy.HTTPAPIProxyServer = new(FakeHTTPAPIProxyServer) diff --git a/internal/proxy/proxyfakes/fake_http_stream_proxy_server.go b/internal/proxy/proxyfakes/fake_http_stream_proxy_server.go new file mode 100644 index 000000000..7b6f71010 --- /dev/null +++ b/internal/proxy/proxyfakes/fake_http_stream_proxy_server.go @@ -0,0 +1,172 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package proxyfakes + +import ( + "context" + "srsx/internal/proxy" + "sync" +) + +type FakeHTTPStreamProxyServer struct { + CloseStub func() error + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + closeReturns struct { + result1 error + } + closeReturnsOnCall map[int]struct { + result1 error + } + RunStub func(context.Context) error + runMutex sync.RWMutex + runArgsForCall []struct { + arg1 context.Context + } + runReturns struct { + result1 error + } + runReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeHTTPStreamProxyServer) Close() error { + fake.closeMutex.Lock() + ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fakeReturns := fake.closeReturns + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeHTTPStreamProxyServer) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeHTTPStreamProxyServer) CloseCalls(stub func() error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *FakeHTTPStreamProxyServer) CloseReturns(result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + fake.closeReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeHTTPStreamProxyServer) CloseReturnsOnCall(i int, result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + if fake.closeReturnsOnCall == nil { + fake.closeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.closeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeHTTPStreamProxyServer) Run(arg1 context.Context) error { + fake.runMutex.Lock() + ret, specificReturn := fake.runReturnsOnCall[len(fake.runArgsForCall)] + fake.runArgsForCall = append(fake.runArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.RunStub + fakeReturns := fake.runReturns + fake.recordInvocation("Run", []interface{}{arg1}) + fake.runMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeHTTPStreamProxyServer) RunCallCount() int { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + return len(fake.runArgsForCall) +} + +func (fake *FakeHTTPStreamProxyServer) RunCalls(stub func(context.Context) error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = stub +} + +func (fake *FakeHTTPStreamProxyServer) RunArgsForCall(i int) context.Context { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + argsForCall := fake.runArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeHTTPStreamProxyServer) RunReturns(result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + fake.runReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeHTTPStreamProxyServer) RunReturnsOnCall(i int, result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + if fake.runReturnsOnCall == nil { + fake.runReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.runReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeHTTPStreamProxyServer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeHTTPStreamProxyServer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ proxy.HTTPStreamProxyServer = new(FakeHTTPStreamProxyServer) diff --git a/internal/proxy/proxyfakes/fake_rtmp_proxy_server.go b/internal/proxy/proxyfakes/fake_rtmp_proxy_server.go new file mode 100644 index 000000000..44d20294d --- /dev/null +++ b/internal/proxy/proxyfakes/fake_rtmp_proxy_server.go @@ -0,0 +1,172 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package proxyfakes + +import ( + "context" + "srsx/internal/proxy" + "sync" +) + +type FakeRTMPProxyServer struct { + CloseStub func() error + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + closeReturns struct { + result1 error + } + closeReturnsOnCall map[int]struct { + result1 error + } + RunStub func(context.Context) error + runMutex sync.RWMutex + runArgsForCall []struct { + arg1 context.Context + } + runReturns struct { + result1 error + } + runReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeRTMPProxyServer) Close() error { + fake.closeMutex.Lock() + ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fakeReturns := fake.closeReturns + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRTMPProxyServer) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeRTMPProxyServer) CloseCalls(stub func() error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *FakeRTMPProxyServer) CloseReturns(result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + fake.closeReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRTMPProxyServer) CloseReturnsOnCall(i int, result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + if fake.closeReturnsOnCall == nil { + fake.closeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.closeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeRTMPProxyServer) Run(arg1 context.Context) error { + fake.runMutex.Lock() + ret, specificReturn := fake.runReturnsOnCall[len(fake.runArgsForCall)] + fake.runArgsForCall = append(fake.runArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.RunStub + fakeReturns := fake.runReturns + fake.recordInvocation("Run", []interface{}{arg1}) + fake.runMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRTMPProxyServer) RunCallCount() int { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + return len(fake.runArgsForCall) +} + +func (fake *FakeRTMPProxyServer) RunCalls(stub func(context.Context) error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = stub +} + +func (fake *FakeRTMPProxyServer) RunArgsForCall(i int) context.Context { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + argsForCall := fake.runArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeRTMPProxyServer) RunReturns(result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + fake.runReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRTMPProxyServer) RunReturnsOnCall(i int, result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + if fake.runReturnsOnCall == nil { + fake.runReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.runReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeRTMPProxyServer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeRTMPProxyServer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ proxy.RTMPProxyServer = new(FakeRTMPProxyServer) diff --git a/internal/proxy/proxyfakes/fake_web_rtc_proxy_server.go b/internal/proxy/proxyfakes/fake_web_rtc_proxy_server.go new file mode 100644 index 000000000..5401d5fed --- /dev/null +++ b/internal/proxy/proxyfakes/fake_web_rtc_proxy_server.go @@ -0,0 +1,325 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package proxyfakes + +import ( + "context" + "net/http" + "srsx/internal/proxy" + "sync" +) + +type FakeWebRTCProxyServer struct { + CloseStub func() error + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + closeReturns struct { + result1 error + } + closeReturnsOnCall map[int]struct { + result1 error + } + HandleApiForWHEPStub func(context.Context, http.ResponseWriter, *http.Request) error + handleApiForWHEPMutex sync.RWMutex + handleApiForWHEPArgsForCall []struct { + arg1 context.Context + arg2 http.ResponseWriter + arg3 *http.Request + } + handleApiForWHEPReturns struct { + result1 error + } + handleApiForWHEPReturnsOnCall map[int]struct { + result1 error + } + HandleApiForWHIPStub func(context.Context, http.ResponseWriter, *http.Request) error + handleApiForWHIPMutex sync.RWMutex + handleApiForWHIPArgsForCall []struct { + arg1 context.Context + arg2 http.ResponseWriter + arg3 *http.Request + } + handleApiForWHIPReturns struct { + result1 error + } + handleApiForWHIPReturnsOnCall map[int]struct { + result1 error + } + RunStub func(context.Context) error + runMutex sync.RWMutex + runArgsForCall []struct { + arg1 context.Context + } + runReturns struct { + result1 error + } + runReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeWebRTCProxyServer) Close() error { + fake.closeMutex.Lock() + ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fakeReturns := fake.closeReturns + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeWebRTCProxyServer) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeWebRTCProxyServer) CloseCalls(stub func() error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *FakeWebRTCProxyServer) CloseReturns(result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + fake.closeReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeWebRTCProxyServer) CloseReturnsOnCall(i int, result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + if fake.closeReturnsOnCall == nil { + fake.closeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.closeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHEP(arg1 context.Context, arg2 http.ResponseWriter, arg3 *http.Request) error { + fake.handleApiForWHEPMutex.Lock() + ret, specificReturn := fake.handleApiForWHEPReturnsOnCall[len(fake.handleApiForWHEPArgsForCall)] + fake.handleApiForWHEPArgsForCall = append(fake.handleApiForWHEPArgsForCall, struct { + arg1 context.Context + arg2 http.ResponseWriter + arg3 *http.Request + }{arg1, arg2, arg3}) + stub := fake.HandleApiForWHEPStub + fakeReturns := fake.handleApiForWHEPReturns + fake.recordInvocation("HandleApiForWHEP", []interface{}{arg1, arg2, arg3}) + fake.handleApiForWHEPMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHEPCallCount() int { + fake.handleApiForWHEPMutex.RLock() + defer fake.handleApiForWHEPMutex.RUnlock() + return len(fake.handleApiForWHEPArgsForCall) +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHEPCalls(stub func(context.Context, http.ResponseWriter, *http.Request) error) { + fake.handleApiForWHEPMutex.Lock() + defer fake.handleApiForWHEPMutex.Unlock() + fake.HandleApiForWHEPStub = stub +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHEPArgsForCall(i int) (context.Context, http.ResponseWriter, *http.Request) { + fake.handleApiForWHEPMutex.RLock() + defer fake.handleApiForWHEPMutex.RUnlock() + argsForCall := fake.handleApiForWHEPArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHEPReturns(result1 error) { + fake.handleApiForWHEPMutex.Lock() + defer fake.handleApiForWHEPMutex.Unlock() + fake.HandleApiForWHEPStub = nil + fake.handleApiForWHEPReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHEPReturnsOnCall(i int, result1 error) { + fake.handleApiForWHEPMutex.Lock() + defer fake.handleApiForWHEPMutex.Unlock() + fake.HandleApiForWHEPStub = nil + if fake.handleApiForWHEPReturnsOnCall == nil { + fake.handleApiForWHEPReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.handleApiForWHEPReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHIP(arg1 context.Context, arg2 http.ResponseWriter, arg3 *http.Request) error { + fake.handleApiForWHIPMutex.Lock() + ret, specificReturn := fake.handleApiForWHIPReturnsOnCall[len(fake.handleApiForWHIPArgsForCall)] + fake.handleApiForWHIPArgsForCall = append(fake.handleApiForWHIPArgsForCall, struct { + arg1 context.Context + arg2 http.ResponseWriter + arg3 *http.Request + }{arg1, arg2, arg3}) + stub := fake.HandleApiForWHIPStub + fakeReturns := fake.handleApiForWHIPReturns + fake.recordInvocation("HandleApiForWHIP", []interface{}{arg1, arg2, arg3}) + fake.handleApiForWHIPMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHIPCallCount() int { + fake.handleApiForWHIPMutex.RLock() + defer fake.handleApiForWHIPMutex.RUnlock() + return len(fake.handleApiForWHIPArgsForCall) +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHIPCalls(stub func(context.Context, http.ResponseWriter, *http.Request) error) { + fake.handleApiForWHIPMutex.Lock() + defer fake.handleApiForWHIPMutex.Unlock() + fake.HandleApiForWHIPStub = stub +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHIPArgsForCall(i int) (context.Context, http.ResponseWriter, *http.Request) { + fake.handleApiForWHIPMutex.RLock() + defer fake.handleApiForWHIPMutex.RUnlock() + argsForCall := fake.handleApiForWHIPArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHIPReturns(result1 error) { + fake.handleApiForWHIPMutex.Lock() + defer fake.handleApiForWHIPMutex.Unlock() + fake.HandleApiForWHIPStub = nil + fake.handleApiForWHIPReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeWebRTCProxyServer) HandleApiForWHIPReturnsOnCall(i int, result1 error) { + fake.handleApiForWHIPMutex.Lock() + defer fake.handleApiForWHIPMutex.Unlock() + fake.HandleApiForWHIPStub = nil + if fake.handleApiForWHIPReturnsOnCall == nil { + fake.handleApiForWHIPReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.handleApiForWHIPReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeWebRTCProxyServer) Run(arg1 context.Context) error { + fake.runMutex.Lock() + ret, specificReturn := fake.runReturnsOnCall[len(fake.runArgsForCall)] + fake.runArgsForCall = append(fake.runArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.RunStub + fakeReturns := fake.runReturns + fake.recordInvocation("Run", []interface{}{arg1}) + fake.runMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeWebRTCProxyServer) RunCallCount() int { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + return len(fake.runArgsForCall) +} + +func (fake *FakeWebRTCProxyServer) RunCalls(stub func(context.Context) error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = stub +} + +func (fake *FakeWebRTCProxyServer) RunArgsForCall(i int) context.Context { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + argsForCall := fake.runArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeWebRTCProxyServer) RunReturns(result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + fake.runReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeWebRTCProxyServer) RunReturnsOnCall(i int, result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + if fake.runReturnsOnCall == nil { + fake.runReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.runReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeWebRTCProxyServer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeWebRTCProxyServer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ proxy.WebRTCProxyServer = new(FakeWebRTCProxyServer)