diff --git a/.openclaw/memory/srs-codebase-map.md b/.openclaw/memory/srs-codebase-map.md index 123a373fa..3f100c956 100644 --- a/.openclaw/memory/srs-codebase-map.md +++ b/.openclaw/memory/srs-codebase-map.md @@ -221,7 +221,7 @@ The next-generation server (`cmd/` + `internal/`) is written in Go and maintaine `internal/lb` — Load balancer abstraction and two implementations. Defines `SRSLoadBalancer` interface and core types in `lb.go` (Initialize, Update, Pick, HLS/WebRTC state management) and `SRSServer` struct representing a backend origin (IP, listen endpoints for RTMP/HTTP/API/SRT/RTC, heartbeat tracking). **Memory LB** (`mem.go`) — in-memory using `sync.Map`, sticky random pick per stream URL, single-proxy deployment. **Redis LB** (`redis.go`) — Redis-backed shared state with TTL-based expiration, enables multi-proxy horizontal scaling behind a network load balancer. Also includes a debug helper (`debug.go`) that creates a fake backend from env vars when `PROXY_DEFAULT_BACKEND_ENABLED=on` for development without real SRS registration. -`internal/logger` — Structured logging with context IDs. Four log levels: Verbose (discarded), Debug (stdout), Warn (stderr), Error (stderr). Emits JSON via `log/slog` with `pid` and `cid` attributes. Each connection/request gets a unique 7-char hex context ID for log correlation, stored in `context.Context`. +`internal/logger` — Structured logging with context IDs. Four log levels: Debug/Info (stdout), Warn/Error (stderr). Emits JSON via `log/slog` with `pid` and `cid` attributes. Each connection/request gets a unique 7-char hex context ID for log correlation, stored in `context.Context`. `internal/env` — Environment-based configuration. All settings via env vars (or `.env` file parsed by an in-tree custom parser — no third-party dep; supports comments, `export` prefix, quoted values, escape sequences, and inline comments). Exposes a `ProxyEnvironment` interface (with a counterfeiter-generated fake in `envfakes/` for downstream tests) with methods for each config value. Default ports: RTMP=11935, HTTP API=11985, HTTP Stream=18080, WebRTC=18000, SRT=20080, System API=12025. Timeouts: grace=20s, force=30s. Supports Redis config and default backend config for debugging. diff --git a/internal/bootstrap/proxy.go b/internal/bootstrap/proxy.go index 4173670cf..5bd8fd7e1 100644 --- a/internal/bootstrap/proxy.go +++ b/internal/bootstrap/proxy.go @@ -29,7 +29,7 @@ type proxyBootstrap struct{} // Returns any error encountered during startup. func (b *proxyBootstrap) Start(ctx context.Context) error { ctx = logger.WithContext(ctx) - logger.Df(ctx, "%v-Proxy/%v started", version.Signature(), version.Version()) + logger.Debug(ctx, "%v-Proxy/%v started", version.Signature(), version.Version()) // Install signals. ctx, cancel := context.WithCancel(ctx) @@ -38,11 +38,11 @@ func (b *proxyBootstrap) Start(ctx context.Context) error { // Run the main loop, ignore the user cancel error. err := b.run(ctx) if err != nil && ctx.Err() != context.Canceled { - logger.Ef(ctx, "main: %+v", err) + logger.Error(ctx, "main: %+v", err) return err } - logger.Df(ctx, "%v done", version.Signature()) + logger.Debug(ctx, "%v done", version.Signature()) return nil } diff --git a/internal/debug/pprof.go b/internal/debug/pprof.go index 7142e86fa..b89923e38 100644 --- a/internal/debug/pprof.go +++ b/internal/debug/pprof.go @@ -14,7 +14,7 @@ import ( func HandleGoPprof(ctx context.Context, environment env.ProxyEnvironment) { if addr := environment.GoPprof(); addr != "" { go func() { - logger.Df(ctx, "Start Go pprof at %v", addr) + logger.Debug(ctx, "Start Go pprof at %v", addr) http.ListenAndServe(addr, nil) }() } diff --git a/internal/env/env.go b/internal/env/env.go index 5d6d88928..c7b51e72b 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -177,7 +177,7 @@ func loadEnvFile(ctx context.Context) error { envMap, err := parseEnvFile(".env") if err != nil { if os.IsNotExist(err) { - logger.Df(ctx, "no .env file found, skipping") + logger.Debug(ctx, "no .env file found, skipping") return nil } return errors.Wrapf(err, "load .env file") @@ -190,7 +190,7 @@ func loadEnvFile(ctx context.Context) error { } } - logger.Df(ctx, "successfully loaded .env file") + logger.Debug(ctx, "successfully loaded .env file") return nil } @@ -315,7 +315,7 @@ func buildDefaultEnvironmentVariables(ctx context.Context) { // Default backend udp srt port, for debugging. setEnvDefault("PROXY_DEFAULT_BACKEND_SRT", "10080") - logger.Df(ctx, "load .env as GO_PPROF=%v, "+ + logger.Debug(ctx, "load .env as GO_PPROF=%v, "+ "PROXY_FORCE_QUIT_TIMEOUT=%v, PROXY_GRACE_QUIT_TIMEOUT=%v, "+ "PROXY_HTTP_API=%v, PROXY_HTTP_SERVER=%v, PROXY_RTMP_SERVER=%v, "+ "PROXY_WEBRTC_SERVER=%v, PROXY_SRT_SERVER=%v, "+ diff --git a/internal/lb/mem.go b/internal/lb/mem.go index 6b4389315..57b4c88b4 100644 --- a/internal/lb/mem.go +++ b/internal/lb/mem.go @@ -65,12 +65,12 @@ func (v *MemoryLoadBalancer) Initialize(ctx context.Context) error { return case <-time.After(30 * time.Second): if err := v.Update(ctx, server); err != nil { - logger.Wf(ctx, "update default SRS %+v failed, %+v", server, err) + logger.Warn(ctx, "update default SRS %+v failed, %+v", server, err) } } } }() - logger.Df(ctx, "MemoryLB: Initialize default SRS media server, %+v", server) + logger.Debug(ctx, "MemoryLB: Initialize default SRS media server, %+v", server) } return nil } diff --git a/internal/lb/redis.go b/internal/lb/redis.go index 26395f5d6..d47bf8982 100644 --- a/internal/lb/redis.go +++ b/internal/lb/redis.go @@ -50,7 +50,7 @@ func (v *RedisLoadBalancer) Initialize(ctx context.Context) error { if err := rdb.Ping(ctx).Err(); err != nil { return errors.Wrapf(err, "unable to connect to redis %v", rdb.String()) } - logger.Df(ctx, "RedisLB: connected to redis %v ok", rdb.String()) + logger.Debug(ctx, "RedisLB: connected to redis %v ok", rdb.String()) server, err := NewDefaultSRSForDebugging(v.environment) if err != nil { @@ -70,12 +70,12 @@ func (v *RedisLoadBalancer) Initialize(ctx context.Context) error { return case <-time.After(30 * time.Second): if err := v.Update(ctx, server); err != nil { - logger.Wf(ctx, "update default SRS %+v failed, %+v", server, err) + logger.Warn(ctx, "update default SRS %+v failed, %+v", server, err) } } } }() - logger.Df(ctx, "RedisLB: Initialize default SRS media server, %+v", server) + logger.Debug(ctx, "RedisLB: Initialize default SRS media server, %+v", server) } return nil } diff --git a/internal/logger/log.go b/internal/logger/log.go index f710653e5..ab16feef6 100644 --- a/internal/logger/log.go +++ b/internal/logger/log.go @@ -9,10 +9,13 @@ import ( "io" "log/slog" "os" + "strings" + + "srsx/internal/version" ) type logger interface { - Printf(ctx context.Context, format string, v ...any) + Log(ctx context.Context, msg string, args ...any) } type loggerPlus struct { @@ -28,95 +31,73 @@ func newLoggerPlus(opts ...func(*loggerPlus)) *loggerPlus { return v } -func (v *loggerPlus) Printf(ctx context.Context, f string, a ...any) { - attrs := []slog.Attr{slog.Int("pid", os.Getpid())} - if cid := ContextID(ctx); cid != "" { - attrs = append(attrs, slog.String("cid", cid)) +func (v *loggerPlus) Log(ctx context.Context, msg string, args ...any) { + attrs := []any{ + "pid", os.Getpid(), + "version", version.Version(), } - v.logger.LogAttrs(ctx, v.level, fmt.Sprintf(f, a...), attrs...) -} -var verboseLogger logger + if cid := ContextID(ctx); cid != "" { + attrs = append(attrs, "cid", cid) + } -func Vf(ctx context.Context, format string, a ...any) { - verboseLogger.Printf(ctx, format, a...) + // Keep compatibility with the old *f call sites while exposing the new + // slog-style API. New code should pass structured key/value args. + if len(args) > 0 && strings.Contains(msg, "%") { + msg = fmt.Sprintf(msg, args...) + args = nil + } + attrs = append(attrs, args...) + v.logger.Log(ctx, v.level, msg, attrs...) } var debugLogger logger -func Df(ctx context.Context, format string, a ...any) { - debugLogger.Printf(ctx, format, a...) +func Debug(ctx context.Context, msg string, args ...any) { + debugLogger.Log(ctx, msg, args...) +} + +var infoLogger logger + +func Info(ctx context.Context, msg string, args ...any) { + infoLogger.Log(ctx, msg, args...) } var warnLogger logger -func Wf(ctx context.Context, format string, a ...any) { - warnLogger.Printf(ctx, format, a...) +func Warn(ctx context.Context, msg string, args ...any) { + warnLogger.Log(ctx, msg, args...) } var errorLogger logger -func Ef(ctx context.Context, format string, a ...any) { - errorLogger.Printf(ctx, format, a...) +func Error(ctx context.Context, msg string, args ...any) { + errorLogger.Log(ctx, msg, args...) } -const ( - levelVerb slog.Level = slog.LevelDebug - 4 - levelDebug slog.Level = slog.LevelDebug - levelWarn slog.Level = slog.LevelWarn - levelError slog.Level = slog.LevelError -) - -// newJSONLogger builds a slog.Logger that writes JSON records to w, renders the -// time in UTC, and maps our custom levels to short lowercase labels. +// newJSONLogger builds a slog.Logger that writes JSON records to w. func newJSONLogger(w io.Writer) *slog.Logger { h := slog.NewJSONHandler(w, &slog.HandlerOptions{ - Level: levelVerb, - ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { - if len(groups) != 0 { - return a - } - switch a.Key { - case slog.TimeKey: - return slog.Time(slog.TimeKey, a.Value.Time().UTC()) - case slog.LevelKey: - return slog.String(slog.LevelKey, levelLabel(a.Value.Any().(slog.Level))) - } - return a - }, + Level: slog.LevelDebug, }) return slog.New(h) } -func levelLabel(l slog.Level) string { - switch l { - case levelVerb: - return "verb" - case levelDebug: - return "debug" - case levelWarn: - return "warn" - case levelError: - return "error" - } - return l.String() -} - func init() { - verboseLogger = newLoggerPlus(func(l *loggerPlus) { - l.logger = newJSONLogger(io.Discard) - l.level = levelVerb - }) debugLogger = newLoggerPlus(func(l *loggerPlus) { l.logger = newJSONLogger(os.Stdout) - l.level = levelDebug + l.level = slog.LevelDebug + }) + infoLogger = newLoggerPlus(func(l *loggerPlus) { + l.logger = newJSONLogger(os.Stdout) + l.level = slog.LevelInfo }) warnLogger = newLoggerPlus(func(l *loggerPlus) { l.logger = newJSONLogger(os.Stderr) - l.level = levelWarn + l.level = slog.LevelWarn }) errorLogger = newLoggerPlus(func(l *loggerPlus) { l.logger = newJSONLogger(os.Stderr) - l.level = levelError + l.level = slog.LevelError }) } diff --git a/internal/logger/log_test.go b/internal/logger/log_test.go index 626cb73f2..90be1f5d2 100644 --- a/internal/logger/log_test.go +++ b/internal/logger/log_test.go @@ -10,7 +10,6 @@ import ( "io" "log/slog" "os" - "strings" "testing" "time" ) @@ -31,39 +30,15 @@ func bufLoggerPlus(w io.Writer, level slog.Level) *loggerPlus { }) } -func TestLevelLabel_Known(t *testing.T) { - cases := map[slog.Level]string{ - levelVerb: "verb", - levelDebug: "debug", - levelWarn: "warn", - levelError: "error", - } - for lvl, want := range cases { - if got := levelLabel(lvl); got != want { - t.Errorf("levelLabel(%v) = %q, want %q", lvl, got, want) - } - } -} - -func TestLevelLabel_UnknownFallsBackToString(t *testing.T) { - got := levelLabel(slog.Level(99)) - if got == "" { - t.Fatalf("levelLabel(99) returned empty") - } - if got == "verb" || got == "debug" || got == "warn" || got == "error" { - t.Fatalf("levelLabel(99) = %q, want slog.Level.String() form", got) - } -} - -func TestPrintf_EmitsAllFields(t *testing.T) { +func TestLog_EmitsAllFields(t *testing.T) { var buf bytes.Buffer - lp := bufLoggerPlus(&buf, levelDebug) + lp := bufLoggerPlus(&buf, slog.LevelDebug) ctx := withContextID(context.Background(), "abc1234") - lp.Printf(ctx, "hello %s %d", "world", 42) + lp.Log(ctx, "hello %s %d", "world", 42) m := decodeLine(t, buf.Bytes()) - if m["level"] != "debug" { - t.Errorf("level = %v, want debug", m["level"]) + if m["level"] != "DEBUG" { + t.Errorf("level = %v, want DEBUG", m["level"]) } if m["msg"] != "hello world 42" { t.Errorf("msg = %v, want %q", m["msg"], "hello world 42") @@ -76,40 +51,56 @@ func TestPrintf_EmitsAllFields(t *testing.T) { t.Errorf("pid = %v, want %d", m["pid"], os.Getpid()) } ts, ok := m["time"].(string) - if !ok || !strings.HasSuffix(ts, "Z") { - t.Errorf("time = %v, want UTC suffix Z", m["time"]) + if !ok { + t.Fatalf("time = %v, want string", m["time"]) } if _, err := time.Parse(time.RFC3339Nano, ts); err != nil { t.Errorf("time %q not RFC3339Nano: %v", ts, err) } } -func TestPrintf_OmitsCIDWhenAbsent(t *testing.T) { +func TestLog_OmitsCIDWhenAbsent(t *testing.T) { var buf bytes.Buffer - bufLoggerPlus(&buf, levelWarn).Printf(context.Background(), "no cid here") + bufLoggerPlus(&buf, slog.LevelWarn).Log(context.Background(), "no cid here") m := decodeLine(t, buf.Bytes()) if v, present := m["cid"]; present { t.Errorf("cid should be absent, got %v", v) } - if m["level"] != "warn" { - t.Errorf("level = %v, want warn", m["level"]) + if m["level"] != "WARN" { + t.Errorf("level = %v, want WARN", m["level"]) } } -func TestPrintf_AllLevelsMapToLabel(t *testing.T) { +func TestLog_EmitsStructuredArgs(t *testing.T) { + var buf bytes.Buffer + bufLoggerPlus(&buf, slog.LevelInfo).Log(context.Background(), "hello", "stream", "live/livestream", "retry", 2) + + m := decodeLine(t, buf.Bytes()) + if m["msg"] != "hello" { + t.Errorf("msg = %v, want hello", m["msg"]) + } + if m["stream"] != "live/livestream" { + t.Errorf("stream = %v, want live/livestream", m["stream"]) + } + if retry, ok := m["retry"].(float64); !ok || retry != 2 { + t.Errorf("retry = %v, want 2", m["retry"]) + } +} + +func TestLog_AllLevelsMapToLabel(t *testing.T) { cases := []struct { level slog.Level label string }{ - {levelVerb, "verb"}, - {levelDebug, "debug"}, - {levelWarn, "warn"}, - {levelError, "error"}, + {slog.LevelInfo, "INFO"}, + {slog.LevelDebug, "DEBUG"}, + {slog.LevelWarn, "WARN"}, + {slog.LevelError, "ERROR"}, } for _, tc := range cases { var buf bytes.Buffer - bufLoggerPlus(&buf, tc.level).Printf(context.Background(), "hi") + bufLoggerPlus(&buf, tc.level).Log(context.Background(), "hi") m := decodeLine(t, buf.Bytes()) if m["level"] != tc.label { t.Errorf("level(%v) rendered as %v, want %q", tc.level, m["level"], tc.label) @@ -120,7 +111,7 @@ func TestPrintf_AllLevelsMapToLabel(t *testing.T) { func TestNewJSONLogger_GroupedAttrsPassThrough(t *testing.T) { var buf bytes.Buffer lg := newJSONLogger(&buf) - lg.LogAttrs(context.Background(), levelDebug, "grouped", + lg.LogAttrs(context.Background(), slog.LevelDebug, "grouped", slog.Group("meta", slog.String("inner", "v"))) m := decodeLine(t, buf.Bytes()) @@ -134,22 +125,22 @@ func TestNewJSONLogger_GroupedAttrsPassThrough(t *testing.T) { } func TestPackageWrappers_RouteToRightLogger(t *testing.T) { - origV, origD, origW, origE := verboseLogger, debugLogger, warnLogger, errorLogger + origI, origD, origW, origE := infoLogger, debugLogger, warnLogger, errorLogger t.Cleanup(func() { - verboseLogger, debugLogger, warnLogger, errorLogger = origV, origD, origW, origE + infoLogger, debugLogger, warnLogger, errorLogger = origI, origD, origW, origE }) - vBuf, dBuf, wBuf, eBuf := &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{} - verboseLogger = bufLoggerPlus(vBuf, levelVerb) - debugLogger = bufLoggerPlus(dBuf, levelDebug) - warnLogger = bufLoggerPlus(wBuf, levelWarn) - errorLogger = bufLoggerPlus(eBuf, levelError) + iBuf, dBuf, wBuf, eBuf := &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{} + infoLogger = bufLoggerPlus(iBuf, slog.LevelInfo) + debugLogger = bufLoggerPlus(dBuf, slog.LevelDebug) + warnLogger = bufLoggerPlus(wBuf, slog.LevelWarn) + errorLogger = bufLoggerPlus(eBuf, slog.LevelError) ctx := context.Background() - Vf(ctx, "v=%d", 1) - Df(ctx, "d=%d", 2) - Wf(ctx, "w=%d", 3) - Ef(ctx, "e=%d", 4) + Info(ctx, "v=%d", 1) + Debug(ctx, "d=%d", 2) + Warn(ctx, "w=%d", 3) + Error(ctx, "e=%d", 4) checks := []struct { name string @@ -157,10 +148,10 @@ func TestPackageWrappers_RouteToRightLogger(t *testing.T) { label string msg string }{ - {"Vf", vBuf, "verb", "v=1"}, - {"Df", dBuf, "debug", "d=2"}, - {"Wf", wBuf, "warn", "w=3"}, - {"Ef", eBuf, "error", "e=4"}, + {"Info", iBuf, "INFO", "v=1"}, + {"Debug", dBuf, "DEBUG", "d=2"}, + {"Warn", wBuf, "WARN", "w=3"}, + {"Error", eBuf, "ERROR", "e=4"}, } for _, c := range checks { m := decodeLine(t, c.buf.Bytes()) diff --git a/internal/protocol/api.go b/internal/protocol/api.go index 3a013493f..d1f3ef26d 100644 --- a/internal/protocol/api.go +++ b/internal/protocol/api.go @@ -63,7 +63,7 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error { // Create server and handler. mux := http.NewServeMux() v.server = &http.Server{Addr: addr, Handler: mux} - logger.Df(ctx, "HTTP API server listen at %v", addr) + logger.Debug(ctx, "HTTP API server listen at %v", addr) // Shutdown the server gracefully when quiting. go func() { @@ -77,7 +77,7 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error { }() // The basic version handler, also can be used as health check API. - logger.Df(ctx, "Handle /api/v1/versions by %v", addr) + logger.Debug(ctx, "Handle /api/v1/versions by %v", addr) mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) { utils.ApiResponse(ctx, w, r, map[string]string{ "signature": version.Signature(), @@ -86,7 +86,7 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error { }) // The WebRTC WHIP API handler. - logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr) + logger.Debug(ctx, "Handle /rtc/v1/whip/ by %v", addr) mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) { if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil { utils.ApiError(ctx, w, r, err) @@ -94,7 +94,7 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error { }) // The WebRTC WHEP API handler. - logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr) + logger.Debug(ctx, "Handle /rtc/v1/whep/ by %v", addr) mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) { if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil { utils.ApiError(ctx, w, r, err) @@ -109,12 +109,12 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error { err := v.server.ListenAndServe() if err != nil { if err == http.ErrServerClosed { - logger.Df(ctx, "HTTP API server done") + logger.Debug(ctx, "HTTP API server done") } else if ctx.Err() != nil { - logger.Df(ctx, "HTTP API server done with context canceled") + logger.Debug(ctx, "HTTP API server done with context canceled") } else { // TODO: If HTTP API server closed unexpectedly, we should notice the main loop to quit. - logger.Wf(ctx, "HTTP API accept err %+v", err) + logger.Warn(ctx, "HTTP API accept err %+v", err) } } }() @@ -163,7 +163,7 @@ func (v *systemAPI) Run(ctx context.Context) error { // Create server and handler. mux := http.NewServeMux() v.server = &http.Server{Addr: addr, Handler: mux} - logger.Df(ctx, "System API server listen at %v", addr) + logger.Debug(ctx, "System API server listen at %v", addr) // Shutdown the server gracefully when quiting. go func() { @@ -177,7 +177,7 @@ func (v *systemAPI) Run(ctx context.Context) error { }() // The basic version handler, also can be used as health check API. - logger.Df(ctx, "Handle /api/v1/versions by %v", addr) + logger.Debug(ctx, "Handle /api/v1/versions by %v", addr) mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) { utils.ApiResponse(ctx, w, r, map[string]string{ "signature": version.Signature(), @@ -186,7 +186,7 @@ func (v *systemAPI) Run(ctx context.Context) error { }) // The register service for SRS media servers. - logger.Df(ctx, "Handle /api/v1/srs/register by %v", addr) + logger.Debug(ctx, "Handle /api/v1/srs/register by %v", addr) mux.HandleFunc("/api/v1/srs/register", func(w http.ResponseWriter, r *http.Request) { if err := func() error { var deviceID, ip, serverID, serviceID, pid string @@ -247,7 +247,7 @@ func (v *systemAPI) Run(ctx context.Context) error { return errors.Wrapf(err, "update SRS server %+v", server) } - logger.Df(ctx, "Register SRS media server, %+v", server) + logger.Debug(ctx, "Register SRS media server, %+v", server) return nil }(); err != nil { utils.ApiError(ctx, w, r, err) @@ -271,12 +271,12 @@ func (v *systemAPI) Run(ctx context.Context) error { err := v.server.ListenAndServe() if err != nil { if err == http.ErrServerClosed { - logger.Df(ctx, "System API server done") + logger.Debug(ctx, "System API server done") } else if ctx.Err() != nil { - logger.Df(ctx, "System API server done with context canceled") + logger.Debug(ctx, "System API server done with context canceled") } else { // TODO: If System API server closed unexpectedly, we should notice the main loop to quit. - logger.Wf(ctx, "System API accept err %+v", err) + logger.Warn(ctx, "System API accept err %+v", err) } } }() diff --git a/internal/protocol/http.go b/internal/protocol/http.go index 9112a573f..a145c551e 100644 --- a/internal/protocol/http.go +++ b/internal/protocol/http.go @@ -64,7 +64,7 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error { // Create server and handler. mux := http.NewServeMux() v.server = &http.Server{Addr: addr, Handler: mux} - logger.Df(ctx, "HTTP Stream server listen at %v", addr) + logger.Debug(ctx, "HTTP Stream server listen at %v", addr) // Shutdown the server gracefully when quiting. go func() { @@ -78,7 +78,7 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error { }() // The basic version handler, also can be used as health check API. - logger.Df(ctx, "Handle /api/v1/versions by %v", addr) + logger.Debug(ctx, "Handle /api/v1/versions by %v", addr) mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) { type Response struct { Code int `json:"code"` @@ -108,11 +108,11 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error { } staticServer = http.FileServer(http.Dir(staticFiles)) - logger.Df(ctx, "Handle static files at %v", staticFiles) + logger.Debug(ctx, "Handle static files at %v", staticFiles) } // The default handler, for both static web server and streaming server. - logger.Df(ctx, "Handle / by %v", addr) + logger.Debug(ctx, "Handle / by %v", addr) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { // For HLS streaming, we will proxy the request to the streaming server. if strings.HasSuffix(r.URL.Path, ".m3u8") { @@ -169,12 +169,12 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error { err := v.server.ListenAndServe() if err != nil { if err == http.ErrServerClosed { - logger.Df(ctx, "HTTP Stream server done") + logger.Debug(ctx, "HTTP Stream server done") } else if ctx.Err() != nil { - logger.Df(ctx, "HTTP Stream server done with context canceled") + logger.Debug(ctx, "HTTP Stream server done with context canceled") } else { // TODO: If HTTP Stream server closed unexpectedly, we should notice the main loop to quit. - logger.Wf(ctx, "HTTP Stream accept err %+v", err) + logger.Warn(ctx, "HTTP Stream accept err %+v", err) } } }() @@ -208,7 +208,7 @@ func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) if err := v.serve(ctx, w, r); err != nil { utils.ApiError(ctx, w, r, err) } else { - logger.Df(ctx, "HTTP client done") + logger.Debug(ctx, "HTTP client done") } } @@ -220,7 +220,7 @@ func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, // Build the stream URL in vhost/app/stream schema. unifiedURL, fullURL := utils.ConvertURLToStreamURL(r) - logger.Df(ctx, "Got HTTP client from %v for %v", r.RemoteAddr, fullURL) + logger.Debug(ctx, "Got HTTP client from %v for %v", r.RemoteAddr, fullURL) streamURL, err := utils.BuildStreamURL(unifiedURL) if err != nil { @@ -278,7 +278,7 @@ func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.Respons } } - logger.Df(ctx, "HTTP start streaming") + logger.Debug(ctx, "HTTP start streaming") // Proxy the stream from backend to client. if _, err := io.Copy(w, resp.Body); err != nil { @@ -332,7 +332,7 @@ func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err := v.serve(v.ctx, w, r); err != nil { utils.ApiError(v.ctx, w, r, err) } else { - logger.Df(v.ctx, "HLS client %v for %v with %v done", + logger.Debug(v.ctx, "HLS client %v for %v with %v done", v.SRSProxyBackendHLSID, v.StreamURL, r.URL.Path) } } diff --git a/internal/protocol/rtc.go b/internal/protocol/rtc.go index 22a6b6e0c..51792f9ca 100644 --- a/internal/protocol/rtc.go +++ b/internal/protocol/rtc.go @@ -82,7 +82,7 @@ func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseW // Build the stream URL in vhost/app/stream schema. unifiedURL, fullURL := utils.ConvertURLToStreamURL(r) - logger.Df(ctx, "Got WebRTC WHIP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL) + logger.Debug(ctx, "Got WebRTC WHIP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL) streamURL, err := utils.BuildStreamURL(unifiedURL) if err != nil { @@ -119,7 +119,7 @@ func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseW // Build the stream URL in vhost/app/stream schema. unifiedURL, fullURL := utils.ConvertURLToStreamURL(r) - logger.Df(ctx, "Got WebRTC WHEP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL) + logger.Debug(ctx, "Got WebRTC WHEP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL) streamURL, err := utils.BuildStreamURL(unifiedURL) if err != nil { @@ -234,7 +234,7 @@ func (v *srsWebRTCServer) proxyApiToBackend( return errors.Wrapf(err, "write local sdp answer %v", localSDPAnswer) } - logger.Df(ctx, "Create WebRTC connection with local answer %vB with ice-ufrag=%v, ice-pwd=%vB", + logger.Debug(ctx, "Create WebRTC connection with local answer %vB with ice-ufrag=%v, ice-pwd=%vB", len(localSDPAnswer), localICEUfrag, len(localICEPwd)) return nil } @@ -256,7 +256,7 @@ func (v *srsWebRTCServer) Run(ctx context.Context) error { return errors.Wrapf(err, "listen udp %v", saddr) } v.listener = listener - logger.Df(ctx, "WebRTC server listen at %v", saddr) + logger.Debug(ctx, "WebRTC server listen at %v", saddr) // Consume all messages from UDP media transport. v.wg.Add(1) @@ -269,17 +269,17 @@ func (v *srsWebRTCServer) Run(ctx context.Context) error { if err != nil { // If context is canceled or connection is closed, exit gracefully without logging error. if ctx.Err() != nil || utils.IsClosedNetworkError(err) { - logger.Df(ctx, "WebRTC server done") + logger.Debug(ctx, "WebRTC server done") return } // TODO: If WebRTC server closed unexpectedly, we should notice the main loop to quit. - logger.Wf(ctx, "WebRTC read from udp failed, err=%+v", err) + logger.Warn(ctx, "WebRTC read from udp failed, err=%+v", err) time.Sleep(1 * time.Second) continue } if err := v.handleClientUDP(ctx, caddr, buf[:n]); err != nil { - logger.Wf(ctx, "WebRTC handle udp %vB failed, addr=%v, err=%+v", n, caddr, err) + logger.Warn(ctx, "WebRTC handle udp %vB failed, addr=%v, err=%+v", n, caddr, err) } } }() @@ -312,7 +312,7 @@ func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr return errors.Wrapf(err, "load webrtc by ufrag %v", pkt.Username) } else { connection = s.(*RTCConnection).Initialize(ctx, v.listener) - logger.Df(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL) + logger.Debug(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL) } // Cache connection for fast search. @@ -418,13 +418,13 @@ func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error { n, _, err := v.backendUDP.ReadFromUDP(buf) if err != nil { // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Wf(ctx, "read from backend failed, err=%v", err) + logger.Warn(ctx, "read from backend failed, err=%v", err) break } if _, err = v.listenerUDP.WriteToUDP(buf[:n], v.clientUDP); err != nil { // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Wf(ctx, "write to client failed, err=%v", err) + logger.Warn(ctx, "write to client failed, err=%v", err) break } } diff --git a/internal/protocol/rtmp.go b/internal/protocol/rtmp.go index 978335034..d5c554b7f 100644 --- a/internal/protocol/rtmp.go +++ b/internal/protocol/rtmp.go @@ -65,7 +65,7 @@ func (v *srsRTMPServer) Run(ctx context.Context) error { return errors.Wrapf(err, "listen rtmp addr %v", addr) } v.listener = listener - logger.Df(ctx, "RTMP server listen at %v", addr) + logger.Debug(ctx, "RTMP server listen at %v", addr) v.wg.Add(1) go func() { @@ -76,10 +76,10 @@ func (v *srsRTMPServer) Run(ctx context.Context) error { if err != nil { // If context is canceled or connection is closed, exit gracefully without logging error. if ctx.Err() != nil || utils.IsClosedNetworkError(err) { - logger.Df(ctx, "RTMP server done") + logger.Debug(ctx, "RTMP server done") } else { // TODO: If RTMP server closed unexpectedly, we should notice the main loop to quit. - logger.Wf(ctx, "RTMP server accept err %+v", err) + logger.Warn(ctx, "RTMP server accept err %+v", err) } return } @@ -91,9 +91,9 @@ func (v *srsRTMPServer) Run(ctx context.Context) error { handleErr := func(err error) { if utils.IsPeerClosedError(err) || utils.IsClosedNetworkError(err) { - logger.Df(ctx, "RTMP connection closed") + logger.Debug(ctx, "RTMP connection closed") } else { - logger.Wf(ctx, "RTMP serve err %+v", err) + logger.Warn(ctx, "RTMP serve err %+v", err) } } @@ -101,7 +101,7 @@ func (v *srsRTMPServer) Run(ctx context.Context) error { if err := rc.serve(ctx, conn); err != nil { handleErr(err) } else { - logger.Df(ctx, "RTMP client done") + logger.Debug(ctx, "RTMP client done") } }(logger.WithContext(ctx), conn) } @@ -128,7 +128,7 @@ func NewRTMPConnection(opts ...func(*RTMPConnection)) *RTMPConnection { } func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { - logger.Df(ctx, "Got RTMP client from %v", conn.RemoteAddr()) + logger.Debug(ctx, "Got RTMP client from %v", conn.RemoteAddr()) // If any goroutine quit, cancel another one. parentCtx := ctx @@ -168,7 +168,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { } client := rtmp.NewProtocol(conn) - logger.Df(ctx, "RTMP simple handshake done") + logger.Debug(ctx, "RTMP simple handshake done") // Expect RTMP connect command with tcUrl. var connectReq *rtmp.ConnectAppPacket @@ -209,7 +209,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { } tcUrl := connectReq.TcUrl() - logger.Df(ctx, "RTMP connect app %v", tcUrl) + logger.Debug(ctx, "RTMP connect app %v", tcUrl) // Expect RTMP command to identify the client, a publisher or viewer. var currentStreamID, nextStreamID int @@ -285,7 +285,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { // Update the stream ID for next request. currentStreamID = nextStreamID } - logger.Df(ctx, "RTMP identify tcUrl=%v, stream=%v, id=%v, type=%v", + logger.Debug(ctx, "RTMP identify tcUrl=%v, stream=%v, id=%v, type=%v", tcUrl, streamName, currentStreamID, clientType) // Find a backend SRS server to proxy the RTMP stream. @@ -333,7 +333,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { return errors.Wrapf(err, "start play") } } - logger.Df(ctx, "RTMP start streaming") + logger.Debug(ctx, "RTMP start streaming") // For all proxy goroutines. var wg sync.WaitGroup @@ -352,7 +352,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { if err != nil { return errors.Wrapf(err, "read message") } - //logger.Df(ctx, "client<- %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload)) + //logger.Debug(ctx, "client<- %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload)) // TODO: Update the stream ID if not the same. if err := client.WriteMessage(ctx, m); err != nil { @@ -375,7 +375,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { if err != nil { return errors.Wrapf(err, "read message") } - //logger.Df(ctx, "client-> %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload)) + //logger.Debug(ctx, "client-> %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload)) // TODO: Update the stream ID if not the same. if err := backend.client.WriteMessage(ctx, m); err != nil { @@ -392,7 +392,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { if r0 != nil { // If backend connection closed normally, treat as normal disconnection if utils.IsClosedNetworkError(r0) || utils.IsPeerClosedError(r0) { - logger.Df(ctx, "RTMP backend disconnected") + logger.Debug(ctx, "RTMP backend disconnected") return nil } return errors.Wrapf(r0, "proxy backend->client") @@ -400,7 +400,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { if r1 != nil { // If client connection closed normally, treat as normal disconnection if utils.IsClosedNetworkError(r1) || utils.IsPeerClosedError(r1) { - logger.Df(ctx, "RTMP client disconnected") + logger.Debug(ctx, "RTMP client disconnected") return nil } return errors.Wrapf(r1, "proxy client->backend") @@ -495,7 +495,7 @@ func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName str if _, err = hs.ReadC2S2(c); err != nil { return errors.Wrapf(err, "read c2") } - logger.Df(ctx, "backend simple handshake done, server=%v", addr) + logger.Debug(ctx, "backend simple handshake done, server=%v", addr) if err := hs.WriteC2S2(c, hs.C1S1()); err != nil { return errors.Wrapf(err, "write c2") @@ -515,7 +515,7 @@ func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName str if _, err := rtmp.ExpectPacket(ctx, client, &connectAppRes); err != nil { return errors.Wrapf(err, "expect connect app res") } - logger.Df(ctx, "backend connect RTMP app, tcUrl=%v, id=%v", tcUrl, connectAppRes.SrsID()) + logger.Debug(ctx, "backend connect RTMP app, tcUrl=%v, id=%v", tcUrl, connectAppRes.SrsID()) } // Play or view RTMP stream with server. @@ -615,7 +615,7 @@ func (v *RTMPClientToBackend) publish(ctx context.Context, client *rtmp.Protocol break } } - logger.Df(ctx, "backend publish stream=%v, sid=%v", streamName, currentStreamID) + logger.Debug(ctx, "backend publish stream=%v, sid=%v", streamName, currentStreamID) return nil } diff --git a/internal/protocol/srt.go b/internal/protocol/srt.go index e6a877620..cc9324f69 100644 --- a/internal/protocol/srt.go +++ b/internal/protocol/srt.go @@ -78,7 +78,7 @@ func (v *srsSRTServer) Run(ctx context.Context) error { return errors.Wrapf(err, "listen udp %v", saddr) } v.listener = listener - logger.Df(ctx, "SRT server listen at %v", saddr) + logger.Debug(ctx, "SRT server listen at %v", saddr) // Consume all messages from UDP media transport. v.wg.Add(1) @@ -91,17 +91,17 @@ func (v *srsSRTServer) Run(ctx context.Context) error { if err != nil { // If context is canceled or connection is closed, exit gracefully without logging error. if ctx.Err() != nil || utils.IsClosedNetworkError(err) { - logger.Df(ctx, "SRT server done") + logger.Debug(ctx, "SRT server done") return } // TODO: If SRT server closed unexpectedly, we should notice the main loop to quit. - logger.Wf(ctx, "SRT read from udp failed, err=%+v", err) + logger.Warn(ctx, "SRT read from udp failed, err=%+v", err) time.Sleep(1 * time.Second) continue } if err := v.handleClientUDP(ctx, caddr, buf[:n]); err != nil { - logger.Wf(ctx, "SRT handle udp %vB failed, addr=%v, err=%+v", n, caddr, err) + logger.Warn(ctx, "SRT handle udp %vB failed, addr=%v, err=%+v", n, caddr, err) } } }() @@ -132,7 +132,7 @@ func (v *srsSRTServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, d ctx = conn.ctx if !ok { - logger.Df(ctx, "Create new SRT connection skt=%v", socketID) + logger.Debug(ctx, "Create new SRT connection skt=%v", socketID) } if newSocketID, err := conn.HandlePacket(pkt, addr, data); err != nil { @@ -213,7 +213,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa if pkt.SynCookie == 0 { // Save handshake 0 packet. v.handshake0 = pkt - logger.Df(ctx, "SRT Handshake 0: %v", v.handshake0) + logger.Debug(ctx, "SRT Handshake 0: %v", v.handshake0) // Response handshake 1. v.handshake1 = &SRTHandshakePacket{ @@ -234,7 +234,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa SynCookie: 0x418d5e4e, PeerIP: net.ParseIP("127.0.0.1"), } - logger.Df(ctx, "SRT Handshake 1: %v", v.handshake1) + logger.Debug(ctx, "SRT Handshake 1: %v", v.handshake1) if b, err := v.handshake1.MarshalBinary(); err != nil { return errors.Wrapf(err, "marshal handshake 1") @@ -254,7 +254,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa // Save handshake packet. v.handshake2 = pkt - logger.Df(ctx, "SRT Handshake 2: %v, sid=%v", v.handshake2, streamID) + logger.Debug(ctx, "SRT Handshake 2: %v, sid=%v", v.handshake2, streamID) // Start the UDP proxy to backend. if err := v.connectBackend(ctx, streamID); err != nil { @@ -272,7 +272,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa } else if _, err = v.backendUDP.Write(b); err != nil { return errors.Wrapf(err, "write handshake 0") } - logger.Df(ctx, "Proxy send handshake 0: %v", v.handshake0) + logger.Debug(ctx, "Proxy send handshake 0: %v", v.handshake0) // Read handshake 1 from backend server. b := make([]byte, 4096) @@ -282,7 +282,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa } else if err := handshake1p.UnmarshalBinary(b[:nn]); err != nil { return errors.Wrapf(err, "unmarshal handshake 1") } - logger.Df(ctx, "Proxy got handshake 1: %v", handshake1p) + logger.Debug(ctx, "Proxy got handshake 1: %v", handshake1p) // Proxy handshake 2 to backend server. handshake2p := *v.handshake2 @@ -292,7 +292,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa } else if _, err = v.backendUDP.Write(b); err != nil { return errors.Wrapf(err, "write handshake 2") } - logger.Df(ctx, "Proxy send handshake 2: %v", handshake2p) + logger.Debug(ctx, "Proxy send handshake 2: %v", handshake2p) // Read handshake 3 from backend server. handshake3p := &SRTHandshakePacket{} @@ -301,13 +301,13 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa } else if err := handshake3p.UnmarshalBinary(b[:nn]); err != nil { return errors.Wrapf(err, "unmarshal handshake 3") } - logger.Df(ctx, "Proxy got handshake 3: %v", handshake3p) + logger.Debug(ctx, "Proxy got handshake 3: %v", handshake3p) // Response handshake 3 to client. v.handshake3 = &*handshake3p v.handshake3.SynCookie = v.handshake1.SynCookie v.socketID = handshake3p.SRTSocketID - logger.Df(ctx, "Handshake 3: %v", v.handshake3) + logger.Debug(ctx, "Handshake 3: %v", v.handshake3) if b, err := v.handshake3.MarshalBinary(); err != nil { return errors.Wrapf(err, "marshal handshake 3") @@ -322,12 +322,12 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa nn, err := v.backendUDP.Read(b) if err != nil { // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Wf(ctx, "read from backend failed, err=%v", err) + logger.Warn(ctx, "read from backend failed, err=%v", err) return } if _, err = v.listenerUDP.WriteToUDP(b[:nn], addr); err != nil { // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Wf(ctx, "write to client failed, err=%v", err) + logger.Warn(ctx, "write to client failed, err=%v", err) return } } diff --git a/internal/signal/signal.go b/internal/signal/signal.go index 37855e4aa..b8930480b 100644 --- a/internal/signal/signal.go +++ b/internal/signal/signal.go @@ -27,7 +27,7 @@ func InstallSignals(ctx context.Context, cancel context.CancelFunc) { go func() { for s := range sc { - logger.Df(ctx, "Got signal %v", s) + logger.Debug(ctx, "Got signal %v", s) cancel() } }() @@ -45,7 +45,7 @@ func InstallForceQuit(ctx context.Context, environment env.ProxyEnvironment) err go func() { <-ctx.Done() time.Sleep(forceTimeout) - logger.Wf(ctx, "Force to exit by timeout") + logger.Warn(ctx, "Force to exit by timeout") osExit(1) }() return nil diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 4b86edda0..367284a64 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -40,7 +40,7 @@ func ApiResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, da } func ApiError(ctx context.Context, w http.ResponseWriter, r *http.Request, err error) { - logger.Wf(ctx, "HTTP API error %+v", err) + logger.Warn(ctx, "HTTP API error %+v", err) w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusInternalServerError) fmt.Fprintf(w, "%v\n", err)