RTMP: Fix chunk timestamp/basic-header decoding and harden packet unmarshal. v8.0.3 (#4680)

Fixes three RTMP chunk-stream decoding bugs in the proxy and hardens AMF0 command-packet unmarshalling against malformed input, backed by a new protocol unit-test suite.

All changes are confined to the `internal/rtmp` package. No public API, log format, or emitted wire format changes — these are decode-correctness and robustness fixes only.

**3-byte chunk basic header decode (`readBasicHeader`) **

The 3-byte basic-header form (cid 64–65599) was selected by testing `cid == 1` *after* `cid` had already been overwritten with `64 + t`, so it was never detected. Capture the original marker before overwriting and test that instead.

**Extended-timestamp handling (`chunkStream`, `readMessageHeader`)**

- Use the extended timestamp as a delta for fmt=1/2 chunks (and a fmt=3 first chunk continuing them), required when the delta is ≥ `0xffffff`. Timestamp computation is unified into a single post-step: extended timestamp when present, otherwise the 3-byte header delta; fmt=0 absolute, fmt=1/2 accumulated.
- Detect Type-3 chunks that omit the extended timestamp. FMLE/FMS/Flash follow the RTMP 2012 spec and always send it on Type-3 chunks; librtmp/ffmpeg may not. Switched from an unconditional 4-byte read to `Peek` + conditional `Discard`: if the peeked value differs from the stored one on a non-first chunk, those 4 bytes are payload and are left in the reader.
- Split the single `extendedTimestamp` bool into `hasExtendedTimestamp` (bool) and `extendedTimestamp` (the last raw value, used for the detection above).

**Packet unmarshal hardening**
- Add an `advanceBytes(p, n)` helper that bounds-checks each `p = p[field.Size():]` advance, turning a slice-out-of-range panic into a clean error on truncated/untrusted input. Applied in `CallPacket`, `CreateStreamResPacket`, `PublishPacket`, and `PlayPacket`.
- Reset the optional `CommandObject` / `Args` to nil before probing for their presence, so a stale constructor default (e.g. Null) isn't counted by `Size()` and can't overflow a later advance.

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Winlin 2026-05-29 07:17:32 -04:00 committed by GitHub
parent 8df9410880
commit 0f980d49a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 1414 additions and 49 deletions

View File

@ -136,12 +136,21 @@ func newSettings() *settings {
// The chunk stream which transport a message once.
type chunkStream struct {
format formatType
cid chunkID
header messageHeader
message *message
count uint64
extendedTimestamp bool
format formatType
cid chunkID
header messageHeader
message *message
count uint64
// Whether the chunk carries an extended timestamp, set when the (delta) timestamp in
// the message header equals 0xffffff. Type-3 continuation chunks inherit this from the
// preceding Type-0/1/2 chunk.
hasExtendedTimestamp bool
// The raw value last read from the extended timestamp field. Kept separately from
// header.Timestamp (the accumulated message timestamp) so we can both detect Type-3
// chunks that omit the extended timestamp and use it as a delta for fmt=1/2 chunks.
// See readMessageHeader.
extendedTimestamp uint32
}
func newChunkStream() *chunkStream {
@ -540,29 +549,7 @@ func (v *protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, fo
// 0x00ffffff), this value MUST be 16777215, and the 'extended
// timestamp header' MUST be present. Otherwise, this value SHOULD be
// the entire delta.
chunk.extendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp
if !chunk.extendedTimestamp {
// Extended timestamp: 0 or 4 bytes
// This field MUST be sent when the normal timsestamp is set to
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
// anything else. So for values less than 0xffffff the normal
// timestamp field SHOULD be used in which case the extended timestamp
// MUST NOT be present. For values greater than or equal to 0xffffff
// the normal timestamp field MUST NOT be used and MUST be set to
// 0xffffff and the extended timestamp MUST be sent.
if format == formatType0 {
// 6.1.2.1. Type 0
// For a type-0 chunk, the absolute timestamp of the message is sent
// here.
chunk.header.Timestamp = uint64(chunk.header.timestampDelta)
} else {
// 6.1.2.2. Type 1
// 6.1.2.3. Type 2
// For a type-1 or type-2 chunk, the difference between the previous
// chunk's timestamp and the current chunk's timestamp is sent here.
chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
}
}
chunk.hasExtendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp
if format <= formatType1 {
payloadLength := uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
@ -585,27 +572,58 @@ func (v *protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, fo
p = p[4:]
}
}
} else {
// Update the timestamp even fmt=3 for first chunk packet
if isFirstChunkOfMsg && !chunk.extendedTimestamp {
chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
}
}
// Read extended-timestamp
if chunk.extendedTimestamp {
var timestamp uint32
if err = binary.Read(v.r, binary.BigEndian, &timestamp); err != nil {
// Read extended-timestamp, present when the (delta) timestamp in the message header is
// 0xffffff. Type-3 chunks inherit hasExtendedTimestamp from the preceding chunk.
if chunk.hasExtendedTimestamp {
// Peek instead of read, so the 4 bytes can be left in place when a sender omits the
// extended timestamp on a Type-3 chunk (see the detection below).
var b []byte
if b, err = v.r.Peek(4); err != nil {
return errors.Wrapf(err, "read ext-ts, pkt-ts=%v", chunk.header.Timestamp)
}
// We always use 31bits timestamp, for some server may use 32bits extended timestamp.
// @see https://github.com/ossrs/srs/issues/111
timestamp &= 0x7fffffff
timestamp := binary.BigEndian.Uint32(b) & 0x7fffffff
// TODO: FIXME: Support detect the extended timestamp.
// For the RTMP v1 2009 version (6.1.3. Extended Timestamp), Type 3 chunks MUST NOT
// have this field. For the RTMP v1 2012 version (5.3.1.3. Extended Timestamp), it is
// present in Type 3 chunks when the most recent Type 0/1/2 chunk indicated one.
//
// FMLE/FMS/Flash Player follow the 2012 version and always send the extended
// timestamp in Type 3 chunks; librtmp/ffmpeg may not. So detect it: if this is not
// the first chunk of the message and the peeked value differs from the previously
// stored extended timestamp, the sender omitted it and these 4 bytes are payload, so
// leave them in the reader. Otherwise consume and store them.
// @see http://blog.csdn.net/win_lin/article/details/13363699
// @see https://github.com/veovera/enhanced-rtmp/issues/42
if !isFirstChunkOfMsg && chunk.extendedTimestamp > 0 && chunk.extendedTimestamp != timestamp {
// No extended timestamp on this Type-3 chunk; the 4 bytes belong to the payload.
} else {
if _, err = v.r.Discard(4); err != nil {
return errors.Wrapf(err, "discard ext-ts, pkt-ts=%v", chunk.header.Timestamp)
}
chunk.extendedTimestamp = timestamp
}
}
// Compute the message timestamp. The source is the extended timestamp when present,
// otherwise the 3-byte (delta) timestamp from the message header.
//
// fmt=0: the value is the absolute timestamp of the message.
// fmt=1/2 (and a fmt=3 first chunk continuing them): the value is a delta and is
// accumulated onto the previous timestamp. This is required when the delta is >= 0xffffff
// and is therefore carried in the extended timestamp.
timestamp := chunk.header.timestampDelta
if chunk.hasExtendedTimestamp {
timestamp = chunk.extendedTimestamp
}
if format == formatType0 {
chunk.header.Timestamp = uint64(timestamp)
} else if isFirstChunkOfMsg {
chunk.header.Timestamp += uint64(timestamp)
}
// The extended-timestamp must be unsigned-int,
@ -696,6 +714,11 @@ func (v *protocol) readBasicHeader(ctx context.Context) (format formatType, cid
return
}
// Here cid is 0 or 1: a marker selecting the 2B or 3B form, not the real cid. Keep it,
// because cid is overwritten below and the marker decides whether a third byte (the
// high-order part of the cid) follows. Do not test the overwritten cid for this.
marker := cid
// 64-319, 2B chunk header
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
@ -703,7 +726,7 @@ func (v *protocol) readBasicHeader(ctx context.Context) (format formatType, cid
cid = chunkID(64 + uint32(t))
// 64-65599, 3B chunk header
if cid == 1 {
if marker == 1 {
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
}
@ -1283,6 +1306,12 @@ func (v *variantCallPacket) UnmarshalBinary(data []byte) (err error) {
}
p = p[v.TransactionID.Size():]
// Reset the optional command object before deciding whether it is present.
// A New*Packet constructor may have pre-set it to a default (e.g. Null), but
// when the wire data is exhausted here the object is absent. Leaving the stale
// default would make Size() count bytes that were never parsed, overflowing the
// caller's p = p[Size():] advance on truncated, untrusted input.
v.CommandObject = nil
if len(p) > 0 {
if v.CommandObject, err = Amf0Discovery(p); err != nil {
return errors.WithMessage(err, "discovery command object")
@ -1353,14 +1382,32 @@ func (v *CallPacket) Size() int {
return size
}
// advanceBytes returns p[n:] after verifying n lies within p. Packet
// UnmarshalBinary advances its cursor by each embedded field's decoded Size();
// on untrusted wire input a malformed length can make Size() exceed the bytes
// actually present, so this guard turns a slice-out-of-range panic into a clean
// error. See the RTMP test plan, P8 (adversarial resource-safety).
func advanceBytes(p []byte, n int) ([]byte, error) {
if n < 0 || n > len(p) {
return nil, errors.Errorf("advance %v exceeds remaining %v bytes", n, len(p))
}
return p[n:], nil
}
func (v *CallPacket) UnmarshalBinary(data []byte) (err error) {
p := data
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call")
}
p = p[v.variantCallPacket.Size():]
if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
// Reset the optional args before deciding whether they are present, for the
// same reason as variantCallPacket.CommandObject: a stale constructor default
// would be counted by Size() and overflow a later advance.
v.Args = nil
if len(p) > 0 {
if v.Args, err = Amf0Discovery(p); err != nil {
return errors.WithMessage(err, "discovery args")
@ -1436,7 +1483,9 @@ func (v *CreateStreamResPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call")
}
p = p[v.variantCallPacket.Size():]
if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
if err = v.StreamID.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal sid")
@ -1486,7 +1535,9 @@ func (v *PublishPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call")
}
p = p[v.variantCallPacket.Size():]
if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
v.StreamName = newAmf0String("")
if err = v.StreamName.UnmarshalBinary(p); err != nil {
@ -1546,7 +1597,9 @@ func (v *PlayPacket) UnmarshalBinary(data []byte) (err error) {
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
return errors.WithMessage(err, "unmarshal call")
}
p = p[v.variantCallPacket.Size():]
if p, err = advanceBytes(p, v.variantCallPacket.Size()); err != nil {
return errors.WithMessage(err, "advance call")
}
v.StreamName = newAmf0String("")
if err = v.StreamName.UnmarshalBinary(p); err != nil {

File diff suppressed because it is too large Load Diff

View File

@ -15,7 +15,7 @@ func VersionMinor() int {
}
func VersionRevision() int {
return 2
return 3
}
func Version() string {

View File

@ -143,7 +143,7 @@ Only after the user confirms the routing do you proceed to Step 2.
```
bash scripts/proxy-utest.sh --coverage
```
4. Run the proxy E2E tests:
4. Run **all** of the proxy E2E tests below — every one, not just the first. Run them one at a time (they bind fixed ports, so they cannot run in parallel), and do not stop early: a later test can fail even when the earlier ones pass.
- Single-origin RTMP proxy test (starts proxy + one SRS origin, publishes RTMP, verifies playback):
```
bash scripts/proxy-e2e-test.sh

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v8-changes"></a>
## SRS 8.0 Changelog
* v8.0, 2026-05-28, Merge [#4680](https://github.com/ossrs/srs/pull/4680): RTMP: Fix chunk timestamp/basic-header decoding and harden packet unmarshal. v8.0.3 (#4680)
* v8.0, 2026-05-19, Merge [#4678](https://github.com/ossrs/srs/pull/4678): Edge: Fix HTTP-FLV 404 and RTMP late-join missing sequence headers. v8.0.2 (#4678)
* v8.0, 2026-05-17, Merge [#4676](https://github.com/ossrs/srs/pull/4676): Proxy: Fix RTC/SRT reader goroutine leak; unwrap legacy WHEP JSON envelope; add WHEP pprof guide. v8.0.1 (#4676)
* v8.0, 2026-05-17, Init SRS 8.0, code Free. v8.0.0

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 8
#define VERSION_MINOR 0
#define VERSION_REVISION 2
#define VERSION_REVISION 3
#endif