From 51aecb8fde638ce058d7600a0b48a972e7e4609d Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 27 Jan 2015 14:28:59 +0800 Subject: [PATCH] for #250, decode the ts packet header and adaptation field. --- trunk/src/app/srs_app_mpegts_udp.cpp | 41 ++-- trunk/src/app/srs_app_mpegts_udp.hpp | 6 +- trunk/src/kernel/srs_kernel_error.hpp | 3 + trunk/src/kernel/srs_kernel_ts.cpp | 257 +++++++++++++++++++++++++- trunk/src/kernel/srs_kernel_ts.hpp | 15 +- 5 files changed, 305 insertions(+), 17 deletions(-) diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index 22b29843b..3a23a5daf 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -33,16 +33,21 @@ using namespace std; #include #include #include +#include +#include +#include #ifdef SRS_AUTO_STREAM_CASTER SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) { + stream = new SrsStream(); output = _srs_config->get_stream_caster_output(c); } SrsMpegtsOverUdp::~SrsMpegtsOverUdp() { + srs_freep(stream); } int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) @@ -59,22 +64,36 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) } srs_info("udp: got %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf); - // process each ts packet + // use stream to parse ts packet. for (int i = 0; i < nb_buf; i += SRS_TS_PACKET_SIZE) { - char* ts_packet = buf + i; - if ((ret = on_ts_packet(ts_packet)) != ERROR_SUCCESS) { - srs_warn("mpegts: ignore ts packet error. ret=%d", ret); - continue; + if ((ret = stream->initialize(buf + i, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) { + return ret; } + + // process each ts packet + if ((ret = on_ts_packet(stream)) != ERROR_SUCCESS) { + break; + } + srs_info("mpegts: parse ts packet completed"); + } + srs_info("mpegts: parse udp packet completed"); + + return ret; +} + +int SrsMpegtsOverUdp::on_ts_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + SrsTsPacket* packet = new SrsTsPacket(); + SrsAutoFree(SrsTsPacket, packet); + + if ((ret = packet->decode(stream)) != ERROR_SUCCESS) { + srs_error("mpegts: decode ts packet failed. ret=%d", ret); + return ret; } return ret; } -int SrsMpegtsOverUdp::on_ts_packet(char* ts_packet) -{ - int ret = ERROR_SUCCESS; - return ret; -} - #endif diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index 80de64f05..4274a55ae 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class sockaddr_in; #include +class SrsStream; class SrsConfDirective; #ifdef SRS_AUTO_STREAM_CASTER @@ -43,6 +44,7 @@ class SrsConfDirective; class SrsMpegtsOverUdp { private: + SrsStream* stream; std::string output; public: SrsMpegtsOverUdp(SrsConfDirective* c); @@ -60,9 +62,9 @@ public: virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf); private: /** - * when got a ts packet, in size TS_PACKET_SIZE. + * the stream contains the ts packet to parse. */ - virtual int on_ts_packet(char* ts_packet); + virtual int on_ts_packet(SrsStream* stream); }; #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 7c2677ef9..57f3eab4f 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -219,6 +219,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_MP3_DECODE_ERROR 4009 #define ERROR_STREAM_CASTER_ENGINE 4010 #define ERROR_STREAM_CASTER_PORT 4011 +#define ERROR_STREAM_CASTER_TS_HEADER 4012 +#define ERROR_STREAM_CASTER_TS_SYNC_BYTE 4013 +#define ERROR_STREAM_CASTER_TS_AF 4014 /** * whether the error code is an system control error. diff --git a/trunk/src/kernel/srs_kernel_ts.cpp b/trunk/src/kernel/srs_kernel_ts.cpp index 3d135b55a..8b698e852 100644 --- a/trunk/src/kernel/srs_kernel_ts.cpp +++ b/trunk/src/kernel/srs_kernel_ts.cpp @@ -38,6 +38,7 @@ using namespace std; #include #include #include +#include // in ms, for HLS aac sync time. #define SRS_CONF_DEFAULT_AAC_SYNC 100 @@ -418,8 +419,65 @@ SrsTsPacket::~SrsTsPacket() srs_freep(adaptation_field); } -SrsTsAdaptationField::SrsTsAdaptationField() +int SrsTsPacket::decode(SrsStream* stream) { + int ret = ERROR_SUCCESS; + + int pos = stream->pos(); + + // 4B ts packet header. + if (!stream->require(4)) { + ret = ERROR_STREAM_CASTER_TS_HEADER; + srs_error("ts: demux header failed. ret=%d", ret); + return ret; + } + + sync_byte = stream->read_1bytes(); + if (sync_byte != 0x47) { + ret = ERROR_STREAM_CASTER_TS_SYNC_BYTE; + srs_error("ts: sync_bytes must be 0x47, actual=%#x. ret=%d", sync_byte, ret); + return ret; + } + + int16_t pidv = stream->read_2bytes(); + transport_error_indicator = (pidv >> 15) & 0x01; + payload_unit_start_indicator = (pidv >> 14) & 0x01; + transport_priority = (pidv >> 13) & 0x01; + pid = (SrsTsPid)(pidv & 0x1FFF); + + int8_t ccv = stream->read_1bytes(); + transport_scrambling_control = (SrsTsScrambled)((ccv >> 6) & 0x03); + adaption_field_control = (SrsTsAdaptationFieldType)((ccv >> 4) & 0x03); + continuity_counter = (SrsTsPid)(ccv & 0x0F); + + // TODO: FIXME: create pids map when got new pid. + + srs_info("ts: header sync=%#x error=%d unit_start=%d priotiry=%d pid=%d scrambling=%d adaption=%d counter=%d", + sync_byte, transport_error_indicator, payload_unit_start_indicator, transport_priority, pid, + transport_scrambling_control, adaption_field_control, continuity_counter); + + // optional: adaptation field + if (adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly || adaption_field_control == SrsTsAdaptationFieldTypeBoth) { + srs_freep(adaptation_field); + adaptation_field = new SrsTsAdaptationField(this); + + if ((ret = adaptation_field->decode(stream)) != ERROR_SUCCESS) { + srs_error("ts: demux af faield. ret=%d", ret); + return ret; + } + srs_verbose("ts: demux af ok."); + } + + // calc the user defined data size for payload. + int nb_payload = SRS_TS_PACKET_SIZE - (stream->pos() - pos); + + return ret; +} + +SrsTsAdaptationField::SrsTsAdaptationField(SrsTsPacket* pkt) +{ + packet = pkt; + adaption_field_length = 0; discontinuity_indicator = 0; random_access_indicator = 0; @@ -456,6 +514,203 @@ SrsTsAdaptationField::SrsTsAdaptationField() SrsTsAdaptationField::~SrsTsAdaptationField() { + srs_freep(transport_private_data); +} + +int SrsTsAdaptationField::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if (!stream->require(2)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af failed. ret=%d", ret); + return ret; + } + adaption_field_length = stream->read_1bytes(); + + // When the adaptation_field_control value is '11', the value of the adaptation_field_length shall + // be in the range 0 to 182. + if (packet->adaption_field_control == SrsTsAdaptationFieldTypeBoth && adaption_field_length > 182) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af length failed, must in [0, 182], actual=%d. ret=%d", adaption_field_length, ret); + return ret; + } + // When the adaptation_field_control value is '10', the value of the adaptation_field_length shall + // be 183. + if (packet->adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly && adaption_field_length != 183) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af length failed, must be 183, actual=%d. ret=%d", adaption_field_length, ret); + return ret; + } + + // no adaptation field. + if (adaption_field_length == 0) { + srs_info("ts: demux af empty."); + return ret; + } + + // the adaptation field start at here. + int pos_af = stream->pos(); + int8_t tmpv = stream->read_1bytes(); + + discontinuity_indicator = (tmpv >> 7) & 0x01; + random_access_indicator = (tmpv >> 6) & 0x01; + elementary_stream_priority_indicator = (tmpv >> 5) & 0x01; + PCR_flag = (tmpv >> 4) & 0x01; + OPCR_flag = (tmpv >> 3) & 0x01; + splicing_point_flag = (tmpv >> 2) & 0x01; + transport_private_data_flag = (tmpv >> 1) & 0x01; + adaptation_field_extension_flag = (tmpv >> 0) & 0x01; + + if (PCR_flag) { + if (!stream->require(6)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af PCR_flag failed. ret=%d", ret); + return ret; + } + + char* pp = NULL; + char* p = stream->data(); + stream->skip(6); + + pp = (char*)&program_clock_reference_base; + pp[5] = *p++; + pp[4] = *p++; + pp[3] = *p++; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + + // @remark, use pcr base and ignore the extension + // @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370 + program_clock_reference_extension = program_clock_reference_base & 0x1ff; + program_clock_reference_base = (program_clock_reference_base >> 15) & 0x1ffffffffLL; + } + + if (OPCR_flag) { + if (!stream->require(6)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af OPCR_flag failed. ret=%d", ret); + return ret; + } + + char* pp = NULL; + char* p = stream->data(); + stream->skip(6); + + pp = (char*)&original_program_clock_reference_base; + pp[5] = *p++; + pp[4] = *p++; + pp[3] = *p++; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + + // @remark, use pcr base and ignore the extension + // @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370 + original_program_clock_reference_extension = program_clock_reference_base & 0x1ff; + original_program_clock_reference_base = (program_clock_reference_base >> 15) & 0x1ffffffffLL; + } + + if (splicing_point_flag) { + if (!stream->require(1)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af splicing_point_flag failed. ret=%d", ret); + return ret; + } + splice_countdown = stream->read_1bytes(); + } + + if (transport_private_data_flag) { + if (!stream->require(1)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af transport_private_data_flag failed. ret=%d", ret); + return ret; + } + transport_private_data_length = (u_int8_t)stream->read_1bytes(); + + if (!stream->require(transport_private_data_length)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af transport_private_data_flag failed. ret=%d", ret); + return ret; + } + srs_freep(transport_private_data); + transport_private_data = new char[transport_private_data_length]; + stream->read_bytes(transport_private_data, transport_private_data_length); + } + + if (adaptation_field_extension_flag) { + int pos_af_ext = stream->pos(); + + if (!stream->require(2)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af adaptation_field_extension_flag failed. ret=%d", ret); + return ret; + } + adaptation_field_extension_length = (u_int8_t)stream->read_1bytes(); + ltw_flag = stream->read_1bytes(); + + piecewise_rate_flag = (ltw_flag >> 6) & 0x01; + seamless_splice_flag = (ltw_flag >> 5) & 0x01; + ltw_flag = (ltw_flag >> 7) & 0x01; + + if (ltw_flag) { + if (!stream->require(2)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af ltw_flag failed. ret=%d", ret); + return ret; + } + ltw_offset = stream->read_2bytes(); + + ltw_valid_flag = (ltw_offset >> 15) &0x01; + ltw_offset &= 0x7FFF; + } + + if (piecewise_rate_flag) { + if (!stream->require(3)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af piecewise_rate_flag failed. ret=%d", ret); + return ret; + } + piecewise_rate = stream->read_3bytes(); + + piecewise_rate &= 0x3FFFFF; + } + + if (seamless_splice_flag) { + if (!stream->require(5)) { + ret = ERROR_STREAM_CASTER_TS_AF; + srs_error("ts: demux af seamless_splice_flag failed. ret=%d", ret); + return ret; + } + marker_bit0 = stream->read_1bytes(); + DTS_next_AU1 = stream->read_2bytes(); + DTS_next_AU2 = stream->read_2bytes(); + + splice_type = (marker_bit0 >> 4) & 0x0F; + DTS_next_AU0 = (marker_bit0 >> 1) & 0x07; + marker_bit0 &= 0x01; + + marker_bit1 = DTS_next_AU1 & 0x01; + DTS_next_AU1 = (DTS_next_AU1 >> 1) & 0x7FFF; + + marker_bit2 = DTS_next_AU2 & 0x01; + DTS_next_AU2 = (DTS_next_AU2 >> 1) & 0x7FFF; + } + + nb_af_ext_reserved = adaptation_field_extension_length - (stream->pos() - pos_af_ext); + stream->skip(nb_af_ext_reserved); + } + + nb_af_reserved = adaption_field_length - (stream->pos() - pos_af); + stream->skip(nb_af_reserved); + + srs_info("ts: af parsed, discontinuity=%d random=%d priority=%d PCR=%d OPCR=%d slicing=%d private=%d extension=%d/%d pcr=%"PRId64"/%d opcr=%"PRId64"/%d", + discontinuity_indicator, random_access_indicator, elementary_stream_priority_indicator, PCR_flag, OPCR_flag, splicing_point_flag, + transport_private_data_flag, adaptation_field_extension_flag, adaptation_field_extension_length, program_clock_reference_base, + program_clock_reference_extension, original_program_clock_reference_base, original_program_clock_reference_extension); + + return ret; } SrsTSMuxer::SrsTSMuxer(SrsFileWriter* w) diff --git a/trunk/src/kernel/srs_kernel_ts.hpp b/trunk/src/kernel/srs_kernel_ts.hpp index bf5c29f9f..22fb0199d 100644 --- a/trunk/src/kernel/srs_kernel_ts.hpp +++ b/trunk/src/kernel/srs_kernel_ts.hpp @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +class SrsStream; class SrsTsCache; class SrsTSMuxer; class SrsFileWriter; @@ -114,7 +115,7 @@ enum SrsTsAdaptationFieldType */ class SrsTsPacket { -private: +public: // 1B /** * The sync_byte is a fixed 8-bit field whose value is '0100 0111' (0x47). Sync_byte emulation in the choice of @@ -206,6 +207,11 @@ public: SrsTsPacket(); virtual ~SrsTsPacket(); public: + /** + * the stream contains only one ts packet. + * @remark we will consume all bytes in stream. + */ + virtual int decode(SrsStream* stream); }; /** @@ -215,7 +221,7 @@ public: */ class SrsTsAdaptationField { -private: +public: // 1B /** * The adaptation_field_length is an 8-bit field specifying the number of bytes in the @@ -500,10 +506,13 @@ private: * decoder. */ int nb_af_reserved; +private: + SrsTsPacket* packet; public: - SrsTsAdaptationField(); + SrsTsAdaptationField(SrsTsPacket* pkt); virtual ~SrsTsAdaptationField(); public: + virtual int decode(SrsStream* stream); }; /**