Claude: RTMP: Add protocol unit-test suite and harden packet unmarshal.

Implements the Go RTMP stack unit-test plan (internal/rtmp), cross-referenced
to the C++ srs_utest_manual_protocol/protocol2/rtmp suites, and fixes the one
panic class the fuzz targets exposed.

Tests added (rtmp_test.go):
- TestReadMessageInterleavedMultiStream    interleaved multi-cid reassembly
- TestReadMessageLargeChunkStreamID        2-/3-byte cid full-message read
- TestProtocolWritePacketReadMessageRoundTrip  encoder<->decoder, 14 pkts x 4 sizes
- TestReadMessageTimestampDiscontinuity    backward/forward jump, 31-bit wrap
- TestReadWriteLargePayloadChunkBoundaries chunk-boundary stress
- TestGoldenWireBytes                       golden bytes for headers + controls
- FuzzReadMessage / FuzzDecodeMessage / FuzzPacketUnmarshal  untrusted-input fuzz
- TestPacketUnmarshalAdversarialInputs      resource-safety / truncation
- TestProtocolTransactionMapConcurrency     -race on the transaction map

Hardening (rtmp.go): the fuzz targets found that the variantCallPacket family
counted a stale optional-field default (CommandObject/Args pre-set to Null by a
New*Packet constructor) when the wire data was exhausted, so Size() overran the
caller's p = p[Size():] advance and panicked on truncated, untrusted input.
Reset those fields to nil before the presence check, and route the embedded
advances in CallPacket/CreateStreamResPacket/PublishPacket/PlayPacket through a
bounds-checked advanceBytes helper so any future Size()/consumed mismatch becomes
a clean error instead of a slice-out-of-range panic.

Verified: full proxy unit suite passes; TestProtocolTransactionMapConcurrency
clean under go test -race -count=3; all proxy E2E scripts pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
winlin 2026-05-28 21:09:27 -04:00
parent abfb0cd8ae
commit 41528168e4
2 changed files with 1291 additions and 4 deletions

View File

@ -1306,6 +1306,12 @@ func (v *variantCallPacket) UnmarshalBinary(data []byte) (err error) {
}
p = p[v.TransactionID.Size():]
// Reset the optional command object before deciding whether it is present.
// A New*Packet constructor may have pre-set it to a default (e.g. Null), but
// when the wire data is exhausted here the object is absent. Leaving the stale
// default would make Size() count bytes that were never parsed, overflowing the
// caller's p = p[Size():] advance on truncated, untrusted input.
v.CommandObject = nil
if len(p) > 0 {
if v.CommandObject, err = Amf0Discovery(p); err != nil {
return errors.WithMessage(err, "discovery command object")
@ -1376,14 +1382,32 @@ func (v *CallPacket) Size() int {
return size
}
// advanceBytes returns p[n:] after verifying n lies within p. Packet
// UnmarshalBinary advances its cursor by each embedded field's decoded Size();
// on untrusted wire input a malformed length can make Size() exceed the bytes
// actually present, so this guard turns a slice-out-of-range panic into a clean
// error. See the RTMP test plan, P8 (adversarial resource-safety).
func advanceBytes(p []byte, n int) ([]byte, error) {
if n < 0 || n > len(p) {
return nil, errors.Errorf("advance %v exceeds remaining %v bytes", n, len(p))
}
return p[n:], nil
}
func (v *CallPacket) UnmarshalBinary(data []byte) (err error) {
p := data
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call")
}
p = p[v.variantCallPacket.Size():]
if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
// Reset the optional args before deciding whether they are present, for the
// same reason as variantCallPacket.CommandObject: a stale constructor default
// would be counted by Size() and overflow a later advance.
v.Args = nil
if len(p) > 0 {
if v.Args, err = Amf0Discovery(p); err != nil {
return errors.WithMessage(err, "discovery args")
@ -1459,7 +1483,9 @@ func (v *CreateStreamResPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call")
}
p = p[v.variantCallPacket.Size():]
if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
if err = v.StreamID.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal sid")
@ -1509,7 +1535,9 @@ func (v *PublishPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call")
}
p = p[v.variantCallPacket.Size():]
if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
v.StreamName = newAmf0String("")
if err = v.StreamName.UnmarshalBinary(p); err != nil {
@ -1569,7 +1597,9 @@ func (v *PlayPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call")
}
p = p[v.variantCallPacket.Size():]
if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
v.StreamName = newAmf0String("")
if err = v.StreamName.UnmarshalBinary(p); err != nil {

File diff suppressed because it is too large Load Diff