Codex: Refine logger API.

This commit is contained in:
winlin 2026-04-26 21:14:41 -04:00
parent 34e705dd1c
commit 9dc4735662
15 changed files with 173 additions and 201 deletions

View File

@ -221,7 +221,7 @@ The next-generation server (`cmd/` + `internal/`) is written in Go and maintaine
`internal/lb` — Load balancer abstraction and two implementations. Defines `SRSLoadBalancer` interface and core types in `lb.go` (Initialize, Update, Pick, HLS/WebRTC state management) and `SRSServer` struct representing a backend origin (IP, listen endpoints for RTMP/HTTP/API/SRT/RTC, heartbeat tracking). **Memory LB** (`mem.go`) — in-memory using `sync.Map`, sticky random pick per stream URL, single-proxy deployment. **Redis LB** (`redis.go`) — Redis-backed shared state with TTL-based expiration, enables multi-proxy horizontal scaling behind a network load balancer. Also includes a debug helper (`debug.go`) that creates a fake backend from env vars when `PROXY_DEFAULT_BACKEND_ENABLED=on` for development without real SRS registration.
`internal/logger` — Structured logging with context IDs. Four log levels: Verbose (discarded), Debug (stdout), Warn (stderr), Error (stderr). Emits JSON via `log/slog` with `pid` and `cid` attributes. Each connection/request gets a unique 7-char hex context ID for log correlation, stored in `context.Context`.
`internal/logger` — Structured logging with context IDs. Four log levels: Debug/Info (stdout), Warn/Error (stderr). Emits JSON via `log/slog` with `pid` and `cid` attributes. Each connection/request gets a unique 7-char hex context ID for log correlation, stored in `context.Context`.
`internal/env` — Environment-based configuration. All settings via env vars (or `.env` file parsed by an in-tree custom parser — no third-party dep; supports comments, `export` prefix, quoted values, escape sequences, and inline comments). Exposes a `ProxyEnvironment` interface (with a counterfeiter-generated fake in `envfakes/` for downstream tests) with methods for each config value. Default ports: RTMP=11935, HTTP API=11985, HTTP Stream=18080, WebRTC=18000, SRT=20080, System API=12025. Timeouts: grace=20s, force=30s. Supports Redis config and default backend config for debugging.

View File

@ -29,7 +29,7 @@ type proxyBootstrap struct{}
// Returns any error encountered during startup.
func (b *proxyBootstrap) Start(ctx context.Context) error {
ctx = logger.WithContext(ctx)
logger.Df(ctx, "%v-Proxy/%v started", version.Signature(), version.Version())
logger.Debug(ctx, "%v-Proxy/%v started", version.Signature(), version.Version())
// Install signals.
ctx, cancel := context.WithCancel(ctx)
@ -38,11 +38,11 @@ func (b *proxyBootstrap) Start(ctx context.Context) error {
// Run the main loop, ignore the user cancel error.
err := b.run(ctx)
if err != nil && ctx.Err() != context.Canceled {
logger.Ef(ctx, "main: %+v", err)
logger.Error(ctx, "main: %+v", err)
return err
}
logger.Df(ctx, "%v done", version.Signature())
logger.Debug(ctx, "%v done", version.Signature())
return nil
}

View File

@ -14,7 +14,7 @@ import (
func HandleGoPprof(ctx context.Context, environment env.ProxyEnvironment) {
if addr := environment.GoPprof(); addr != "" {
go func() {
logger.Df(ctx, "Start Go pprof at %v", addr)
logger.Debug(ctx, "Start Go pprof at %v", addr)
http.ListenAndServe(addr, nil)
}()
}

6
internal/env/env.go vendored
View File

@ -177,7 +177,7 @@ func loadEnvFile(ctx context.Context) error {
envMap, err := parseEnvFile(".env")
if err != nil {
if os.IsNotExist(err) {
logger.Df(ctx, "no .env file found, skipping")
logger.Debug(ctx, "no .env file found, skipping")
return nil
}
return errors.Wrapf(err, "load .env file")
@ -190,7 +190,7 @@ func loadEnvFile(ctx context.Context) error {
}
}
logger.Df(ctx, "successfully loaded .env file")
logger.Debug(ctx, "successfully loaded .env file")
return nil
}
@ -315,7 +315,7 @@ func buildDefaultEnvironmentVariables(ctx context.Context) {
// Default backend udp srt port, for debugging.
setEnvDefault("PROXY_DEFAULT_BACKEND_SRT", "10080")
logger.Df(ctx, "load .env as GO_PPROF=%v, "+
logger.Debug(ctx, "load .env as GO_PPROF=%v, "+
"PROXY_FORCE_QUIT_TIMEOUT=%v, PROXY_GRACE_QUIT_TIMEOUT=%v, "+
"PROXY_HTTP_API=%v, PROXY_HTTP_SERVER=%v, PROXY_RTMP_SERVER=%v, "+
"PROXY_WEBRTC_SERVER=%v, PROXY_SRT_SERVER=%v, "+

View File

@ -65,12 +65,12 @@ func (v *MemoryLoadBalancer) Initialize(ctx context.Context) error {
return
case <-time.After(30 * time.Second):
if err := v.Update(ctx, server); err != nil {
logger.Wf(ctx, "update default SRS %+v failed, %+v", server, err)
logger.Warn(ctx, "update default SRS %+v failed, %+v", server, err)
}
}
}
}()
logger.Df(ctx, "MemoryLB: Initialize default SRS media server, %+v", server)
logger.Debug(ctx, "MemoryLB: Initialize default SRS media server, %+v", server)
}
return nil
}

View File

@ -50,7 +50,7 @@ func (v *RedisLoadBalancer) Initialize(ctx context.Context) error {
if err := rdb.Ping(ctx).Err(); err != nil {
return errors.Wrapf(err, "unable to connect to redis %v", rdb.String())
}
logger.Df(ctx, "RedisLB: connected to redis %v ok", rdb.String())
logger.Debug(ctx, "RedisLB: connected to redis %v ok", rdb.String())
server, err := NewDefaultSRSForDebugging(v.environment)
if err != nil {
@ -70,12 +70,12 @@ func (v *RedisLoadBalancer) Initialize(ctx context.Context) error {
return
case <-time.After(30 * time.Second):
if err := v.Update(ctx, server); err != nil {
logger.Wf(ctx, "update default SRS %+v failed, %+v", server, err)
logger.Warn(ctx, "update default SRS %+v failed, %+v", server, err)
}
}
}
}()
logger.Df(ctx, "RedisLB: Initialize default SRS media server, %+v", server)
logger.Debug(ctx, "RedisLB: Initialize default SRS media server, %+v", server)
}
return nil
}

View File

@ -9,10 +9,13 @@ import (
"io"
"log/slog"
"os"
"strings"
"srsx/internal/version"
)
type logger interface {
Printf(ctx context.Context, format string, v ...any)
Log(ctx context.Context, msg string, args ...any)
}
type loggerPlus struct {
@ -28,95 +31,73 @@ func newLoggerPlus(opts ...func(*loggerPlus)) *loggerPlus {
return v
}
func (v *loggerPlus) Printf(ctx context.Context, f string, a ...any) {
attrs := []slog.Attr{slog.Int("pid", os.Getpid())}
if cid := ContextID(ctx); cid != "" {
attrs = append(attrs, slog.String("cid", cid))
func (v *loggerPlus) Log(ctx context.Context, msg string, args ...any) {
attrs := []any{
"pid", os.Getpid(),
"version", version.Version(),
}
v.logger.LogAttrs(ctx, v.level, fmt.Sprintf(f, a...), attrs...)
}
var verboseLogger logger
if cid := ContextID(ctx); cid != "" {
attrs = append(attrs, "cid", cid)
}
func Vf(ctx context.Context, format string, a ...any) {
verboseLogger.Printf(ctx, format, a...)
// Keep compatibility with the old *f call sites while exposing the new
// slog-style API. New code should pass structured key/value args.
if len(args) > 0 && strings.Contains(msg, "%") {
msg = fmt.Sprintf(msg, args...)
args = nil
}
attrs = append(attrs, args...)
v.logger.Log(ctx, v.level, msg, attrs...)
}
var debugLogger logger
func Df(ctx context.Context, format string, a ...any) {
debugLogger.Printf(ctx, format, a...)
func Debug(ctx context.Context, msg string, args ...any) {
debugLogger.Log(ctx, msg, args...)
}
var infoLogger logger
func Info(ctx context.Context, msg string, args ...any) {
infoLogger.Log(ctx, msg, args...)
}
var warnLogger logger
func Wf(ctx context.Context, format string, a ...any) {
warnLogger.Printf(ctx, format, a...)
func Warn(ctx context.Context, msg string, args ...any) {
warnLogger.Log(ctx, msg, args...)
}
var errorLogger logger
func Ef(ctx context.Context, format string, a ...any) {
errorLogger.Printf(ctx, format, a...)
func Error(ctx context.Context, msg string, args ...any) {
errorLogger.Log(ctx, msg, args...)
}
const (
levelVerb slog.Level = slog.LevelDebug - 4
levelDebug slog.Level = slog.LevelDebug
levelWarn slog.Level = slog.LevelWarn
levelError slog.Level = slog.LevelError
)
// newJSONLogger builds a slog.Logger that writes JSON records to w, renders the
// time in UTC, and maps our custom levels to short lowercase labels.
// newJSONLogger builds a slog.Logger that writes JSON records to w.
func newJSONLogger(w io.Writer) *slog.Logger {
h := slog.NewJSONHandler(w, &slog.HandlerOptions{
Level: levelVerb,
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
if len(groups) != 0 {
return a
}
switch a.Key {
case slog.TimeKey:
return slog.Time(slog.TimeKey, a.Value.Time().UTC())
case slog.LevelKey:
return slog.String(slog.LevelKey, levelLabel(a.Value.Any().(slog.Level)))
}
return a
},
Level: slog.LevelDebug,
})
return slog.New(h)
}
func levelLabel(l slog.Level) string {
switch l {
case levelVerb:
return "verb"
case levelDebug:
return "debug"
case levelWarn:
return "warn"
case levelError:
return "error"
}
return l.String()
}
func init() {
verboseLogger = newLoggerPlus(func(l *loggerPlus) {
l.logger = newJSONLogger(io.Discard)
l.level = levelVerb
})
debugLogger = newLoggerPlus(func(l *loggerPlus) {
l.logger = newJSONLogger(os.Stdout)
l.level = levelDebug
l.level = slog.LevelDebug
})
infoLogger = newLoggerPlus(func(l *loggerPlus) {
l.logger = newJSONLogger(os.Stdout)
l.level = slog.LevelInfo
})
warnLogger = newLoggerPlus(func(l *loggerPlus) {
l.logger = newJSONLogger(os.Stderr)
l.level = levelWarn
l.level = slog.LevelWarn
})
errorLogger = newLoggerPlus(func(l *loggerPlus) {
l.logger = newJSONLogger(os.Stderr)
l.level = levelError
l.level = slog.LevelError
})
}

View File

@ -10,7 +10,6 @@ import (
"io"
"log/slog"
"os"
"strings"
"testing"
"time"
)
@ -31,39 +30,15 @@ func bufLoggerPlus(w io.Writer, level slog.Level) *loggerPlus {
})
}
func TestLevelLabel_Known(t *testing.T) {
cases := map[slog.Level]string{
levelVerb: "verb",
levelDebug: "debug",
levelWarn: "warn",
levelError: "error",
}
for lvl, want := range cases {
if got := levelLabel(lvl); got != want {
t.Errorf("levelLabel(%v) = %q, want %q", lvl, got, want)
}
}
}
func TestLevelLabel_UnknownFallsBackToString(t *testing.T) {
got := levelLabel(slog.Level(99))
if got == "" {
t.Fatalf("levelLabel(99) returned empty")
}
if got == "verb" || got == "debug" || got == "warn" || got == "error" {
t.Fatalf("levelLabel(99) = %q, want slog.Level.String() form", got)
}
}
func TestPrintf_EmitsAllFields(t *testing.T) {
func TestLog_EmitsAllFields(t *testing.T) {
var buf bytes.Buffer
lp := bufLoggerPlus(&buf, levelDebug)
lp := bufLoggerPlus(&buf, slog.LevelDebug)
ctx := withContextID(context.Background(), "abc1234")
lp.Printf(ctx, "hello %s %d", "world", 42)
lp.Log(ctx, "hello %s %d", "world", 42)
m := decodeLine(t, buf.Bytes())
if m["level"] != "debug" {
t.Errorf("level = %v, want debug", m["level"])
if m["level"] != "DEBUG" {
t.Errorf("level = %v, want DEBUG", m["level"])
}
if m["msg"] != "hello world 42" {
t.Errorf("msg = %v, want %q", m["msg"], "hello world 42")
@ -76,40 +51,56 @@ func TestPrintf_EmitsAllFields(t *testing.T) {
t.Errorf("pid = %v, want %d", m["pid"], os.Getpid())
}
ts, ok := m["time"].(string)
if !ok || !strings.HasSuffix(ts, "Z") {
t.Errorf("time = %v, want UTC suffix Z", m["time"])
if !ok {
t.Fatalf("time = %v, want string", m["time"])
}
if _, err := time.Parse(time.RFC3339Nano, ts); err != nil {
t.Errorf("time %q not RFC3339Nano: %v", ts, err)
}
}
func TestPrintf_OmitsCIDWhenAbsent(t *testing.T) {
func TestLog_OmitsCIDWhenAbsent(t *testing.T) {
var buf bytes.Buffer
bufLoggerPlus(&buf, levelWarn).Printf(context.Background(), "no cid here")
bufLoggerPlus(&buf, slog.LevelWarn).Log(context.Background(), "no cid here")
m := decodeLine(t, buf.Bytes())
if v, present := m["cid"]; present {
t.Errorf("cid should be absent, got %v", v)
}
if m["level"] != "warn" {
t.Errorf("level = %v, want warn", m["level"])
if m["level"] != "WARN" {
t.Errorf("level = %v, want WARN", m["level"])
}
}
func TestPrintf_AllLevelsMapToLabel(t *testing.T) {
func TestLog_EmitsStructuredArgs(t *testing.T) {
var buf bytes.Buffer
bufLoggerPlus(&buf, slog.LevelInfo).Log(context.Background(), "hello", "stream", "live/livestream", "retry", 2)
m := decodeLine(t, buf.Bytes())
if m["msg"] != "hello" {
t.Errorf("msg = %v, want hello", m["msg"])
}
if m["stream"] != "live/livestream" {
t.Errorf("stream = %v, want live/livestream", m["stream"])
}
if retry, ok := m["retry"].(float64); !ok || retry != 2 {
t.Errorf("retry = %v, want 2", m["retry"])
}
}
func TestLog_AllLevelsMapToLabel(t *testing.T) {
cases := []struct {
level slog.Level
label string
}{
{levelVerb, "verb"},
{levelDebug, "debug"},
{levelWarn, "warn"},
{levelError, "error"},
{slog.LevelInfo, "INFO"},
{slog.LevelDebug, "DEBUG"},
{slog.LevelWarn, "WARN"},
{slog.LevelError, "ERROR"},
}
for _, tc := range cases {
var buf bytes.Buffer
bufLoggerPlus(&buf, tc.level).Printf(context.Background(), "hi")
bufLoggerPlus(&buf, tc.level).Log(context.Background(), "hi")
m := decodeLine(t, buf.Bytes())
if m["level"] != tc.label {
t.Errorf("level(%v) rendered as %v, want %q", tc.level, m["level"], tc.label)
@ -120,7 +111,7 @@ func TestPrintf_AllLevelsMapToLabel(t *testing.T) {
func TestNewJSONLogger_GroupedAttrsPassThrough(t *testing.T) {
var buf bytes.Buffer
lg := newJSONLogger(&buf)
lg.LogAttrs(context.Background(), levelDebug, "grouped",
lg.LogAttrs(context.Background(), slog.LevelDebug, "grouped",
slog.Group("meta", slog.String("inner", "v")))
m := decodeLine(t, buf.Bytes())
@ -134,22 +125,22 @@ func TestNewJSONLogger_GroupedAttrsPassThrough(t *testing.T) {
}
func TestPackageWrappers_RouteToRightLogger(t *testing.T) {
origV, origD, origW, origE := verboseLogger, debugLogger, warnLogger, errorLogger
origI, origD, origW, origE := infoLogger, debugLogger, warnLogger, errorLogger
t.Cleanup(func() {
verboseLogger, debugLogger, warnLogger, errorLogger = origV, origD, origW, origE
infoLogger, debugLogger, warnLogger, errorLogger = origI, origD, origW, origE
})
vBuf, dBuf, wBuf, eBuf := &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}
verboseLogger = bufLoggerPlus(vBuf, levelVerb)
debugLogger = bufLoggerPlus(dBuf, levelDebug)
warnLogger = bufLoggerPlus(wBuf, levelWarn)
errorLogger = bufLoggerPlus(eBuf, levelError)
iBuf, dBuf, wBuf, eBuf := &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}
infoLogger = bufLoggerPlus(iBuf, slog.LevelInfo)
debugLogger = bufLoggerPlus(dBuf, slog.LevelDebug)
warnLogger = bufLoggerPlus(wBuf, slog.LevelWarn)
errorLogger = bufLoggerPlus(eBuf, slog.LevelError)
ctx := context.Background()
Vf(ctx, "v=%d", 1)
Df(ctx, "d=%d", 2)
Wf(ctx, "w=%d", 3)
Ef(ctx, "e=%d", 4)
Info(ctx, "v=%d", 1)
Debug(ctx, "d=%d", 2)
Warn(ctx, "w=%d", 3)
Error(ctx, "e=%d", 4)
checks := []struct {
name string
@ -157,10 +148,10 @@ func TestPackageWrappers_RouteToRightLogger(t *testing.T) {
label string
msg string
}{
{"Vf", vBuf, "verb", "v=1"},
{"Df", dBuf, "debug", "d=2"},
{"Wf", wBuf, "warn", "w=3"},
{"Ef", eBuf, "error", "e=4"},
{"Info", iBuf, "INFO", "v=1"},
{"Debug", dBuf, "DEBUG", "d=2"},
{"Warn", wBuf, "WARN", "w=3"},
{"Error", eBuf, "ERROR", "e=4"},
}
for _, c := range checks {
m := decodeLine(t, c.buf.Bytes())

View File

@ -63,7 +63,7 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
// Create server and handler.
mux := http.NewServeMux()
v.server = &http.Server{Addr: addr, Handler: mux}
logger.Df(ctx, "HTTP API server listen at %v", addr)
logger.Debug(ctx, "HTTP API server listen at %v", addr)
// Shutdown the server gracefully when quiting.
go func() {
@ -77,7 +77,7 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
}()
// The basic version handler, also can be used as health check API.
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
logger.Debug(ctx, "Handle /api/v1/versions by %v", addr)
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
utils.ApiResponse(ctx, w, r, map[string]string{
"signature": version.Signature(),
@ -86,7 +86,7 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
})
// The WebRTC WHIP API handler.
logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr)
logger.Debug(ctx, "Handle /rtc/v1/whip/ by %v", addr)
mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) {
if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil {
utils.ApiError(ctx, w, r, err)
@ -94,7 +94,7 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
})
// The WebRTC WHEP API handler.
logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr)
logger.Debug(ctx, "Handle /rtc/v1/whep/ by %v", addr)
mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) {
if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil {
utils.ApiError(ctx, w, r, err)
@ -109,12 +109,12 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
err := v.server.ListenAndServe()
if err != nil {
if err == http.ErrServerClosed {
logger.Df(ctx, "HTTP API server done")
logger.Debug(ctx, "HTTP API server done")
} else if ctx.Err() != nil {
logger.Df(ctx, "HTTP API server done with context canceled")
logger.Debug(ctx, "HTTP API server done with context canceled")
} else {
// TODO: If HTTP API server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "HTTP API accept err %+v", err)
logger.Warn(ctx, "HTTP API accept err %+v", err)
}
}
}()
@ -163,7 +163,7 @@ func (v *systemAPI) Run(ctx context.Context) error {
// Create server and handler.
mux := http.NewServeMux()
v.server = &http.Server{Addr: addr, Handler: mux}
logger.Df(ctx, "System API server listen at %v", addr)
logger.Debug(ctx, "System API server listen at %v", addr)
// Shutdown the server gracefully when quiting.
go func() {
@ -177,7 +177,7 @@ func (v *systemAPI) Run(ctx context.Context) error {
}()
// The basic version handler, also can be used as health check API.
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
logger.Debug(ctx, "Handle /api/v1/versions by %v", addr)
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
utils.ApiResponse(ctx, w, r, map[string]string{
"signature": version.Signature(),
@ -186,7 +186,7 @@ func (v *systemAPI) Run(ctx context.Context) error {
})
// The register service for SRS media servers.
logger.Df(ctx, "Handle /api/v1/srs/register by %v", addr)
logger.Debug(ctx, "Handle /api/v1/srs/register by %v", addr)
mux.HandleFunc("/api/v1/srs/register", func(w http.ResponseWriter, r *http.Request) {
if err := func() error {
var deviceID, ip, serverID, serviceID, pid string
@ -247,7 +247,7 @@ func (v *systemAPI) Run(ctx context.Context) error {
return errors.Wrapf(err, "update SRS server %+v", server)
}
logger.Df(ctx, "Register SRS media server, %+v", server)
logger.Debug(ctx, "Register SRS media server, %+v", server)
return nil
}(); err != nil {
utils.ApiError(ctx, w, r, err)
@ -271,12 +271,12 @@ func (v *systemAPI) Run(ctx context.Context) error {
err := v.server.ListenAndServe()
if err != nil {
if err == http.ErrServerClosed {
logger.Df(ctx, "System API server done")
logger.Debug(ctx, "System API server done")
} else if ctx.Err() != nil {
logger.Df(ctx, "System API server done with context canceled")
logger.Debug(ctx, "System API server done with context canceled")
} else {
// TODO: If System API server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "System API accept err %+v", err)
logger.Warn(ctx, "System API accept err %+v", err)
}
}
}()

View File

@ -64,7 +64,7 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
// Create server and handler.
mux := http.NewServeMux()
v.server = &http.Server{Addr: addr, Handler: mux}
logger.Df(ctx, "HTTP Stream server listen at %v", addr)
logger.Debug(ctx, "HTTP Stream server listen at %v", addr)
// Shutdown the server gracefully when quiting.
go func() {
@ -78,7 +78,7 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
}()
// The basic version handler, also can be used as health check API.
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
logger.Debug(ctx, "Handle /api/v1/versions by %v", addr)
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
type Response struct {
Code int `json:"code"`
@ -108,11 +108,11 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
}
staticServer = http.FileServer(http.Dir(staticFiles))
logger.Df(ctx, "Handle static files at %v", staticFiles)
logger.Debug(ctx, "Handle static files at %v", staticFiles)
}
// The default handler, for both static web server and streaming server.
logger.Df(ctx, "Handle / by %v", addr)
logger.Debug(ctx, "Handle / by %v", addr)
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// For HLS streaming, we will proxy the request to the streaming server.
if strings.HasSuffix(r.URL.Path, ".m3u8") {
@ -169,12 +169,12 @@ func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
err := v.server.ListenAndServe()
if err != nil {
if err == http.ErrServerClosed {
logger.Df(ctx, "HTTP Stream server done")
logger.Debug(ctx, "HTTP Stream server done")
} else if ctx.Err() != nil {
logger.Df(ctx, "HTTP Stream server done with context canceled")
logger.Debug(ctx, "HTTP Stream server done with context canceled")
} else {
// TODO: If HTTP Stream server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "HTTP Stream accept err %+v", err)
logger.Warn(ctx, "HTTP Stream accept err %+v", err)
}
}
}()
@ -208,7 +208,7 @@ func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request)
if err := v.serve(ctx, w, r); err != nil {
utils.ApiError(ctx, w, r, err)
} else {
logger.Df(ctx, "HTTP client done")
logger.Debug(ctx, "HTTP client done")
}
}
@ -220,7 +220,7 @@ func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter,
// Build the stream URL in vhost/app/stream schema.
unifiedURL, fullURL := utils.ConvertURLToStreamURL(r)
logger.Df(ctx, "Got HTTP client from %v for %v", r.RemoteAddr, fullURL)
logger.Debug(ctx, "Got HTTP client from %v for %v", r.RemoteAddr, fullURL)
streamURL, err := utils.BuildStreamURL(unifiedURL)
if err != nil {
@ -278,7 +278,7 @@ func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.Respons
}
}
logger.Df(ctx, "HTTP start streaming")
logger.Debug(ctx, "HTTP start streaming")
// Proxy the stream from backend to client.
if _, err := io.Copy(w, resp.Body); err != nil {
@ -332,7 +332,7 @@ func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := v.serve(v.ctx, w, r); err != nil {
utils.ApiError(v.ctx, w, r, err)
} else {
logger.Df(v.ctx, "HLS client %v for %v with %v done",
logger.Debug(v.ctx, "HLS client %v for %v with %v done",
v.SRSProxyBackendHLSID, v.StreamURL, r.URL.Path)
}
}

View File

@ -82,7 +82,7 @@ func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseW
// Build the stream URL in vhost/app/stream schema.
unifiedURL, fullURL := utils.ConvertURLToStreamURL(r)
logger.Df(ctx, "Got WebRTC WHIP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL)
logger.Debug(ctx, "Got WebRTC WHIP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL)
streamURL, err := utils.BuildStreamURL(unifiedURL)
if err != nil {
@ -119,7 +119,7 @@ func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseW
// Build the stream URL in vhost/app/stream schema.
unifiedURL, fullURL := utils.ConvertURLToStreamURL(r)
logger.Df(ctx, "Got WebRTC WHEP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL)
logger.Debug(ctx, "Got WebRTC WHEP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL)
streamURL, err := utils.BuildStreamURL(unifiedURL)
if err != nil {
@ -234,7 +234,7 @@ func (v *srsWebRTCServer) proxyApiToBackend(
return errors.Wrapf(err, "write local sdp answer %v", localSDPAnswer)
}
logger.Df(ctx, "Create WebRTC connection with local answer %vB with ice-ufrag=%v, ice-pwd=%vB",
logger.Debug(ctx, "Create WebRTC connection with local answer %vB with ice-ufrag=%v, ice-pwd=%vB",
len(localSDPAnswer), localICEUfrag, len(localICEPwd))
return nil
}
@ -256,7 +256,7 @@ func (v *srsWebRTCServer) Run(ctx context.Context) error {
return errors.Wrapf(err, "listen udp %v", saddr)
}
v.listener = listener
logger.Df(ctx, "WebRTC server listen at %v", saddr)
logger.Debug(ctx, "WebRTC server listen at %v", saddr)
// Consume all messages from UDP media transport.
v.wg.Add(1)
@ -269,17 +269,17 @@ func (v *srsWebRTCServer) Run(ctx context.Context) error {
if err != nil {
// If context is canceled or connection is closed, exit gracefully without logging error.
if ctx.Err() != nil || utils.IsClosedNetworkError(err) {
logger.Df(ctx, "WebRTC server done")
logger.Debug(ctx, "WebRTC server done")
return
}
// TODO: If WebRTC server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "WebRTC read from udp failed, err=%+v", err)
logger.Warn(ctx, "WebRTC read from udp failed, err=%+v", err)
time.Sleep(1 * time.Second)
continue
}
if err := v.handleClientUDP(ctx, caddr, buf[:n]); err != nil {
logger.Wf(ctx, "WebRTC handle udp %vB failed, addr=%v, err=%+v", n, caddr, err)
logger.Warn(ctx, "WebRTC handle udp %vB failed, addr=%v, err=%+v", n, caddr, err)
}
}
}()
@ -312,7 +312,7 @@ func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr
return errors.Wrapf(err, "load webrtc by ufrag %v", pkt.Username)
} else {
connection = s.(*RTCConnection).Initialize(ctx, v.listener)
logger.Df(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL)
logger.Debug(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL)
}
// Cache connection for fast search.
@ -418,13 +418,13 @@ func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error {
n, _, err := v.backendUDP.ReadFromUDP(buf)
if err != nil {
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
logger.Wf(ctx, "read from backend failed, err=%v", err)
logger.Warn(ctx, "read from backend failed, err=%v", err)
break
}
if _, err = v.listenerUDP.WriteToUDP(buf[:n], v.clientUDP); err != nil {
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
logger.Wf(ctx, "write to client failed, err=%v", err)
logger.Warn(ctx, "write to client failed, err=%v", err)
break
}
}

View File

@ -65,7 +65,7 @@ func (v *srsRTMPServer) Run(ctx context.Context) error {
return errors.Wrapf(err, "listen rtmp addr %v", addr)
}
v.listener = listener
logger.Df(ctx, "RTMP server listen at %v", addr)
logger.Debug(ctx, "RTMP server listen at %v", addr)
v.wg.Add(1)
go func() {
@ -76,10 +76,10 @@ func (v *srsRTMPServer) Run(ctx context.Context) error {
if err != nil {
// If context is canceled or connection is closed, exit gracefully without logging error.
if ctx.Err() != nil || utils.IsClosedNetworkError(err) {
logger.Df(ctx, "RTMP server done")
logger.Debug(ctx, "RTMP server done")
} else {
// TODO: If RTMP server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "RTMP server accept err %+v", err)
logger.Warn(ctx, "RTMP server accept err %+v", err)
}
return
}
@ -91,9 +91,9 @@ func (v *srsRTMPServer) Run(ctx context.Context) error {
handleErr := func(err error) {
if utils.IsPeerClosedError(err) || utils.IsClosedNetworkError(err) {
logger.Df(ctx, "RTMP connection closed")
logger.Debug(ctx, "RTMP connection closed")
} else {
logger.Wf(ctx, "RTMP serve err %+v", err)
logger.Warn(ctx, "RTMP serve err %+v", err)
}
}
@ -101,7 +101,7 @@ func (v *srsRTMPServer) Run(ctx context.Context) error {
if err := rc.serve(ctx, conn); err != nil {
handleErr(err)
} else {
logger.Df(ctx, "RTMP client done")
logger.Debug(ctx, "RTMP client done")
}
}(logger.WithContext(ctx), conn)
}
@ -128,7 +128,7 @@ func NewRTMPConnection(opts ...func(*RTMPConnection)) *RTMPConnection {
}
func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
logger.Df(ctx, "Got RTMP client from %v", conn.RemoteAddr())
logger.Debug(ctx, "Got RTMP client from %v", conn.RemoteAddr())
// If any goroutine quit, cancel another one.
parentCtx := ctx
@ -168,7 +168,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
}
client := rtmp.NewProtocol(conn)
logger.Df(ctx, "RTMP simple handshake done")
logger.Debug(ctx, "RTMP simple handshake done")
// Expect RTMP connect command with tcUrl.
var connectReq *rtmp.ConnectAppPacket
@ -209,7 +209,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
}
tcUrl := connectReq.TcUrl()
logger.Df(ctx, "RTMP connect app %v", tcUrl)
logger.Debug(ctx, "RTMP connect app %v", tcUrl)
// Expect RTMP command to identify the client, a publisher or viewer.
var currentStreamID, nextStreamID int
@ -285,7 +285,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
// Update the stream ID for next request.
currentStreamID = nextStreamID
}
logger.Df(ctx, "RTMP identify tcUrl=%v, stream=%v, id=%v, type=%v",
logger.Debug(ctx, "RTMP identify tcUrl=%v, stream=%v, id=%v, type=%v",
tcUrl, streamName, currentStreamID, clientType)
// Find a backend SRS server to proxy the RTMP stream.
@ -333,7 +333,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
return errors.Wrapf(err, "start play")
}
}
logger.Df(ctx, "RTMP start streaming")
logger.Debug(ctx, "RTMP start streaming")
// For all proxy goroutines.
var wg sync.WaitGroup
@ -352,7 +352,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
if err != nil {
return errors.Wrapf(err, "read message")
}
//logger.Df(ctx, "client<- %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))
//logger.Debug(ctx, "client<- %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))
// TODO: Update the stream ID if not the same.
if err := client.WriteMessage(ctx, m); err != nil {
@ -375,7 +375,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
if err != nil {
return errors.Wrapf(err, "read message")
}
//logger.Df(ctx, "client-> %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))
//logger.Debug(ctx, "client-> %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))
// TODO: Update the stream ID if not the same.
if err := backend.client.WriteMessage(ctx, m); err != nil {
@ -392,7 +392,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
if r0 != nil {
// If backend connection closed normally, treat as normal disconnection
if utils.IsClosedNetworkError(r0) || utils.IsPeerClosedError(r0) {
logger.Df(ctx, "RTMP backend disconnected")
logger.Debug(ctx, "RTMP backend disconnected")
return nil
}
return errors.Wrapf(r0, "proxy backend->client")
@ -400,7 +400,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
if r1 != nil {
// If client connection closed normally, treat as normal disconnection
if utils.IsClosedNetworkError(r1) || utils.IsPeerClosedError(r1) {
logger.Df(ctx, "RTMP client disconnected")
logger.Debug(ctx, "RTMP client disconnected")
return nil
}
return errors.Wrapf(r1, "proxy client->backend")
@ -495,7 +495,7 @@ func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName str
if _, err = hs.ReadC2S2(c); err != nil {
return errors.Wrapf(err, "read c2")
}
logger.Df(ctx, "backend simple handshake done, server=%v", addr)
logger.Debug(ctx, "backend simple handshake done, server=%v", addr)
if err := hs.WriteC2S2(c, hs.C1S1()); err != nil {
return errors.Wrapf(err, "write c2")
@ -515,7 +515,7 @@ func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName str
if _, err := rtmp.ExpectPacket(ctx, client, &connectAppRes); err != nil {
return errors.Wrapf(err, "expect connect app res")
}
logger.Df(ctx, "backend connect RTMP app, tcUrl=%v, id=%v", tcUrl, connectAppRes.SrsID())
logger.Debug(ctx, "backend connect RTMP app, tcUrl=%v, id=%v", tcUrl, connectAppRes.SrsID())
}
// Play or view RTMP stream with server.
@ -615,7 +615,7 @@ func (v *RTMPClientToBackend) publish(ctx context.Context, client *rtmp.Protocol
break
}
}
logger.Df(ctx, "backend publish stream=%v, sid=%v", streamName, currentStreamID)
logger.Debug(ctx, "backend publish stream=%v, sid=%v", streamName, currentStreamID)
return nil
}

View File

@ -78,7 +78,7 @@ func (v *srsSRTServer) Run(ctx context.Context) error {
return errors.Wrapf(err, "listen udp %v", saddr)
}
v.listener = listener
logger.Df(ctx, "SRT server listen at %v", saddr)
logger.Debug(ctx, "SRT server listen at %v", saddr)
// Consume all messages from UDP media transport.
v.wg.Add(1)
@ -91,17 +91,17 @@ func (v *srsSRTServer) Run(ctx context.Context) error {
if err != nil {
// If context is canceled or connection is closed, exit gracefully without logging error.
if ctx.Err() != nil || utils.IsClosedNetworkError(err) {
logger.Df(ctx, "SRT server done")
logger.Debug(ctx, "SRT server done")
return
}
// TODO: If SRT server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "SRT read from udp failed, err=%+v", err)
logger.Warn(ctx, "SRT read from udp failed, err=%+v", err)
time.Sleep(1 * time.Second)
continue
}
if err := v.handleClientUDP(ctx, caddr, buf[:n]); err != nil {
logger.Wf(ctx, "SRT handle udp %vB failed, addr=%v, err=%+v", n, caddr, err)
logger.Warn(ctx, "SRT handle udp %vB failed, addr=%v, err=%+v", n, caddr, err)
}
}
}()
@ -132,7 +132,7 @@ func (v *srsSRTServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, d
ctx = conn.ctx
if !ok {
logger.Df(ctx, "Create new SRT connection skt=%v", socketID)
logger.Debug(ctx, "Create new SRT connection skt=%v", socketID)
}
if newSocketID, err := conn.HandlePacket(pkt, addr, data); err != nil {
@ -213,7 +213,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
if pkt.SynCookie == 0 {
// Save handshake 0 packet.
v.handshake0 = pkt
logger.Df(ctx, "SRT Handshake 0: %v", v.handshake0)
logger.Debug(ctx, "SRT Handshake 0: %v", v.handshake0)
// Response handshake 1.
v.handshake1 = &SRTHandshakePacket{
@ -234,7 +234,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
SynCookie: 0x418d5e4e,
PeerIP: net.ParseIP("127.0.0.1"),
}
logger.Df(ctx, "SRT Handshake 1: %v", v.handshake1)
logger.Debug(ctx, "SRT Handshake 1: %v", v.handshake1)
if b, err := v.handshake1.MarshalBinary(); err != nil {
return errors.Wrapf(err, "marshal handshake 1")
@ -254,7 +254,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
// Save handshake packet.
v.handshake2 = pkt
logger.Df(ctx, "SRT Handshake 2: %v, sid=%v", v.handshake2, streamID)
logger.Debug(ctx, "SRT Handshake 2: %v, sid=%v", v.handshake2, streamID)
// Start the UDP proxy to backend.
if err := v.connectBackend(ctx, streamID); err != nil {
@ -272,7 +272,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
} else if _, err = v.backendUDP.Write(b); err != nil {
return errors.Wrapf(err, "write handshake 0")
}
logger.Df(ctx, "Proxy send handshake 0: %v", v.handshake0)
logger.Debug(ctx, "Proxy send handshake 0: %v", v.handshake0)
// Read handshake 1 from backend server.
b := make([]byte, 4096)
@ -282,7 +282,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
} else if err := handshake1p.UnmarshalBinary(b[:nn]); err != nil {
return errors.Wrapf(err, "unmarshal handshake 1")
}
logger.Df(ctx, "Proxy got handshake 1: %v", handshake1p)
logger.Debug(ctx, "Proxy got handshake 1: %v", handshake1p)
// Proxy handshake 2 to backend server.
handshake2p := *v.handshake2
@ -292,7 +292,7 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
} else if _, err = v.backendUDP.Write(b); err != nil {
return errors.Wrapf(err, "write handshake 2")
}
logger.Df(ctx, "Proxy send handshake 2: %v", handshake2p)
logger.Debug(ctx, "Proxy send handshake 2: %v", handshake2p)
// Read handshake 3 from backend server.
handshake3p := &SRTHandshakePacket{}
@ -301,13 +301,13 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
} else if err := handshake3p.UnmarshalBinary(b[:nn]); err != nil {
return errors.Wrapf(err, "unmarshal handshake 3")
}
logger.Df(ctx, "Proxy got handshake 3: %v", handshake3p)
logger.Debug(ctx, "Proxy got handshake 3: %v", handshake3p)
// Response handshake 3 to client.
v.handshake3 = &*handshake3p
v.handshake3.SynCookie = v.handshake1.SynCookie
v.socketID = handshake3p.SRTSocketID
logger.Df(ctx, "Handshake 3: %v", v.handshake3)
logger.Debug(ctx, "Handshake 3: %v", v.handshake3)
if b, err := v.handshake3.MarshalBinary(); err != nil {
return errors.Wrapf(err, "marshal handshake 3")
@ -322,12 +322,12 @@ func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePa
nn, err := v.backendUDP.Read(b)
if err != nil {
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
logger.Wf(ctx, "read from backend failed, err=%v", err)
logger.Warn(ctx, "read from backend failed, err=%v", err)
return
}
if _, err = v.listenerUDP.WriteToUDP(b[:nn], addr); err != nil {
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
logger.Wf(ctx, "write to client failed, err=%v", err)
logger.Warn(ctx, "write to client failed, err=%v", err)
return
}
}

View File

@ -27,7 +27,7 @@ func InstallSignals(ctx context.Context, cancel context.CancelFunc) {
go func() {
for s := range sc {
logger.Df(ctx, "Got signal %v", s)
logger.Debug(ctx, "Got signal %v", s)
cancel()
}
}()
@ -45,7 +45,7 @@ func InstallForceQuit(ctx context.Context, environment env.ProxyEnvironment) err
go func() {
<-ctx.Done()
time.Sleep(forceTimeout)
logger.Wf(ctx, "Force to exit by timeout")
logger.Warn(ctx, "Force to exit by timeout")
osExit(1)
}()
return nil

View File

@ -40,7 +40,7 @@ func ApiResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, da
}
func ApiError(ctx context.Context, w http.ResponseWriter, r *http.Request, err error) {
logger.Wf(ctx, "HTTP API error %+v", err)
logger.Warn(ctx, "HTTP API error %+v", err)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "%v\n", err)