Claude: RTMP: Use extended timestamp as delta for chunk fmt=1/2 in proxy.
Port the C++ srs_protocol_rtmp_stack.cpp fix (#4356) to the Go proxy's RTMP parser in internal/rtmp. For chunk fmt=1/2 the extended timestamp encodes a timestamp delta, not an absolute timestamp. The parser previously assigned the extended value to the message timestamp unconditionally, so once a delta reached 0xffffff the DTS was miscomputed, and since audio and video deltas differ this could cause A/V desync. Changes: - Split chunkStream's single ext-ts bool into hasExtendedTimestamp (presence) plus a raw extendedTimestamp uint32, mirroring the C++ has_extended_timestamp_ / extended_timestamp_ fields. - Compute the message timestamp once: extended value when present, else the 3-byte header delta; assign it as absolute for fmt=0 and accumulate it as a delta for fmt=1/2 (and a fmt=3 first chunk). - Resolve the 'detect the extended timestamp' TODO: peek the 4 bytes and leave them as payload when a librtmp/ffmpeg-style sender omits the ext-ts on a Type-3 chunk (Go equivalent of the C++ skip(-4)). - Add unit tests for the fmt=1 delta-crossing-0xffffff case and the Type-3 omitted-ext-ts case. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
8df9410880
commit
26803ac4f4
|
|
@ -136,12 +136,21 @@ func newSettings() *settings {
|
|||
|
||||
// The chunk stream which transport a message once.
|
||||
type chunkStream struct {
|
||||
format formatType
|
||||
cid chunkID
|
||||
header messageHeader
|
||||
message *message
|
||||
count uint64
|
||||
extendedTimestamp bool
|
||||
format formatType
|
||||
cid chunkID
|
||||
header messageHeader
|
||||
message *message
|
||||
count uint64
|
||||
|
||||
// Whether the chunk carries an extended timestamp, set when the (delta) timestamp in
|
||||
// the message header equals 0xffffff. Type-3 continuation chunks inherit this from the
|
||||
// preceding Type-0/1/2 chunk.
|
||||
hasExtendedTimestamp bool
|
||||
// The raw value last read from the extended timestamp field. Kept separately from
|
||||
// header.Timestamp (the accumulated message timestamp) so we can both detect Type-3
|
||||
// chunks that omit the extended timestamp and use it as a delta for fmt=1/2 chunks.
|
||||
// See readMessageHeader.
|
||||
extendedTimestamp uint32
|
||||
}
|
||||
|
||||
func newChunkStream() *chunkStream {
|
||||
|
|
@ -540,29 +549,7 @@ func (v *protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, fo
|
|||
// 0x00ffffff), this value MUST be 16777215, and the 'extended
|
||||
// timestamp header' MUST be present. Otherwise, this value SHOULD be
|
||||
// the entire delta.
|
||||
chunk.extendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp
|
||||
if !chunk.extendedTimestamp {
|
||||
// Extended timestamp: 0 or 4 bytes
|
||||
// This field MUST be sent when the normal timsestamp is set to
|
||||
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
|
||||
// anything else. So for values less than 0xffffff the normal
|
||||
// timestamp field SHOULD be used in which case the extended timestamp
|
||||
// MUST NOT be present. For values greater than or equal to 0xffffff
|
||||
// the normal timestamp field MUST NOT be used and MUST be set to
|
||||
// 0xffffff and the extended timestamp MUST be sent.
|
||||
if format == formatType0 {
|
||||
// 6.1.2.1. Type 0
|
||||
// For a type-0 chunk, the absolute timestamp of the message is sent
|
||||
// here.
|
||||
chunk.header.Timestamp = uint64(chunk.header.timestampDelta)
|
||||
} else {
|
||||
// 6.1.2.2. Type 1
|
||||
// 6.1.2.3. Type 2
|
||||
// For a type-1 or type-2 chunk, the difference between the previous
|
||||
// chunk's timestamp and the current chunk's timestamp is sent here.
|
||||
chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
|
||||
}
|
||||
}
|
||||
chunk.hasExtendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp
|
||||
|
||||
if format <= formatType1 {
|
||||
payloadLength := uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
|
||||
|
|
@ -585,27 +572,58 @@ func (v *protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, fo
|
|||
p = p[4:]
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Update the timestamp even fmt=3 for first chunk packet
|
||||
if isFirstChunkOfMsg && !chunk.extendedTimestamp {
|
||||
chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
|
||||
}
|
||||
}
|
||||
|
||||
// Read extended-timestamp
|
||||
if chunk.extendedTimestamp {
|
||||
var timestamp uint32
|
||||
if err = binary.Read(v.r, binary.BigEndian, ×tamp); err != nil {
|
||||
// Read extended-timestamp, present when the (delta) timestamp in the message header is
|
||||
// 0xffffff. Type-3 chunks inherit hasExtendedTimestamp from the preceding chunk.
|
||||
if chunk.hasExtendedTimestamp {
|
||||
// Peek instead of read, so the 4 bytes can be left in place when a sender omits the
|
||||
// extended timestamp on a Type-3 chunk (see the detection below).
|
||||
var b []byte
|
||||
if b, err = v.r.Peek(4); err != nil {
|
||||
return errors.Wrapf(err, "read ext-ts, pkt-ts=%v", chunk.header.Timestamp)
|
||||
}
|
||||
|
||||
// We always use 31bits timestamp, for some server may use 32bits extended timestamp.
|
||||
// @see https://github.com/ossrs/srs/issues/111
|
||||
timestamp &= 0x7fffffff
|
||||
timestamp := binary.BigEndian.Uint32(b) & 0x7fffffff
|
||||
|
||||
// TODO: FIXME: Support detect the extended timestamp.
|
||||
// For the RTMP v1 2009 version (6.1.3. Extended Timestamp), Type 3 chunks MUST NOT
|
||||
// have this field. For the RTMP v1 2012 version (5.3.1.3. Extended Timestamp), it is
|
||||
// present in Type 3 chunks when the most recent Type 0/1/2 chunk indicated one.
|
||||
//
|
||||
// FMLE/FMS/Flash Player follow the 2012 version and always send the extended
|
||||
// timestamp in Type 3 chunks; librtmp/ffmpeg may not. So detect it: if this is not
|
||||
// the first chunk of the message and the peeked value differs from the previously
|
||||
// stored extended timestamp, the sender omitted it and these 4 bytes are payload, so
|
||||
// leave them in the reader. Otherwise consume and store them.
|
||||
// @see http://blog.csdn.net/win_lin/article/details/13363699
|
||||
// @see https://github.com/veovera/enhanced-rtmp/issues/42
|
||||
if !isFirstChunkOfMsg && chunk.extendedTimestamp > 0 && chunk.extendedTimestamp != timestamp {
|
||||
// No extended timestamp on this Type-3 chunk; the 4 bytes belong to the payload.
|
||||
} else {
|
||||
if _, err = v.r.Discard(4); err != nil {
|
||||
return errors.Wrapf(err, "discard ext-ts, pkt-ts=%v", chunk.header.Timestamp)
|
||||
}
|
||||
chunk.extendedTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
|
||||
// Compute the message timestamp. The source is the extended timestamp when present,
|
||||
// otherwise the 3-byte (delta) timestamp from the message header.
|
||||
//
|
||||
// fmt=0: the value is the absolute timestamp of the message.
|
||||
// fmt=1/2 (and a fmt=3 first chunk continuing them): the value is a delta and is
|
||||
// accumulated onto the previous timestamp. This is required when the delta is >= 0xffffff
|
||||
// and is therefore carried in the extended timestamp.
|
||||
timestamp := chunk.header.timestampDelta
|
||||
if chunk.hasExtendedTimestamp {
|
||||
timestamp = chunk.extendedTimestamp
|
||||
}
|
||||
if format == formatType0 {
|
||||
chunk.header.Timestamp = uint64(timestamp)
|
||||
} else if isFirstChunkOfMsg {
|
||||
chunk.header.Timestamp += uint64(timestamp)
|
||||
}
|
||||
|
||||
// The extended-timestamp must be unsigned-int,
|
||||
|
|
|
|||
|
|
@ -177,6 +177,60 @@ func TestReadMessageExtendedTimestampAndChunking(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestReadMessageExtendedTimestampAsDeltaForFmt1(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
var in bytes.Buffer
|
||||
// fmt0 cid=5, timestamp=10, len=1, type video, stream=1, payload AA.
|
||||
in.Write([]byte{0x05, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x01, byte(MessageTypeVideo), 0x01, 0x00, 0x00, 0x00, 0xAA})
|
||||
// fmt1 cid=5, delta=0xffffff so the real delta is carried in the extended timestamp (=100),
|
||||
// len=1, type video, payload BB. For fmt=1/2 the extended timestamp is a delta, so the
|
||||
// message timestamp must accumulate: 10 + 100 = 110 (not be replaced by 100).
|
||||
in.Write([]byte{0x45, 0xff, 0xff, 0xff, 0x00, 0x00, 0x01, byte(MessageTypeVideo)})
|
||||
binary.Write(&in, binary.BigEndian, uint32(100))
|
||||
in.Write([]byte{0xBB})
|
||||
|
||||
p := NewProtocol(&in).(*protocol)
|
||||
for i, want := range []struct {
|
||||
ts uint64
|
||||
pl []byte
|
||||
}{
|
||||
{10, []byte{0xAA}},
|
||||
{110, []byte{0xBB}},
|
||||
} {
|
||||
m, err := p.ReadMessage(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadMessage #%v err=%v", i, err)
|
||||
}
|
||||
if m.Timestamp() != want.ts || !bytes.Equal(m.Payload(), want.pl) {
|
||||
t.Fatalf("message #%v ts=%v payload=%x", i, m.Timestamp(), m.Payload())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadMessageType3OmitsExtendedTimestamp(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
var in bytes.Buffer
|
||||
// fmt0 cid=5, timestamp=0xffffff so an extended timestamp (=100) is present, len=8,
|
||||
// type video, stream=1, with the first 4 payload bytes.
|
||||
in.Write([]byte{0x05, 0xff, 0xff, 0xff, 0x00, 0x00, 0x08, byte(MessageTypeVideo), 0x01, 0x00, 0x00, 0x00})
|
||||
binary.Write(&in, binary.BigEndian, uint32(100))
|
||||
in.Write([]byte{0x01, 0x02, 0x03, 0x04})
|
||||
// fmt3 continuation from a librtmp/ffmpeg-style sender that omits the extended timestamp.
|
||||
// The next 4 bytes are payload, not an extended timestamp; the parser must detect the
|
||||
// mismatch against the stored value (100) and treat them as payload, keeping ts=100.
|
||||
in.Write([]byte{0xc5, 0x05, 0x06, 0x07, 0x08})
|
||||
|
||||
p := NewProtocol(&in).(*protocol)
|
||||
p.input.opt.chunkSize = 4
|
||||
m, err := p.ReadMessage(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadMessage err=%v", err)
|
||||
}
|
||||
if m.Timestamp() != 100 || !bytes.Equal(m.Payload(), []byte{1, 2, 3, 4, 5, 6, 7, 8}) {
|
||||
t.Fatalf("ts=%v payload=%x", m.Timestamp(), m.Payload())
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadMessageHeaderErrors(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
// Fresh non-zero chunk with fmt1 is rejected.
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user