Codex: Add RTMP API examples.
Add executable examples for the RTMP handshake and protocol workflow, covering packet decoding, expectation helpers, command responses, and raw message forwarding. Co-authored-by: chatgpt-codex-connector[bot] <199175422+chatgpt-codex-connector[bot]@users.noreply.github.com>
This commit is contained in:
parent
cb90bf1fec
commit
4b97841853
|
|
@ -301,6 +301,9 @@ The knowledge base (`memory/srs-*.md`) captures William's knowledge about SRS
|
|||
- `proxy-load-balancer.md` — Load balancer design: memory vs Redis implementations, stream-to-server mapping, server health via heartbeats, protocol-specific state
|
||||
- `proxy-origin-cluster.md` — Origin cluster tutorial: build proxy + SRS, configure multi-origin with proxy, stream publishing and playback verification
|
||||
|
||||
**Next-Generation Server API Examples** — Executable API documentation:
|
||||
- `internal/rtmp/example_test.go` — RTMP API examples: AMF0, handshake, and protocol workflow
|
||||
|
||||
## Testing and Verification Structure
|
||||
|
||||
How to verify SRS works correctly.
|
||||
|
|
|
|||
|
|
@ -4,7 +4,11 @@
|
|||
package rtmp_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"srsx/internal/rtmp"
|
||||
)
|
||||
|
|
@ -60,3 +64,250 @@ func ExampleAmf0Object() {
|
|||
// level: status
|
||||
// is number: false
|
||||
}
|
||||
|
||||
func ExampleNewHandshake() {
|
||||
client := rtmp.NewHandshake()
|
||||
server := rtmp.NewHandshake()
|
||||
|
||||
var clientToServer bytes.Buffer
|
||||
if err := client.WriteC0S0(&clientToServer); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := client.WriteC1S1(&clientToServer); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
c0, err := server.ReadC0S0(&clientToServer)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
c1, err := server.ReadC1S1(&clientToServer)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var serverToClient bytes.Buffer
|
||||
if err := server.WriteC0S0(&serverToClient); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := server.WriteC1S1(&serverToClient); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := server.WriteC2S2(&serverToClient, c1); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s0, err := client.ReadC0S0(&serverToClient)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s1, err := client.ReadC1S1(&serverToClient)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s2, err := client.ReadC2S2(&serverToClient)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := client.WriteC2S2(&clientToServer, s1); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
c2, err := server.ReadC2S2(&clientToServer)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Println("client version:", c0[0])
|
||||
fmt.Println("server version:", s0[0])
|
||||
fmt.Println("c1 bytes:", len(c1))
|
||||
fmt.Println("s1 bytes:", len(s1))
|
||||
fmt.Println("s2 echoes c1:", bytes.Equal(s2, c1))
|
||||
fmt.Println("c2 echoes s1:", bytes.Equal(c2, s1))
|
||||
fmt.Println("server cached c1:", bytes.Equal(server.C1S1(), c1))
|
||||
|
||||
// Output:
|
||||
// client version: 3
|
||||
// server version: 3
|
||||
// c1 bytes: 1536
|
||||
// s1 bytes: 1536
|
||||
// s2 echoes c1: true
|
||||
// c2 echoes s1: true
|
||||
// server cached c1: true
|
||||
}
|
||||
|
||||
func ExampleNewProtocol() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
clientConn, serverConn := net.Pipe()
|
||||
defer clientConn.Close()
|
||||
defer serverConn.Close()
|
||||
|
||||
client := rtmp.NewProtocol(clientConn)
|
||||
server := rtmp.NewProtocol(serverConn)
|
||||
|
||||
backendConn, upstreamConn := net.Pipe()
|
||||
defer backendConn.Close()
|
||||
defer upstreamConn.Close()
|
||||
|
||||
backend := rtmp.NewProtocol(backendConn)
|
||||
upstream := rtmp.NewProtocol(upstreamConn)
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
// The server can read a raw message first, then decode it explicitly.
|
||||
m, err := server.ExpectMessage(ctx, rtmp.MessageTypeSetChunkSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pkt, err := server.DecodeMessage(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
chunkSize := pkt.(*rtmp.SetChunkSize)
|
||||
|
||||
// ExpectPacket reads and decodes messages until it finds the requested packet type.
|
||||
var connectReq *rtmp.ConnectAppPacket
|
||||
if _, err := rtmp.ExpectPacket(ctx, server, &connectReq); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ack := rtmp.NewWindowAcknowledgementSize()
|
||||
ack.AckSize = 2500000
|
||||
if err := server.WritePacket(ctx, ack, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
serverChunk := rtmp.NewSetChunkSize()
|
||||
serverChunk.ChunkSize = chunkSize.ChunkSize
|
||||
if err := server.WritePacket(ctx, serverChunk, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
connectRes := rtmp.NewConnectAppResPacket(connectReq.TransactionID)
|
||||
connectRes.CommandObject.Set("fmsVer", rtmp.NewAmf0String("FMS/3,5,3,888"))
|
||||
connectRes.Args.Set("level", rtmp.NewAmf0String("status"))
|
||||
connectRes.Args.Set("code", rtmp.NewAmf0String("NetConnection.Connect.Success"))
|
||||
if err := server.WritePacket(ctx, connectRes, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var createStream *rtmp.CallPacket
|
||||
if _, err := rtmp.ExpectPacket(ctx, server, &createStream); err != nil {
|
||||
return err
|
||||
}
|
||||
createStreamRes := rtmp.NewCreateStreamResPacket(createStream.TransactionID)
|
||||
createStreamRes.SetStreamID(1)
|
||||
if err := server.WritePacket(ctx, createStreamRes, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// For media forwarding, the proxy reads a complete RTMP message and writes
|
||||
// that same message to another protocol connection without decoding/repacking.
|
||||
publishMessage, err := server.ReadMessage(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := backend.WriteMessage(ctx, publishMessage); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
||||
select {
|
||||
case done <- err:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
|
||||
// Client runs the normal RTMP command workflow: configure chunk size,
|
||||
// connect to an app, create a stream, publish it, then verify the proxy
|
||||
// forwarded the publish message to the upstream side.
|
||||
if err := func() error {
|
||||
clientChunk := rtmp.NewSetChunkSize()
|
||||
clientChunk.ChunkSize = 128
|
||||
if err := client.WritePacket(ctx, clientChunk, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
connectReq := rtmp.NewConnectAppPacket()
|
||||
connectReq.CommandObject.Set("tcUrl", rtmp.NewAmf0String("rtmp://example.com/live"))
|
||||
if err := client.WritePacket(ctx, connectReq, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ackMessage, err := client.ExpectMessage(ctx, rtmp.MessageTypeWindowAcknowledgementSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ackPacket, err := client.DecodeMessage(ackMessage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ack, ok := ackPacket.(*rtmp.WindowAcknowledgementSize)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected ack packet %T", ackPacket)
|
||||
}
|
||||
|
||||
var serverChunk *rtmp.SetChunkSize
|
||||
if _, err := rtmp.ExpectPacket(ctx, client, &serverChunk); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var connectRes *rtmp.ConnectAppResPacket
|
||||
if _, err := rtmp.ExpectPacket(ctx, client, &connectRes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createStream := rtmp.NewCreateStreamPacket()
|
||||
if err := client.WritePacket(ctx, createStream, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
var createStreamRes *rtmp.CreateStreamResPacket
|
||||
if _, err := rtmp.ExpectPacket(ctx, client, &createStreamRes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
publish := rtmp.NewPublishPacket()
|
||||
publish.TransactionID = 5
|
||||
publish.StreamName = rtmp.NewAmf0String("livestream")
|
||||
publish.StreamType = rtmp.NewAmf0String("live")
|
||||
if err := client.WritePacket(ctx, publish, int(createStreamRes.StreamID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var upstreamPublish *rtmp.PublishPacket
|
||||
if _, err := rtmp.ExpectPacket(ctx, upstream, &upstreamPublish); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
fmt.Println("ack size:", ack.AckSize)
|
||||
fmt.Println("chunk size:", serverChunk.ChunkSize)
|
||||
fmt.Println("connect:", rtmp.NewAmf0Converter(connectRes.Args.Get("code")).ToString().String())
|
||||
fmt.Println("stream id:", int(createStreamRes.StreamID))
|
||||
fmt.Println("forward publish:", upstreamPublish.StreamName.String())
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Output:
|
||||
// ack size: 2500000
|
||||
// chunk size: 128
|
||||
// connect: NetConnection.Connect.Success
|
||||
// stream id: 1
|
||||
// forward publish: livestream
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user