Claude: Add HTTP proxy seams and unit tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
winlin 2026-05-16 21:27:24 -04:00
parent f42921d7b1
commit 0a18a4a13b
2 changed files with 1381 additions and 22 deletions

View File

@ -7,7 +7,7 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strconv"
@ -31,32 +31,95 @@ type HTTPStreamProxyServer interface {
Close() error
}
// httpServer is the minimal contract of an HTTP server that httpStreamProxyServer drives.
// *http.Server satisfies it. Tests may supply a fake that does not bind a real port.
type httpServer interface {
ListenAndServe() error
Shutdown(ctx context.Context) error
}
// buildBackendHTTPURL composes the backend HTTP URL for a request path, targeting
// the given backend IP and port. Callers append query strings separately when needed.
func buildBackendHTTPURL(ip string, port int, path string) string {
return fmt.Sprintf("http://%v:%v%s", ip, port, path)
}
type httpStreamProxyServer struct {
// The environment interface.
environment env.ProxyEnvironment
// The load balancer for origin servers.
loadBalancer lb.OriginLoadBalancer
// The underlayer HTTP server.
server *http.Server
server httpServer
// The gracefully quit timeout, wait server to quit.
gracefulQuitTimeout time.Duration
// The wait group for all goroutines.
wg stdSync.WaitGroup
// shutdown gracefully shuts down the underlying HTTP server. Defaults to
// v.server.Shutdown; tests may override via a functional option to verify
// the shutdown contract without binding a real socket.
shutdown func(ctx context.Context) error
// newServer constructs the underlying HTTP server bound to addr and the
// ServeMux that handlers are registered on. Defaults to a real http.Server
// and ServeMux; tests may override via a functional option to supply a fake
// server that does not bind a real port.
newServer func(addr string) (httpServer, *http.ServeMux)
// newHLSStream constructs a per-stream HLS playback object for the given
// stream URL pair. Defaults to newHLSPlayStream pre-wired with this server's
// load balancer and a fresh SPBHID; tests may override via a functional option.
newHLSStream func(streamURL, fullURL string) *hlsPlayStream
// newFlvTsConn constructs a per-request HTTP-FLV/TS connection bound to ctx.
// Defaults to newHTTPFlvTsConnection pre-wired with this server's load
// balancer; tests may override via a functional option.
newFlvTsConn func(ctx context.Context) *httpFlvTsConnection
}
func NewHTTPStreamProxyServer(environment env.ProxyEnvironment, loadBalancer lb.OriginLoadBalancer, gracefulQuitTimeout time.Duration) HTTPStreamProxyServer {
func NewHTTPStreamProxyServer(environment env.ProxyEnvironment, loadBalancer lb.OriginLoadBalancer, gracefulQuitTimeout time.Duration, opts ...func(*httpStreamProxyServer)) HTTPStreamProxyServer {
v := &httpStreamProxyServer{
environment: environment,
loadBalancer: loadBalancer,
gracefulQuitTimeout: gracefulQuitTimeout,
}
// Default shutdown: delegate to the underlying http.Server. The closure
// captures v rather than v.server so the dereference happens at call time,
// after Run() has assigned v.server.
v.shutdown = func(ctx context.Context) error {
return v.server.Shutdown(ctx)
}
// Default newServer: a real http.Server and ServeMux pair.
v.newServer = func(addr string) (httpServer, *http.ServeMux) {
mux := http.NewServeMux()
return &http.Server{Addr: addr, Handler: mux}, mux
}
// Default newHLSStream: a real hlsPlayStream wired with the server's load
// balancer and a fresh SPBHID for this stream.
v.newHLSStream = func(streamURL, fullURL string) *hlsPlayStream {
return newHLSPlayStream(func(s *hlsPlayStream) {
s.loadBalancer = v.loadBalancer
s.SRSProxyBackendHLSID = logger.GenerateContextID()
s.StreamURL, s.FullURL = streamURL, fullURL
})
}
// Default newFlvTsConn: a real httpFlvTsConnection wired with the server's
// load balancer and the given ctx.
v.newFlvTsConn = func(ctx context.Context) *httpFlvTsConnection {
return newHTTPFlvTsConnection(func(c *httpFlvTsConnection) {
c.ctx = ctx
c.loadBalancer = v.loadBalancer
})
}
for _, opt := range opts {
opt(v)
}
return v
}
func (v *httpStreamProxyServer) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)
v.shutdown(ctx)
v.wg.Wait()
return nil
@ -70,8 +133,8 @@ func (v *httpStreamProxyServer) Run(ctx context.Context) error {
}
// Create server and handler.
mux := http.NewServeMux()
v.server = &http.Server{Addr: addr, Handler: mux}
server, mux := v.newServer(addr)
v.server = server
logger.Debug(ctx, "HTTP Stream server listen at %v", addr)
// Shutdown the server gracefully when quiting.
@ -82,7 +145,7 @@ func (v *httpStreamProxyServer) Run(ctx context.Context) error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)
v.shutdown(ctx)
}()
// The basic version handler, also can be used as health check API.
@ -131,11 +194,11 @@ func (v *httpStreamProxyServer) Run(ctx context.Context) error {
return
}
stream, _ := v.loadBalancer.LoadOrStoreHLS(ctx, streamURL, newHLSPlayStream(func(s *hlsPlayStream) {
s.loadBalancer = v.loadBalancer
s.SRSProxyBackendHLSID = logger.GenerateContextID()
s.StreamURL, s.FullURL = streamURL, fullURL
}))
stream, err := v.loadBalancer.LoadOrStoreHLS(ctx, streamURL, v.newHLSStream(streamURL, fullURL))
if err != nil {
http.Error(w, fmt.Sprintf("load or store hls %v", streamURL), http.StatusBadRequest)
return
}
stream.Initialize(ctx).(*hlsPlayStream).ServeHTTP(w, r)
return
@ -155,10 +218,7 @@ func (v *httpStreamProxyServer) Run(ctx context.Context) error {
}
// Use HTTP pseudo streaming to proxy the request.
newHTTPFlvTsConnection(func(c *httpFlvTsConnection) {
c.ctx = ctx
c.loadBalancer = v.loadBalancer
}).ServeHTTP(w, r)
v.newFlvTsConn(ctx).ServeHTTP(w, r)
return
}
@ -203,10 +263,15 @@ type httpFlvTsConnection struct {
ctx context.Context
// The load balancer for origin servers.
loadBalancer lb.OriginLoadBalancer
// buildBackendURL composes the backend HTTP URL for a request path. Defaults
// to buildBackendHTTPURL; tests may override via a functional option.
buildBackendURL func(ip string, port int, path string) string
}
func newHTTPFlvTsConnection(opts ...func(*httpFlvTsConnection)) *httpFlvTsConnection {
v := &httpFlvTsConnection{}
v := &httpFlvTsConnection{
buildBackendURL: buildBackendHTTPURL,
}
for _, opt := range opts {
opt(v)
}
@ -266,7 +331,7 @@ func (v *httpFlvTsConnection) serveByBackend(ctx context.Context, w http.Respons
}
// Connect to backend SRS server via HTTP client.
backendURL := fmt.Sprintf("http://%v:%v%s", backend.IP, httpPort, r.URL.Path)
backendURL := v.buildBackendURL(backend.IP, httpPort, r.URL.Path)
req, err := http.NewRequestWithContext(ctx, r.Method, backendURL, nil)
if err != nil {
return errors.Wrapf(err, "create request to %v", backendURL)
@ -319,10 +384,15 @@ type hlsPlayStream struct {
StreamURL string `json:"stream_url"`
// The full request URL for HLS streaming
FullURL string `json:"full_url"`
// buildBackendURL composes the backend HTTP URL for a request path. Defaults
// to buildBackendHTTPURL; tests may override via a functional option.
buildBackendURL func(ip string, port int, path string) string `json:"-"`
}
func newHLSPlayStream(opts ...func(*hlsPlayStream)) *hlsPlayStream {
v := &hlsPlayStream{}
v := &hlsPlayStream{
buildBackendURL: buildBackendHTTPURL,
}
for _, opt := range opts {
opt(v)
}
@ -375,7 +445,7 @@ func (v *hlsPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *htt
func (v *hlsPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *lb.OriginServer) error {
// Parse HTTP port from backend.
if len(backend.HTTP) == 0 {
return errors.Errorf("no rtmp server")
return errors.Errorf("no http server")
}
var httpPort int
@ -386,7 +456,7 @@ func (v *hlsPlayStream) serveByBackend(ctx context.Context, w http.ResponseWrite
}
// Connect to backend SRS server via HTTP client.
backendURL := fmt.Sprintf("http://%v:%v%s", backend.IP, httpPort, r.URL.Path)
backendURL := v.buildBackendURL(backend.IP, httpPort, r.URL.Path)
if r.URL.RawQuery != "" {
backendURL += "?" + r.URL.RawQuery
}
@ -425,7 +495,7 @@ func (v *hlsPlayStream) serveByBackend(ctx context.Context, w http.ResponseWrite
// Read all content of m3u8, append the stream ID to ts URL. Note that we only append stream ID to ts
// URL, to identify the stream to specified backend server. The spbhid is the SRS Proxy Backend HLS ID.
b, err := ioutil.ReadAll(resp.Body)
b, err := io.ReadAll(resp.Body)
if err != nil {
return errors.Wrapf(err, "read stream from %v", backendURL)
}

1289
internal/proxy/http_test.go Normal file

File diff suppressed because it is too large Load Diff