diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index 988804b3e..2eaa12cbe 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -136,12 +136,21 @@ func newSettings() *settings { // The chunk stream which transport a message once. type chunkStream struct { - format formatType - cid chunkID - header messageHeader - message *message - count uint64 - extendedTimestamp bool + format formatType + cid chunkID + header messageHeader + message *message + count uint64 + + // Whether the chunk carries an extended timestamp, set when the (delta) timestamp in + // the message header equals 0xffffff. Type-3 continuation chunks inherit this from the + // preceding Type-0/1/2 chunk. + hasExtendedTimestamp bool + // The raw value last read from the extended timestamp field. Kept separately from + // header.Timestamp (the accumulated message timestamp) so we can both detect Type-3 + // chunks that omit the extended timestamp and use it as a delta for fmt=1/2 chunks. + // See readMessageHeader. + extendedTimestamp uint32 } func newChunkStream() *chunkStream { @@ -540,29 +549,7 @@ func (v *protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, fo // 0x00ffffff), this value MUST be 16777215, and the 'extended // timestamp header' MUST be present. Otherwise, this value SHOULD be // the entire delta. - chunk.extendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp - if !chunk.extendedTimestamp { - // Extended timestamp: 0 or 4 bytes - // This field MUST be sent when the normal timsestamp is set to - // 0xffffff, it MUST NOT be sent if the normal timestamp is set to - // anything else. So for values less than 0xffffff the normal - // timestamp field SHOULD be used in which case the extended timestamp - // MUST NOT be present. For values greater than or equal to 0xffffff - // the normal timestamp field MUST NOT be used and MUST be set to - // 0xffffff and the extended timestamp MUST be sent. - if format == formatType0 { - // 6.1.2.1. Type 0 - // For a type-0 chunk, the absolute timestamp of the message is sent - // here. - chunk.header.Timestamp = uint64(chunk.header.timestampDelta) - } else { - // 6.1.2.2. Type 1 - // 6.1.2.3. Type 2 - // For a type-1 or type-2 chunk, the difference between the previous - // chunk's timestamp and the current chunk's timestamp is sent here. - chunk.header.Timestamp += uint64(chunk.header.timestampDelta) - } - } + chunk.hasExtendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp if format <= formatType1 { payloadLength := uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2]) @@ -585,27 +572,58 @@ func (v *protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, fo p = p[4:] } } - } else { - // Update the timestamp even fmt=3 for first chunk packet - if isFirstChunkOfMsg && !chunk.extendedTimestamp { - chunk.header.Timestamp += uint64(chunk.header.timestampDelta) - } } - // Read extended-timestamp - if chunk.extendedTimestamp { - var timestamp uint32 - if err = binary.Read(v.r, binary.BigEndian, ×tamp); err != nil { + // Read extended-timestamp, present when the (delta) timestamp in the message header is + // 0xffffff. Type-3 chunks inherit hasExtendedTimestamp from the preceding chunk. + if chunk.hasExtendedTimestamp { + // Peek instead of read, so the 4 bytes can be left in place when a sender omits the + // extended timestamp on a Type-3 chunk (see the detection below). + var b []byte + if b, err = v.r.Peek(4); err != nil { return errors.Wrapf(err, "read ext-ts, pkt-ts=%v", chunk.header.Timestamp) } // We always use 31bits timestamp, for some server may use 32bits extended timestamp. // @see https://github.com/ossrs/srs/issues/111 - timestamp &= 0x7fffffff + timestamp := binary.BigEndian.Uint32(b) & 0x7fffffff - // TODO: FIXME: Support detect the extended timestamp. + // For the RTMP v1 2009 version (6.1.3. Extended Timestamp), Type 3 chunks MUST NOT + // have this field. For the RTMP v1 2012 version (5.3.1.3. Extended Timestamp), it is + // present in Type 3 chunks when the most recent Type 0/1/2 chunk indicated one. + // + // FMLE/FMS/Flash Player follow the 2012 version and always send the extended + // timestamp in Type 3 chunks; librtmp/ffmpeg may not. So detect it: if this is not + // the first chunk of the message and the peeked value differs from the previously + // stored extended timestamp, the sender omitted it and these 4 bytes are payload, so + // leave them in the reader. Otherwise consume and store them. // @see http://blog.csdn.net/win_lin/article/details/13363699 + // @see https://github.com/veovera/enhanced-rtmp/issues/42 + if !isFirstChunkOfMsg && chunk.extendedTimestamp > 0 && chunk.extendedTimestamp != timestamp { + // No extended timestamp on this Type-3 chunk; the 4 bytes belong to the payload. + } else { + if _, err = v.r.Discard(4); err != nil { + return errors.Wrapf(err, "discard ext-ts, pkt-ts=%v", chunk.header.Timestamp) + } + chunk.extendedTimestamp = timestamp + } + } + + // Compute the message timestamp. The source is the extended timestamp when present, + // otherwise the 3-byte (delta) timestamp from the message header. + // + // fmt=0: the value is the absolute timestamp of the message. + // fmt=1/2 (and a fmt=3 first chunk continuing them): the value is a delta and is + // accumulated onto the previous timestamp. This is required when the delta is >= 0xffffff + // and is therefore carried in the extended timestamp. + timestamp := chunk.header.timestampDelta + if chunk.hasExtendedTimestamp { + timestamp = chunk.extendedTimestamp + } + if format == formatType0 { chunk.header.Timestamp = uint64(timestamp) + } else if isFirstChunkOfMsg { + chunk.header.Timestamp += uint64(timestamp) } // The extended-timestamp must be unsigned-int, diff --git a/internal/rtmp/rtmp_test.go b/internal/rtmp/rtmp_test.go index 9dc0013ca..09dc4c5f4 100644 --- a/internal/rtmp/rtmp_test.go +++ b/internal/rtmp/rtmp_test.go @@ -177,6 +177,60 @@ func TestReadMessageExtendedTimestampAndChunking(t *testing.T) { } } +func TestReadMessageExtendedTimestampAsDeltaForFmt1(t *testing.T) { + ctx := context.Background() + var in bytes.Buffer + // fmt0 cid=5, timestamp=10, len=1, type video, stream=1, payload AA. + in.Write([]byte{0x05, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x01, byte(MessageTypeVideo), 0x01, 0x00, 0x00, 0x00, 0xAA}) + // fmt1 cid=5, delta=0xffffff so the real delta is carried in the extended timestamp (=100), + // len=1, type video, payload BB. For fmt=1/2 the extended timestamp is a delta, so the + // message timestamp must accumulate: 10 + 100 = 110 (not be replaced by 100). + in.Write([]byte{0x45, 0xff, 0xff, 0xff, 0x00, 0x00, 0x01, byte(MessageTypeVideo)}) + binary.Write(&in, binary.BigEndian, uint32(100)) + in.Write([]byte{0xBB}) + + p := NewProtocol(&in).(*protocol) + for i, want := range []struct { + ts uint64 + pl []byte + }{ + {10, []byte{0xAA}}, + {110, []byte{0xBB}}, + } { + m, err := p.ReadMessage(ctx) + if err != nil { + t.Fatalf("ReadMessage #%v err=%v", i, err) + } + if m.Timestamp() != want.ts || !bytes.Equal(m.Payload(), want.pl) { + t.Fatalf("message #%v ts=%v payload=%x", i, m.Timestamp(), m.Payload()) + } + } +} + +func TestReadMessageType3OmitsExtendedTimestamp(t *testing.T) { + ctx := context.Background() + var in bytes.Buffer + // fmt0 cid=5, timestamp=0xffffff so an extended timestamp (=100) is present, len=8, + // type video, stream=1, with the first 4 payload bytes. + in.Write([]byte{0x05, 0xff, 0xff, 0xff, 0x00, 0x00, 0x08, byte(MessageTypeVideo), 0x01, 0x00, 0x00, 0x00}) + binary.Write(&in, binary.BigEndian, uint32(100)) + in.Write([]byte{0x01, 0x02, 0x03, 0x04}) + // fmt3 continuation from a librtmp/ffmpeg-style sender that omits the extended timestamp. + // The next 4 bytes are payload, not an extended timestamp; the parser must detect the + // mismatch against the stored value (100) and treat them as payload, keeping ts=100. + in.Write([]byte{0xc5, 0x05, 0x06, 0x07, 0x08}) + + p := NewProtocol(&in).(*protocol) + p.input.opt.chunkSize = 4 + m, err := p.ReadMessage(ctx) + if err != nil { + t.Fatalf("ReadMessage err=%v", err) + } + if m.Timestamp() != 100 || !bytes.Equal(m.Payload(), []byte{1, 2, 3, 4, 5, 6, 7, 8}) { + t.Fatalf("ts=%v payload=%x", m.Timestamp(), m.Payload()) + } +} + func TestReadMessageHeaderErrors(t *testing.T) { ctx := context.Background() // Fresh non-zero chunk with fmt1 is rejected.