diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index 297292b1d..90cc389c4 100644 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -22,16 +22,10 @@ srt_server { listen 10080; } +vhost sina.mobile.com.cn { +} + stats { network 0; disk sda sdb xvda xvdb; } -#vhost __defaultVhost__ { -# forward { -# enabled on; -# destination 172.16.43.153:19350; -# } -#} - -vhost sina.mobile.com.cn { -} diff --git a/trunk/configure b/trunk/configure index 5cfcc9151..058b1f992 100755 --- a/trunk/configure +++ b/trunk/configure @@ -211,7 +211,7 @@ PROTOCOL_OBJS="${MODULE_OBJS[@]}" MODULE_ID="SRT" MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP") ModuleLibIncs=(${SRS_OBJS_DIR}) -MODULE_FILES=("srt_server" "srt_handle" "srt_conn") +MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp") SRT_INCS="src/srt"; MODULE_DIR=${SRT_INCS} . auto/modules.sh SRT_OBJS="${MODULE_OBJS[@]}" # diff --git a/trunk/src/kernel/srs_kernel_log.hpp b/trunk/src/kernel/srs_kernel_log.hpp index 1d7af37bb..670a5b9c6 100644 --- a/trunk/src/kernel/srs_kernel_log.hpp +++ b/trunk/src/kernel/srs_kernel_log.hpp @@ -142,5 +142,26 @@ extern ISrsThreadContext* _srs_context; #define srs_trace(msg, ...) (void)0 #endif +inline void srs_trace_data(const char* data_p, int len, const char* dscr) +{ + const int MAX = 256; + char debug_data[MAX]; + char debug_len = 0; + + debug_len += sprintf(debug_data + debug_len, "%s", dscr); + + for (int index = 0; index < len; index++) { + if (index % 16 == 0) { + debug_len += sprintf(debug_data + debug_len, "\r\n"); + } + debug_len += sprintf(debug_data + debug_len, " %02x", (unsigned char)data_p[index]); + if (index >= 32) { + break; + } + } + + srs_trace("%s", debug_data); + return; +} #endif diff --git a/trunk/src/srt/srt_conn.hpp b/trunk/src/srt/srt_conn.hpp index 91d79c06c..168ca0769 100644 --- a/trunk/src/srt/srt_conn.hpp +++ b/trunk/src/srt/srt_conn.hpp @@ -1,5 +1,6 @@ #ifndef SRT_CONN_H #define SRT_CONN_H +#include "stringex.hpp" #include #include #include @@ -9,6 +10,7 @@ #include #include +#define ERR_SRT_MODE 0x00 #define PULL_SRT_MODE 0x01 #define PUSH_SRT_MODE 0x02 @@ -29,10 +31,13 @@ inline void get_streamid_info(const std::string& streamid, int& mode, std::strin mode = PULL_SRT_MODE; } else { mode_str = mode_str.substr(mode_pos+2); + mode_str = string_lower(mode_str); if (mode_str == "push") { mode = PUSH_SRT_MODE; - } else { + } else if(mode_str == "pull"){ mode = PULL_SRT_MODE; + } else { + mode = ERR_SRT_MODE; } } } diff --git a/trunk/src/srt/srt_data.cpp b/trunk/src/srt/srt_data.cpp new file mode 100644 index 000000000..8cfec1864 --- /dev/null +++ b/trunk/src/srt/srt_data.cpp @@ -0,0 +1,24 @@ +#include "srt_data.hpp" + +SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path):_len(len) + ,_key_path(path) +{ + _data_p = new unsigned char[len]; + memcpy(_data_p, data_p, len); +} + +SRT_DATA_MSG::~SRT_DATA_MSG() { + delete _data_p; +} + +std::string SRT_DATA_MSG::get_path() { + return _key_path; +} + +unsigned int SRT_DATA_MSG::data_len() { + return _len; +} + +unsigned char* SRT_DATA_MSG::get_data() { + return _data_p; +} diff --git a/trunk/src/srt/srt_data.hpp b/trunk/src/srt/srt_data.hpp new file mode 100644 index 000000000..a1a225b90 --- /dev/null +++ b/trunk/src/srt/srt_data.hpp @@ -0,0 +1,23 @@ +#ifndef SRT_DATA_H +#define SRT_DATA_H +#include +#include + +class SRT_DATA_MSG { +public: + SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path); + ~SRT_DATA_MSG(); + + unsigned int data_len(); + unsigned char* get_data(); + std::string get_path(); + +private: + unsigned int _len; + unsigned char* _data_p; + std::string _key_path; +}; + +typedef std::shared_ptr SRT_DATA_MSG_PTR; + +#endif \ No newline at end of file diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index 75495289a..60914e2a4 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -22,9 +22,10 @@ const long long CHECK_ALIVE_TIMEOUT = 15*1000; long long srt_now_ms = 0; -srt_handle::srt_handle():_last_timestamp(0) +srt_handle::srt_handle():_run_flag(false) + ,_last_timestamp(0) ,_last_check_alive_ts(0) { - + _srt2rtmp_ptr = std::make_shared(); } srt_handle::~srt_handle() { @@ -38,14 +39,18 @@ int srt_handle::start() { return -1; } + _run_flag = true; srs_trace("srt handle is starting..."); _work_thread_ptr = std::make_shared(&srt_handle::onwork, this); + _srt2rtmp_ptr->start(); return 0; } void srt_handle::stop() { + _run_flag = false; _work_thread_ptr->join(); + _srt2rtmp_ptr->stop(); return; } @@ -219,7 +224,7 @@ void srt_handle::onwork() const int64_t DEF_TIMEOUT_INTERVAL = 30; srs_trace("srt handle epoll work is starting..."); - while(true) + while(_run_flag) { SRTSOCKET read_fds[SRT_FD_MAX]; SRTSOCKET write_fds[SRT_FD_MAX]; @@ -289,6 +294,8 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp return; } srt_conn_ptr->update_timestamp(srt_now_ms); + + _srt2rtmp_ptr->insert_data_message(data, ret, subpath); //send data to subscriber(players) //streamid, play map diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp index aed8168dd..dd9e0f926 100644 --- a/trunk/src/srt/srt_handle.hpp +++ b/trunk/src/srt/srt_handle.hpp @@ -10,6 +10,7 @@ #include #include "srt_conn.hpp" +#include "srt_to_rtmp.hpp" typedef struct { SRT_CONN_PTR conn_ptr; @@ -58,6 +59,7 @@ private: void debug_statics(SRTSOCKET srtsocket, const std::string& streamid); private: + bool _run_flag; int _handle_pollid; std::unordered_map _conn_map;//save all srt connection: pull or push @@ -73,6 +75,8 @@ private: long long _last_timestamp; long long _last_check_alive_ts; + + SRT2RTMP_PTR _srt2rtmp_ptr; }; #endif //SRT_HANDLE_H \ No newline at end of file diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index d4f0214de..691abaa49 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -134,8 +134,13 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd if (srt_conn_ptr->get_mode() == PULL_SRT_MODE) { //add SRT_EPOLL_IN for information notify conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;//not inlucde SRT_EPOLL_OUT for save cpu - } else { + } else if (srt_conn_ptr->get_mode() == PUSH_SRT_MODE) { conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR; + } else { + srs_trace("stream mode error, it shoulde be m=push or m=pull, streamid:%s", + srt_conn_ptr->get_streamid().c_str()); + srt_conn_ptr->close(); + return; } request_message_t msg = {srt_conn_ptr, conn_event}; handle_ptr->insert_message_queue(msg); diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp new file mode 100644 index 000000000..647931738 --- /dev/null +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -0,0 +1,427 @@ +#include "srt_to_rtmp.hpp" +#include +#include +#include +#include +#include +#include + +srt2rtmp::srt2rtmp():_run_flag(false) { + +} + +srt2rtmp::~srt2rtmp() { + +} + +void srt2rtmp::start() { + _run_flag = true; + + _thread_ptr = std::make_shared(&srt2rtmp::on_work, this); + + return; +} + +void srt2rtmp::stop() { + _run_flag = false; + + _thread_ptr->join(); + return; +} + +void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path) { + std::unique_lock locker(_mutex); + if (!_run_flag) { + return; + } + SRT_DATA_MSG_PTR msg_ptr = std::make_shared(data_p, len, key_path); + _msg_queue.push(msg_ptr); + _notify_cond.notify_one(); + return; +} + +SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { + std::unique_lock locker(_mutex); + SRT_DATA_MSG_PTR msg_ptr; + + while (_msg_queue.empty() && _run_flag) { + _notify_cond.wait(locker); + } + + msg_ptr = _msg_queue.front(); + _msg_queue.pop(); + return msg_ptr; +} + +void srt2rtmp::on_work() { + while(_run_flag) { + SRT_DATA_MSG_PTR msg_ptr = get_data_message(); + + if (!msg_ptr) { + continue; + } + handle_ts_data(msg_ptr); + } +} + +void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { + RTMP_CLIENT_PTR rtmp_ptr; + auto iter = _rtmp_client_map.find(data_ptr->get_path()); + if (iter == _rtmp_client_map.end()) { + srs_trace("new rtmp client for srt upstream, key_path:%s", data_ptr->get_path().c_str()); + rtmp_ptr = std::make_shared(data_ptr->get_path()); + _rtmp_client_map.insert(std::make_pair(data_ptr->get_path(), rtmp_ptr)); + } else { + rtmp_ptr = iter->second; + } + + rtmp_ptr->receive_ts_data(data_ptr); + + return; +} + +rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) { + _ts_ctx_ptr = std::make_shared(); + _avc_ptr = std::make_shared(); + _aac_ptr = std::make_shared(); + + char url_sz[128]; + sprintf(url_sz, "rtmp://127.0.0.1/%s", key_path.c_str()); + _url = url_sz; + + _h264_sps_changed = false; + _h264_pps_changed = false; + _h264_sps_pps_sent = false; + srs_trace("rtmp client construct url:%s", url_sz); +} + +rtmp_client::~rtmp_client() { + +} + +void rtmp_client::close() { + if (!_rtmp_conn_ptr) { + return; + } + _rtmp_conn_ptr->close(); + _rtmp_conn_ptr = nullptr; +} + +srs_error_t rtmp_client::connect() { + srs_error_t err = srs_success; + srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; + srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + + if (_rtmp_conn_ptr.get() != nullptr) { + return srs_error_wrap(err, "repeated connect %s failed, cto=%dms, sto=%dms.", + _url.c_str(), srsu2msi(cto), srsu2msi(sto)); + } + + _rtmp_conn_ptr = std::make_shared(_url, cto, sto); + + if ((err = _rtmp_conn_ptr->connect()) != srs_success) { + close(); + return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", + _url.c_str(), srsu2msi(cto), srsu2msi(sto)); + } + + if ((err = _rtmp_conn_ptr->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) { + close(); + return srs_error_wrap(err, "publish error, url:%s", _url.c_str()); + } + + return err; +} +void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { + SrsBuffer* buffer_p = new SrsBuffer((char*)data_ptr->get_data(), data_ptr->data_len()); + + FILE* file_p = fopen("1.ts", "ab+"); + if (file_p) { + fwrite(data_ptr->get_data(), data_ptr->data_len(), 1, file_p); + fclose(file_p); + } + //srs_trace_data((char*)data_ptr->get_data(), data_ptr->data_len(), "receive ts data"); + _ts_ctx_ptr->decode(buffer_p, this);//on_ts_message is the decode callback + return; +} + +srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) { + srs_error_t err = srs_success; + + // TODO: FIMXE: there exists bug, see following comments. + // when sps or pps changed, update the sequence header, + // for the pps maybe not changed while sps changed. + // so, we must check when each video ts message frame parsed. + if (!_h264_sps_changed || !_h264_pps_changed) { + return err; + } + + // h264 raw to h264 packet. + std::string sh; + if ((err = _avc_ptr->mux_sequence_header(_h264_sps, _h264_pps, dts, pts, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + + // h264 packet to flv packet. + int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame; + int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; + char* flv = NULL; + int nb_flv = 0; + if ((err = _avc_ptr->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { + return srs_error_wrap(err, "write packet"); + } + + // reset sps and pps. + _h264_sps_changed = false; + _h264_pps_changed = false; + _h264_sps_pps_sent = true; + + return err; +} + +srs_error_t rtmp_client::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) { + srs_error_t err = srs_success; + + // when sps or pps not sent, ignore the packet. + // @see https://github.com/ossrs/srs/issues/203 + if (!_h264_sps_pps_sent) { + return srs_error_new(ERROR_H264_DROP_BEFORE_SPS_PPS, "drop sps/pps"); + } + + // 5bits, 7.3.1 NAL unit syntax, + // ISO_IEC_14496-10-AVC-2003.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // for IDR frame, the frame is keyframe. + SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame; + if (nal_unit_type == SrsAvcNaluTypeIDR) { + frame_type = SrsVideoAvcFrameTypeKeyFrame; + } + + std::string ibp; + if ((err = _avc_ptr->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux frame"); + } + + int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; + char* flv = NULL; + int nb_flv = 0; + if ((err = _avc_ptr->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); +} + +srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) { + srs_error_t err = srs_success; + + char* data = NULL; + int size = 0; + if ((err = _aac_ptr->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { + return srs_error_wrap(err, "mux aac to flv"); + } + + return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); +} + +srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { + srs_error_t err = srs_success; + + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + SrsSharedPtrMessage* msg = NULL; + + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, _rtmp_conn_ptr->sid(), &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + srs_assert(msg); + + // send out encoded msg. + if ((err = _rtmp_conn_ptr->send_and_free_message(msg)) != srs_success) { + close(); + return srs_error_wrap(err, "send messages"); + } + + return err; +} + +srs_error_t rtmp_client::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) { + srs_error_t err = srs_success; + + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + // ts tbn to flv tbn. + uint32_t dts = (uint32_t)(msg->dts / 90); + uint32_t pts = (uint32_t)(msg->dts / 90); + + // send each frame. + while (!avs->empty()) { + char* frame = NULL; + int frame_size = 0; + if ((err = _avc_ptr->annexb_demux(avs, &frame, &frame_size)) != srs_success) { + return srs_error_wrap(err, "demux annexb"); + } + + // 5bits, 7.3.1 NAL unit syntax, + // ISO_IEC_14496-10-AVC-2003.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // ignore the nalu type sps(7), pps(8), aud(9) + if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter) { + continue; + } + + // for sps + if (_avc_ptr->is_sps(frame, frame_size)) { + std::string sps; + if ((err = _avc_ptr->sps_demux(frame, frame_size, sps)) != srs_success) { + return srs_error_wrap(err, "demux sps"); + } + + if (_h264_sps == sps) { + continue; + } + _h264_sps_changed = true; + _h264_sps = sps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + continue; + } + + // for pps + if (_avc_ptr->is_pps(frame, frame_size)) { + std::string pps; + if ((err = _avc_ptr->pps_demux(frame, frame_size, pps)) != srs_success) { + return srs_error_wrap(err, "demux pps"); + } + + if (_h264_pps == pps) { + continue; + } + _h264_pps_changed = true; + _h264_pps = pps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + continue; + } + + // ibp frame. + // TODO: FIXME: we should group all frames to a rtmp/flv message from one ts message. + srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", frame_size, dts); + if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { + return srs_error_wrap(err, "write frame"); + } + } + + return err; +} + +srs_error_t rtmp_client::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) { + srs_error_t err = srs_success; + + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + // ts tbn to flv tbn. + uint32_t dts = (uint32_t)(msg->dts / 90); + + // send each frame. + while (!avs->empty()) { + char* frame = NULL; + int frame_size = 0; + SrsRawAacStreamCodec codec; + if ((err = _aac_ptr->adts_demux(avs, &frame, &frame_size, codec)) != srs_success) { + return srs_error_wrap(err, "demux adts"); + } + + // ignore invalid frame, + // * atleast 1bytes for aac to decode the data. + if (frame_size <= 0) { + continue; + } + srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts); + + // generate sh. + if (_aac_specific_config.empty()) { + std::string sh; + if ((err = _aac_ptr->mux_sequence_header(&codec, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + _aac_specific_config = sh; + + codec.aac_packet_type = 0; + + if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write raw audio frame"); + } + } + + // audio raw data. + codec.aac_packet_type = 1; + if ((err = write_audio_raw_frame(frame, frame_size, &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); + } + } + + return err; +} + +srs_error_t rtmp_client::on_ts_message(SrsTsMessage* msg) { + srs_error_t err = srs_success; + + srs_trace("ts demux len:%d", msg->payload->length()); + // When the audio SID is private stream 1, we use common audio. + // @see https://github.com/ossrs/srs/issues/740 + if (msg->channel->apply == SrsTsPidApplyAudio && msg->sid == SrsTsPESStreamIdPrivateStream1) { + msg->sid = SrsTsPESStreamIdAudioCommon; + } + + // when not audio/video, or not adts/annexb format, donot support. + if (msg->stream_number() != 0) { + return srs_error_new(ERROR_STREAM_CASTER_TS_ES, "ts: unsupported stream format, sid=%#x(%s-%d)", + msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number()); + } + + // check supported codec + if (msg->channel->stream != SrsTsStreamVideoH264 && msg->channel->stream != SrsTsStreamAudioAAC) { + return srs_error_new(ERROR_STREAM_CASTER_TS_CODEC, "ts: unsupported stream codec=%d", msg->channel->stream); + } + + // parse the stream. + SrsBuffer avs(msg->payload->bytes(), msg->payload->length()); + + // publish audio or video. + if (msg->channel->stream == SrsTsStreamVideoH264) { + if ((err = on_ts_video(msg, &avs)) != srs_success) { + return srs_error_wrap(err, "ts: consume video"); + } + } + if (msg->channel->stream == SrsTsStreamAudioAAC) { + if ((err = on_ts_audio(msg, &avs)) != srs_success) { + return srs_error_wrap(err, "ts: consume audio"); + } + } + return err; +} \ No newline at end of file diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp new file mode 100644 index 000000000..55a9c2438 --- /dev/null +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -0,0 +1,93 @@ +#ifndef SRT_TO_RTMP_H +#define SRT_TO_RTMP_H +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "srt_data.hpp" + +#define SRT_VIDEO_MSG_TYPE 0x01 +#define SRT_AUDIO_MSG_TYPE 0x02 + +typedef std::shared_ptr RTMP_CONN_PTR; +typedef std::shared_ptr AVC_PTR; +typedef std::shared_ptr AAC_PTR; + +class rtmp_client : public ISrsTsHandler { +public: + rtmp_client(std::string key_path); + ~rtmp_client(); + + void receive_ts_data(SRT_DATA_MSG_PTR data_ptr); + +private: + virtual srs_error_t on_ts_message(SrsTsMessage* msg); + srs_error_t connect(); + void close(); + +private: + srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs); + srs_error_t on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs); + virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); + virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); + virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); + +private: + virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); + +private: + std::string _key_path; + std::string _url; + std::shared_ptr _ts_ctx_ptr; + +private: + AVC_PTR _avc_ptr; + std::string _h264_sps; + bool _h264_sps_changed; + std::string _h264_pps; + bool _h264_pps_changed; + bool _h264_sps_pps_sent; +private: + std::string _aac_specific_config; + AAC_PTR _aac_ptr; +private: + RTMP_CONN_PTR _rtmp_conn_ptr; +}; + +typedef std::shared_ptr RTMP_CLIENT_PTR; + +class srt2rtmp { +public: + srt2rtmp(); + virtual ~srt2rtmp(); + + void start(); + void stop(); + void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path); + +private: + SRT_DATA_MSG_PTR get_data_message(); + void on_work(); + void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); + +private: + std::shared_ptr _thread_ptr; + std::mutex _mutex; + std::condition_variable_any _notify_cond; + std::queue _msg_queue; + + std::unordered_map _rtmp_client_map; + bool _run_flag; +}; + +typedef std::shared_ptr SRT2RTMP_PTR; + +#endif \ No newline at end of file diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp new file mode 100644 index 000000000..ba6dbb080 --- /dev/null +++ b/trunk/src/srt/ts_demux.cpp @@ -0,0 +1,274 @@ +#include "ts_demux.hpp" +#include + +ts_demux::ts_demux() { + +} + +ts_demux::~ts_demux() { + +} + +int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) { + + return 0; +} + +int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { + int pos = 0; + int npos = 0; + + ts_header_info._sync_byte = data_p[pos]; + pos++; + + ts_header_info._transport_error_indicator = (data_p[pos]&0x80)>>7; + ts_header_info._payload_unit_start_indicator = (data_p[pos]&0x40)>>6; + ts_header_info._transport_priority = (data_p[pos]&0x20)>>5; + ts_header_info._PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; + pos += 2; + + ts_header_info._transport_scrambling_control = (data_p[pos]&0xC0)>>6; + ts_header_info._adaptation_field_control = (data_p[pos]&0x30)>>4; + ts_header_info._continuity_counter = (data_p[pos]&0x0F); + pos++; + npos = pos; + + adaptation_field* field_p = &(ts_header_info._adaptation_field_info); + if( ts_header_info._adaptation_field_control == 2 + || ts_header_info._adaptation_field_control == 3 ){ + // adaptation_field() + field_p->_adaptation_field_length = data_p[pos]; + pos++; + + if( field_p->_adaptation_field_length > 0 ){ + field_p->_discontinuity_indicator = (data_p[pos]&0x80)>>7; + field_p->_random_access_indicator = (data_p[pos]&0x40)>>6; + field_p->_elementary_stream_priority_indicator = (data_p[pos]&0x20)>>5; + field_p->_PCR_flag = (data_p[pos]&0x10)>>4; + field_p->_OPCR_flag = (data_p[pos]&0x08)>>3; + field_p->_splicing_point_flag = (data_p[pos]&0x04)>>2; + field_p->_transport_private_data_flag = (data_p[pos]&0x02)>>1; + field_p->_adaptation_field_extension_flag = (data_p[pos]&0x01); + pos++; + + if( field_p->_PCR_flag == 1 ) { // PCR info + //program_clock_reference_base 33 uimsbf + //reserved 6 bslbf + //program_clock_reference_extension 9 uimsbf + pos += 6; + } + if( field_p->_OPCR_flag == 1 ) { + //original_program_clock_reference_base 33 uimsbf + //reserved 6 bslbf + //original_program_clock_reference_extension 9 uimsbf + pos += 6; + } + if( field_p->_splicing_point_flag == 1 ) { + //splice_countdown 8 tcimsbf + pos++; + } + if( field_p->_transport_private_data_flag == 1 ) { + //transport_private_data_length 8 uimsbf + field_p->_transport_private_data_length = data_p[pos]; + pos++; + memcpy(field_p->_private_data_byte, data_p + pos, field_p->_transport_private_data_length); + } + if( field_p->_adaptation_field_extension_flag == 1 ) { + //adaptation_field_extension_length 8 uimsbf + field_p->_adaptation_field_extension_length = data_p[pos]; + pos++; + //ltw_flag 1 bslbf + field_p->_ltw_flag = (data_p[pos]&0x80)>>7; + //piecewise_rate_flag 1 bslbf + field_p->_piecewise_rate_flag = (data_p[pos]&0x40)>>6; + //seamless_splice_flag 1 bslbf + field_p->_seamless_splice_flag = (data_p[pos]&0x20)>>5; + //reserved 5 bslbf + pos++; + if (field_p->_ltw_flag == 1) { + //ltw_valid_flag 1 bslbf + //ltw_offset 15 uimsbf + pos += 2; + } + if (field_p->_piecewise_rate_flag == 1) { + //reserved 2 bslbf + //piecewise_rate 22 uimsbf + pos += 3; + } + if (field_p->_seamless_splice_flag == 1) { + //splice_type 4 bslbf + //DTS_next_AU[32..30] 3 bslbf + //marker_bit 1 bslbf + //DTS_next_AU[29..15] 15 bslbf + //marker_bit 1 bslbf + //DTS_next_AU[14..0] 15 bslbf + //marker_bit 1 bslbf + pos += 5; + } + } + } + + + npos += sizeof(field_p->_adaptation_field_length) + field_p->_adaptation_field_length; + assert(pos == npos); + } + + if(ts_header_info._adaptation_field_control == 1 + || ts_header_info._adaptation_field_control == 3 ) { + // data_byte with placeholder + // payload parser + if(ts_header_info._PID == 0x00){ + // PAT // program association table + if(ts_header_info._payload_unit_start_indicator) { + pos++; + } + _pat._table_id = data_p[pos]; + pos++; + _pat._section_syntax_indicator = (data_p[pos]>>7)&0x01; + // skip 3 bits of 1 zero and 2 reserved + _pat._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; + pos += 2; + _pat._transport_stream_id = (data_p[pos]<<8)|data_p[pos+1]; + pos += 2; + // reserved 2 bits + _pat._version_number = (data_p[pos]&0x3E)>>1; + _pat._current_next_indicator = data_p[pos]&0x01; + pos++; + _pat._section_number = data_p[pos]; + pos++; + _pat._last_section_number = data_p[pos]; + assert(table_id == 0x00); + assert(188-npos>section_length+3); // PAT = section_length + 3 + pos++; + _pat._pid_vec.clear(); + for (;pos+4 <= section_length-5-4+9;) { // 4:CRC, 5:follow section_length item rpos + 4(following unit length) section_length + 9(above field and unit_start_first_byte ) + PID_INFO pid_info; + //program_number 16 uimsbf + pid_info._program_number = data_p[pos]<<8|data_p[pos+1]; + pos += 2; +// reserved 3 bslbf + + if (pid_info._program_number == 0) { +// // network_PID 13 uimsbf + pid_info._network_id = (data_p[pos]<<8|data_p[pos+1])&0x1FFF; + pos += 2; + } + else { +// // program_map_PID 13 uimsbf + pid_info._pid = (data_p[pos]<<8|data_p[pos+1])&0x1FFF; + pos += 2; + } + _pat._pid_vec.push_back(pid_info); + // network_PID and program_map_PID save to list + } +// CRC_32 use pat to calc crc32, eq + pos += 4; + }else if(ts_header_info._PID == 0x01){ + // CAT // conditional access table + }else if(ts_header_info._PID == 0x02){ + //TSDT // transport stream description table + }else if(ts_header_info._PID == 0x03){ + //IPMP // IPMP control information table + // 0x0004-0x000F Reserved + // 0x0010-0x1FFE May be assigned as network_PID, Program_map_PID, elementary_PID, or for other purposes + }else if(ts_header_info._PID == 0x11){ + // SDT // https://en.wikipedia.org/wiki/Service_Description_Table / https://en.wikipedia.org/wiki/MPEG_transport_stream + }else if(is_pmt(ts_header_info._PID)) { + int rpos = 0; + if(ts_header_info._payload_unit_start_indicator) + rpos++; + _pmt._table_id = data_p[rpos]; + pos++; + _pmt._section_syntax_indicator = (data_p[rpos]>>7)&0x01; + // skip 3 bits of 1 zero and 2 reserved + _pmt._section_length = ((data_p[rpos]<<8)|data_p[pos+1])&0x0FFF; + pos += 2; + _pmt._program_number = (data_p[rpos]<<8)|data_p[rpos+1]; + pos += 2; + // reserved 2 bits + _pmt._version_number = (data_p[pos]&0x3E)>>1; + _pmt._current_next_indicator = data_p[pos]&0x01; + pos++; + _pmt._section_number = data_p[pos]; + pos++; + _pmt._last_section_number = data_p[pos]; + pos++; + // skip 3 bits for reserved 3 bslbf + _pmt._PCR_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //PCR_PID 13 uimsbf + pos += 2; + + //reserved 4 bslbf + _pmt._program_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;//program_info_length 12 uimsbf + pos += 2; + assert(_pmt._table_id==0x02); // 0x02, // TS_program_map_section + memcpy(_pmt._dscr, data_p+pos, _pmt._program_info_length); +// for (i = 0; i < N; i++) { +// descriptor() +// } + pos += _pmt._program_info_length; + _pmt._stream_pid_vec.clear(); + for (; rpos + 5 <= _pmt._section_number + 4 - 4; ) { // rpos(above field length) i+5(following unit length) section_length +3(PMT begin three bytes)+1(payload_unit_start_indicator) -4(crc32) + STREAM_PID_INFO pid_info; + pid_info._stream_type = data_p[pos];//stream_type 8 uimsbf 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video + pos++; + //reserved 3 bslbf + pid_info._elementary_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //elementary_PID 13 uimsbf + pos += 2; + //reserved 4 bslbf + pid_info._ES_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; //ES_info_length 12 uimsbf + pos += 2; + if( rpos + pid_info._ES_info_length > _pmt._section_length + 4 - 4 ) + break; + int absES_info_length = rpos + pid_info._ES_info_length; + for (; rpos +#include +#include + +class ts_media_data_callback_I { +public: + virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr); +}; + +typedef std::shared_ptr TS_DATA_CALLBACK_PTR; + +class adaptation_field { +public: + adaptation_field(){}; + ~adaptation_field(){}; + +public: + unsigned char _adaptation_field_length; + + unsigned char _discontinuity_indicator:1; + unsigned char _random_access_indicator:1; + unsigned char _elementary_stream_priority_indicator:1; + unsigned char _PCR_flag:1; + unsigned char _OPCR_flag:1; + unsigned char _splicing_point_flag:1; + unsigned char _transport_private_data_flag:1; + unsigned char _adaptation_field_extension_flag:1; + + //if(PCR_flag == '1') + unsigned long _program_clock_reference_base;//33 bits + unsigned short _program_clock_reference_extension;//9bits + //if (OPCR_flag == '1') + unsigned long _original_program_clock_reference_base;//33 bits + unsigned short _original_program_clock_reference_extension;//9bits + //if (splicing_point_flag == '1') + unsigned char _splice_countdown; + //if (transport_private_data_flag == '1') + unsigned char _transport_private_data_length; + unsigned char _private_data_byte[256]; + //if (adaptation_field_extension_flag == '1') + unsigned char _adaptation_field_extension_length; + unsigned char _ltw_flag; + unsigned char _piecewise_rate_flag; + unsigned char _seamless_splice_flag; + unsigned char _reserved0; + //if (ltw_flag == '1') + unsigned short _ltw_valid_flag:1; + unsigned short _ltw_offset:15; + //if (piecewise_rate_flag == '1') + unsigned int _piecewise_rate;//22bits + //if (seamless_splice_flag == '1') + unsigned char _splice_type;//4bits + unsigned char _DTS_next_AU1;//3bits + unsigned char _marker_bit1;//1bit + unsigned short _DTS_next_AU2;//15bit + unsigned char _marker_bit2;//1bit + unsigned short _DTS_next_AU3;//15bit +}; + +class ts_header { +public: + ts_header(){} + ~ts_header(){} + +public: + unsigned char _sync_byte; + + unsigned short _transport_error_indicator:1; + unsigned short _payload_unit_start_indicator:1; + unsigned short _transport_priority:1; + unsigned short _PID:13; + + unsigned char _transport_scrambling_control:2; + unsigned char _adaptation_field_control:2; + unsigned char _continuity_counter:4; + + adaptation_field _adaptation_field_info; +}; + +typedef struct { + unsigned short _program_number; + unsigned short _pid; + unsigned short _network_id; +} PID_INFO; + +class pat_info { +public: + pat_info(); + ~pat_info(); + +public: + unsigned char _table_id; + + unsigned short _section_syntax_indicator:1; + unsigned short _reserved0:1; + unsigned short _reserved1:2; + unsigned short _section_length:12; + + unsigned short transport_stream_id; + + unsigned char _reserved3:2; + unsigned char _version_number:5; + unsigned char _current_next_indicator:1; + + unsigned char _section_number; + unsigned char _last_section_number; + std::vector _pid_vec; +}; + +typedef struct { + unsigned char _stream_type; + unsigned short _reserved1:3; + unsigned short _elementary_PID:13; + unsigned short _reserved:4; + unsigned short _ES_info_length; + unsigned char _dscr[4096]; + unsigned int _crc_32; +} STREAM_PID_INFO; + +class pmt_info { +public: + pmt_info(){}; + ~pmt_info(){}; +private: + unsigned char _table_id; + unsigned short _section_syntax_indicator:1; + unsigned short _reserved1:1; + unsigned short _reserved2:2; + unsigned short _section_length:12; + unsigned short _program_number:16; + unsigned char _reserved:2; + unsigned char _version_number:5; + unsigned char _current_next_indicator:5; + unsigned char _section_number; + unsigned char _last_section_number; + unsigned short _reserved3:3; + unsigned short _PCR_PID:13; + unsigned short _reserved4:4; + unsigned short _program_info_length:12; + unsigned char _dscr[4096]; + + std::vector _stream_pid_vec; +}; + +class ts_demux { +public: + ts_demux(); + ~ts_demux(); + + int decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback); + +private: + int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info); + bool is_pmt(unsigned short pmt_id); + +private: + std::string _key_path;//only for srt + + pat_info _pat; + pmt_info _pmt; +}; + +#endif \ No newline at end of file