diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index b1d0ab573..0f28e4db0 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -498,12 +498,12 @@ srs_error_t SrsRtcPlayStream::cycle() // Send-out all RTP packets and do cleanup if (true) { - err = send_packets(source, pkts, info); - - if (epp.can_print(err)) { - srs_warn("play send packets=%u, err: %s", pkts.size(), srs_error_desc(err).c_str()); + if ((err = send_packets(source, pkts, info)) != srs_success) { + if (epp.can_print(err)) { + srs_warn("play send packets=%u, err: %s", pkts.size(), srs_error_desc(err).c_str()); + } + srs_freep(err); } - srs_freep(err); for (int i = 0; i < msg_count; i++) { SrsRtpPacket2* pkt = pkts[i]; @@ -664,75 +664,27 @@ srs_error_t SrsRtcPlayStream::notify(int type, srs_utime_t interval, srs_utime_t return err; } -srs_error_t SrsRtcPlayStream::on_rtcp(char* data, int nb_data) +srs_error_t SrsRtcPlayStream::on_rtcp(SrsRtcpCommon* rtcp) { - srs_error_t err = srs_success; - - // TODO: Use SrsBuffer to parse it. - char* ph = data; - int nb_left = nb_data; - - while (nb_left) { - uint8_t payload_type = ph[1]; - uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; - - int length = (length_4bytes + 1) * 4; - - if (length > nb_data) { - return srs_error_new(ERROR_RTC_RTCP, "invalid length=%u/%u, left=%u, bytes=%s", - length, nb_data, nb_left, srs_string_dumps_hex(ph, nb_left, 8).c_str()); - } - - srs_verbose("on rtcp, payload_type=%u", payload_type); - - switch (payload_type) { - case kSR: { - err = on_rtcp_sr(ph, length); - break; - } - case kRR: { - err = on_rtcp_rr(ph, length); - break; - } - case kSDES: { - break; - } - case kBye: { - break; - } - case kApp: { - break; - } - case kRtpFb: { - err = on_rtcp_feedback(ph, length); - break; - } - case kPsFb: { - err = on_rtcp_ps_feedback(ph, length); - break; - } - case kXR: { - err = on_rtcp_xr(ph, length); - break; - } - default:{ - return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type); - break; - } - } - - if (err != srs_success) { - return srs_error_wrap(err, "rtcp left=%u, bytes=%s", nb_left, srs_string_dumps_hex(ph, nb_left, 8).c_str()); - } - - ph += length; - nb_left -= length; + if(SrsRtcpType_rr == rtcp->type()) { + SrsRtcpRR* rr = dynamic_cast(rtcp); + return on_rtcp_rr(rr); + } else if(SrsRtcpType_rtpfb == rtcp->type()) { + //currently rtpfb of nack will be handle by player. TWCC will be handled by SrsRtcConnection + SrsRtcpNack* nack = dynamic_cast(rtcp); + return on_rtcp_nack(nack); + } else if(SrsRtcpType_psfb == rtcp->type()) { + SrsRtcpPsfbCommon* psfb = dynamic_cast(rtcp); + return on_rtcp_ps_feedback(psfb); + } else if(SrsRtcpType_xr == rtcp->type()) { + SrsRtcpXr* xr = dynamic_cast(rtcp); + return on_rtcp_xr(xr); + } else { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", rtcp->type()); } - - return err; } -srs_error_t SrsRtcPlayStream::on_rtcp_sr(char* buf, int nb_buf) +srs_error_t SrsRtcPlayStream::on_rtcp_rr(SrsRtcpRR* rtcp) { srs_error_t err = srs_success; @@ -743,7 +695,7 @@ srs_error_t SrsRtcPlayStream::on_rtcp_sr(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPlayStream::on_rtcp_xr(char* buf, int nb_buf) +srs_error_t SrsRtcPlayStream::on_rtcp_xr(SrsRtcpXr* rtcp) { srs_error_t err = srs_success; @@ -754,73 +706,24 @@ srs_error_t SrsRtcPlayStream::on_rtcp_xr(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf) +srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp) { srs_error_t err = srs_success; - if (nb_buf < 12) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp feedback packet, nb_buf=%d", nb_buf); - } - - SrsBuffer* stream = new SrsBuffer(buf, nb_buf); - SrsAutoFree(SrsBuffer, stream); - - // @see: https://tools.ietf.org/html/rfc4585#section-6.1 - /* - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - |V=2|P| FMT | PT | length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SSRC of packet sender | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SSRC of media source | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - : Feedback Control Information (FCI) : - : : - */ - uint8_t first = stream->read_1bytes(); - //uint8_t version = first & 0xC0; - //uint8_t padding = first & 0x20; - uint8_t fmt = first & 0x1F; - if(15 == fmt) { - return session_->on_rtcp_feedback(buf, nb_buf); - } - - /*uint8_t payload_type = */stream->read_1bytes(); - /*uint16_t length = */stream->read_2bytes(); - /*uint32_t ssrc_of_sender = */stream->read_4bytes(); - uint32_t ssrc_of_media_source = stream->read_4bytes(); - - /* - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | PID | BLP | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - */ - - uint16_t pid = stream->read_2bytes(); - int blp = stream->read_2bytes(); - - // TODO: FIXME: Support ARQ. - vector resend_pkts; - nack_fetch(resend_pkts, ssrc_of_media_source, pid); - // If NACK disabled, print a log. if (!nack_enabled_) { - srs_trace("RTC NACK seq=%u, ignored", pid); + vector sns = rtcp->get_lost_sns(); + srs_trace("RTC NACK ssrc=%u, seq=%s, ignored", rtcp->get_media_ssrc(), srs_join_vector_string(sns, ",").c_str()); return err; } - uint16_t mask = 0x01; - for (int i = 1; i < 16 && blp; ++i, mask <<= 1) { - if (!(blp & mask)) { - continue; - } + // TODO: FIXME: Support ARQ. + vector resend_pkts; - uint32_t loss_seq = pid + i; - nack_fetch(resend_pkts, ssrc_of_media_source, loss_seq); + vector sns = rtcp->get_lost_sns(); + for(int i = 0; i < sns.size(); ++i) { + uint16_t seq = sns.at(i); + nack_fetch(resend_pkts, rtcp->get_media_ssrc(), seq); } for (int i = 0; i < (int)resend_pkts.size(); ++i) { @@ -840,32 +743,16 @@ srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(char* buf, int nb_buf) +srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp) { srs_error_t err = srs_success; - if (nb_buf < 12) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp feedback packet, nb_buf=%d", nb_buf); - } - - SrsBuffer* stream = new SrsBuffer(buf, nb_buf); - SrsAutoFree(SrsBuffer, stream); - - uint8_t first = stream->read_1bytes(); - //uint8_t version = first & 0xC0; - //uint8_t padding = first & 0x20; - uint8_t fmt = first & 0x1F; - - /*uint8_t payload_type = */stream->read_1bytes(); - /*uint16_t length = */stream->read_2bytes(); - /*uint32_t ssrc_of_sender = */stream->read_4bytes(); - uint32_t ssrc_of_media_source = stream->read_4bytes(); - + uint8_t fmt = rtcp->get_rc(); switch (fmt) { case kPLI: { ISrsRtcPublishStream* publisher = source_->publish_stream(); if (publisher) { - uint32_t ssrc = get_video_publish_ssrc(ssrc_of_media_source); + uint32_t ssrc = get_video_publish_ssrc(rtcp->get_media_ssrc()); if (ssrc != 0) { publisher->request_keyframe(ssrc); srs_trace("RTC request PLI"); @@ -895,17 +782,6 @@ srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPlayStream::on_rtcp_rr(char* data, int nb_data) -{ - srs_error_t err = srs_success; - - // TODO: FIXME: Implements it. - - session_->stat_->nn_rr++; - - return err; -} - uint32_t SrsRtcPlayStream::get_video_publish_ssrc(uint32_t play_ssrc) { std::map::iterator it; @@ -1279,162 +1155,33 @@ srs_error_t SrsRtcPublishStream::send_periodic_twcc() return session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); } -srs_error_t SrsRtcPublishStream::on_rtcp(char* data, int nb_data) +srs_error_t SrsRtcPublishStream::on_rtcp(SrsRtcpCommon* rtcp) { - srs_error_t err = srs_success; - - char* ph = data; - int nb_left = nb_data; - while (nb_left) { - uint8_t payload_type = ph[1]; - uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; - - int length = (length_4bytes + 1) * 4; - - if (length > nb_data) { - return srs_error_new(ERROR_RTC_RTCP, "invalid rtcp packet, length=%u", length); - } - - srs_verbose("on rtcp, payload_type=%u", payload_type); - - switch (payload_type) { - case kSR: { - err = on_rtcp_sr(ph, length); - break; - } - case kRR: { - err = on_rtcp_rr(ph, length); - break; - } - case kSDES: { - break; - } - case kBye: { - break; - } - case kApp: { - break; - } - case kRtpFb: { - err = on_rtcp_feedback(ph, length); - break; - } - case kPsFb: { - err = on_rtcp_ps_feedback(ph, length); - break; - } - case kXR: { - err = on_rtcp_xr(ph, length); - break; - } - default:{ - return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type); - break; - } - } - - if (err != srs_success) { - return srs_error_wrap(err, "rtcp"); - } - - ph += length; - nb_left -= length; + if(SrsRtcpType_sr == rtcp->type()) { + SrsRtcpSR* sr = dynamic_cast(rtcp); + return on_rtcp_sr(sr); + } else if(SrsRtcpType_xr == rtcp->type()) { + SrsRtcpXr* xr = dynamic_cast(rtcp); + return on_rtcp_xr(xr); + } else { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", rtcp->type()); } - - return err; } -srs_error_t SrsRtcPublishStream::on_rtcp_sr(char* buf, int nb_buf) +srs_error_t SrsRtcPublishStream::on_rtcp_sr(SrsRtcpSR* rtcp) { srs_error_t err = srs_success; + SrsNtp srs_ntp = SrsNtp::to_time_ms(rtcp->get_ntp()); - if (nb_buf < 28) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp sender report packet, nb_buf=%d", nb_buf); - } - - SrsBuffer* stream = new SrsBuffer(buf, nb_buf); - SrsAutoFree(SrsBuffer, stream); - - // @see: https://tools.ietf.org/html/rfc3550#section-6.4.1 - /* - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -header |V=2|P| RC | PT=SR=200 | length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SSRC of sender | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -sender | NTP timestamp, most significant word | -info +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | NTP timestamp, least significant word | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | RTP timestamp | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | sender's packet count | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | sender's octet count | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -report | SSRC_1 (SSRC of first source) | -block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 1 | fraction lost | cumulative number of packets lost | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | extended highest sequence number received | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | interarrival jitter | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | last SR (LSR) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | delay since last SR (DLSR) | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -report | SSRC_2 (SSRC of second source) | -block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 2 : ... : - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | profile-specific extensions | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - */ - uint8_t first = stream->read_1bytes(); - uint8_t rc = first & 0x1F; - - uint8_t payload_type = stream->read_1bytes(); - srs_assert(payload_type == kSR); - uint16_t length = stream->read_2bytes(); - - if (((length + 1) * 4) != (rc * 24 + 28)) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtcp sender report packet, length=%u, rc=%u", length, rc); - } - - uint32_t ssrc_of_sender = stream->read_4bytes(); - uint64_t ntp = stream->read_8bytes(); - SrsNtp srs_ntp = SrsNtp::to_time_ms(ntp); - uint32_t rtp_time = stream->read_4bytes(); - uint32_t sender_packet_count = stream->read_4bytes(); - uint32_t sender_octec_count = stream->read_4bytes(); - - (void)sender_packet_count; (void)sender_octec_count; (void)rtp_time; srs_verbose("sender report, ssrc_of_sender=%u, rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u", - ssrc_of_sender, rtp_time, sender_packet_count, sender_octec_count); + rtcp->get_ssrc(), rtcp->get_rtp_ts(), rtcp->get_rtp_send_packets(), rtcp->get_rtp_send_bytes()); - for (int i = 0; i < rc; ++i) { - uint32_t ssrc = stream->read_4bytes(); - uint8_t fraction_lost = stream->read_1bytes(); - uint32_t cumulative_number_of_packets_lost = stream->read_3bytes(); - uint32_t highest_seq = stream->read_4bytes(); - uint32_t jitter = stream->read_4bytes(); - uint32_t lst = stream->read_4bytes(); - uint32_t dlsr = stream->read_4bytes(); - - (void)ssrc; (void)fraction_lost; (void)cumulative_number_of_packets_lost; (void)highest_seq; (void)jitter; (void)lst; (void)dlsr; - srs_verbose("sender report, ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, highest_seq=%u, jitter=%u, lst=%u, dlst=%u", - ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr); - } - - update_send_report_time(ssrc_of_sender, srs_ntp); + update_send_report_time(rtcp->get_ssrc(), srs_ntp); return err; } -srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf) +srs_error_t SrsRtcPublishStream::on_rtcp_xr(SrsRtcpXr* rtcp) { srs_error_t err = srs_success; @@ -1452,15 +1199,15 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf) +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ */ - SrsBuffer stream(buf, nb_buf); + SrsBuffer stream(rtcp->data(), rtcp->size()); /*uint8_t first = */stream.read_1bytes(); uint8_t pt = stream.read_1bytes(); srs_assert(pt == kXR); uint16_t length = (stream.read_2bytes() + 1) * 4; /*uint32_t ssrc = */stream.read_4bytes(); - if (length != nb_buf) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet, length=%u, nb_buf=%d", length, nb_buf); + if (length != rtcp->size()) { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet, length=%u, nb_buf=%d", length, rtcp->size()); } while (stream.pos() + 4 < length) { @@ -1468,8 +1215,8 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf) stream.skip(1); uint16_t block_length = (stream.read_2bytes() + 1) * 4; - if (stream.pos() + block_length - 4 > nb_buf) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet block, block_length=%u, nb_buf=%d", block_length, nb_buf); + if (stream.pos() + block_length - 4 > rtcp->size()) { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet block, block_length=%u, nb_buf=%d", block_length, rtcp->size()); } if (bt == 5) { @@ -1494,128 +1241,6 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPublishStream::on_rtcp_feedback(char* buf, int nb_buf) -{ - srs_error_t err = srs_success; - // TODO: FIXME: Implements it. - return err; -} - -srs_error_t SrsRtcPublishStream::on_rtcp_ps_feedback(char* buf, int nb_buf) -{ - srs_error_t err = srs_success; - - if (nb_buf < 12) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp feedback packet, nb_buf=%d", nb_buf); - } - - SrsBuffer* stream = new SrsBuffer(buf, nb_buf); - SrsAutoFree(SrsBuffer, stream); - - uint8_t first = stream->read_1bytes(); - //uint8_t version = first & 0xC0; - //uint8_t padding = first & 0x20; - uint8_t fmt = first & 0x1F; - - /*uint8_t payload_type = */stream->read_1bytes(); - /*uint16_t length = */stream->read_2bytes(); - /*uint32_t ssrc_of_sender = */stream->read_4bytes(); - /*uint32_t ssrc_of_media_source = */stream->read_4bytes(); - - switch (fmt) { - case kPLI: { - srs_verbose("pli"); - break; - } - case kSLI: { - srs_verbose("sli"); - break; - } - case kRPSI: { - srs_verbose("rpsi"); - break; - } - case kAFB: { - srs_verbose("afb"); - break; - } - default: { - return srs_error_new(ERROR_RTC_RTCP, "unknown payload specific feedback=%u", fmt); - } - } - - return err; -} - -srs_error_t SrsRtcPublishStream::on_rtcp_rr(char* buf, int nb_buf) -{ - srs_error_t err = srs_success; - - if (nb_buf < 8) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp receiver report packet, nb_buf=%d", nb_buf); - } - - SrsBuffer* stream = new SrsBuffer(buf, nb_buf); - SrsAutoFree(SrsBuffer, stream); - - // @see: https://tools.ietf.org/html/rfc3550#section-6.4.2 - /* - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -header |V=2|P| RC | PT=RR=201 | length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SSRC of packet sender | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -report | SSRC_1 (SSRC of first source) | -block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 1 | fraction lost | cumulative number of packets lost | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | extended highest sequence number received | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | interarrival jitter | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | last SR (LSR) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | delay since last SR (DLSR) | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -report | SSRC_2 (SSRC of second source) | -block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 2 : ... : - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | profile-specific extensions | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - */ - uint8_t first = stream->read_1bytes(); - //uint8_t version = first & 0xC0; - //uint8_t padding = first & 0x20; - uint8_t rc = first & 0x1F; - - /*uint8_t payload_type = */stream->read_1bytes(); - uint16_t length = stream->read_2bytes(); - /*uint32_t ssrc_of_sender = */stream->read_4bytes(); - - if (((length + 1) * 4) != (rc * 24 + 8)) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtcp receiver packet, length=%u, rc=%u", length, rc); - } - - for (int i = 0; i < rc; ++i) { - uint32_t ssrc = stream->read_4bytes(); - uint8_t fraction_lost = stream->read_1bytes(); - uint32_t cumulative_number_of_packets_lost = stream->read_3bytes(); - uint32_t highest_seq = stream->read_4bytes(); - uint32_t jitter = stream->read_4bytes(); - uint32_t lst = stream->read_4bytes(); - uint32_t dlsr = stream->read_4bytes(); - - (void)ssrc; (void)fraction_lost; (void)cumulative_number_of_packets_lost; (void)highest_seq; (void)jitter; (void)lst; (void)dlsr; - srs_verbose("ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, highest_seq=%u, jitter=%u, lst=%u, dlst=%u", - ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr); - } - - return err; -} - // TODO: FIXME: Use async request PLI to prevent dup requests. void SrsRtcPublishStream::request_keyframe(uint32_t ssrc) { @@ -2071,17 +1696,59 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); } - if (player_) { - err = player_->on_rtcp(unprotected_buf, nb_unprotected_buf); + SrsBuffer* buffer = new SrsBuffer(unprotected_buf, nb_unprotected_buf); + SrsAutoFree(SrsBuffer, buffer); + + SrsRtcpCompound rtcp_compound; + if(srs_success != (err = rtcp_compound.decode(buffer))) { + return srs_error_wrap(err, "decode rtcp plaintext=%u, bytes=%s, at=%s", nb_unprotected_buf, + srs_string_dumps_hex(unprotected_buf, nb_unprotected_buf, 8).c_str(), + srs_string_dumps_hex(buffer->head(), buffer->left(), 8).c_str()); } - if (publisher_) { - err = publisher_->on_rtcp(unprotected_buf, nb_unprotected_buf); + SrsRtcpCommon* rtcp = NULL; + while(NULL != (rtcp = rtcp_compound.get_next_rtcp())) { + err = dispatch_rtcp(rtcp); + srs_freep(rtcp); + + if(srs_success != err) { + return srs_error_wrap(err, "cipher=%u, plaintext=%u, bytes=%s, rtcp=(%u,%u,%u,%u)", nb_data, nb_unprotected_buf, + srs_string_dumps_hex(unprotected_buf, nb_unprotected_buf, 8).c_str(), + rtcp->get_rc(), rtcp->type(), rtcp->get_ssrc(), rtcp->size()); + } } - if (err != srs_success) { - return srs_error_wrap(err, "cipher=%u, plaintext=%u, bytes=%s", nb_data, nb_unprotected_buf, - srs_string_dumps_hex(unprotected_buf, nb_unprotected_buf, 8).c_str()); + return err; +} + +srs_error_t SrsRtcConnection::dispatch_rtcp(SrsRtcpCommon* rtcp) +{ + srs_error_t err = srs_success; + + if(SrsRtcpType_sr == rtcp->type()) { + return publisher_->on_rtcp(rtcp); + } else if(SrsRtcpType_rr == rtcp->type()) { + SrsRtcpRR* rr = dynamic_cast(rtcp); + if (rr->get_rb_ssrc()) { + return player_->on_rtcp(rtcp); + } + } else if(SrsRtcpType_rtpfb == rtcp->type()) { + if(1 == rtcp->get_rc()) { + //nack + return player_->on_rtcp(rtcp); + } else if(15 == rtcp->get_rc()) { + // twcc + return on_rtcp_feedback(rtcp->data(), rtcp->size()); + } + } else if(SrsRtcpType_psfb == rtcp->type()) { + return player_->on_rtcp(rtcp); + } else { + if (player_) { + return player_->on_rtcp(rtcp); + } + if (publisher_) { + return publisher_->on_rtcp(rtcp); + } } return err; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 5b1637c08..fdfdf50bc 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -250,21 +250,20 @@ public: virtual srs_error_t cycle(); private: srs_error_t send_packets(SrsRtcStream* source, const std::vector& pkts, SrsRtcPlayStreamStatistic& info); -public: void nack_fetch(std::vector& pkts, uint32_t ssrc, uint16_t seq); +public: // Directly set the status of track, generally for init to set the default value. void set_all_tracks_status(bool status); // interface ISrsHourGlass public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); public: - srs_error_t on_rtcp(char* data, int nb_data); + srs_error_t on_rtcp(SrsRtcpCommon* rtcp); private: - srs_error_t on_rtcp_sr(char* buf, int nb_buf); - srs_error_t on_rtcp_xr(char* buf, int nb_buf); - srs_error_t on_rtcp_feedback(char* data, int nb_data); - srs_error_t on_rtcp_ps_feedback(char* data, int nb_data); - srs_error_t on_rtcp_rr(char* data, int nb_data); + srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp); + srs_error_t on_rtcp_nack(SrsRtcpNack* rtcp); + srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp); + srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp); uint32_t get_video_publish_ssrc(uint32_t play_ssrc); }; @@ -316,13 +315,10 @@ public: private: srs_error_t send_periodic_twcc(); public: - srs_error_t on_rtcp(char* data, int nb_data); + srs_error_t on_rtcp(SrsRtcpCommon* rtcp); private: - srs_error_t on_rtcp_sr(char* buf, int nb_buf); - srs_error_t on_rtcp_xr(char* buf, int nb_buf); - srs_error_t on_rtcp_feedback(char* data, int nb_data); - srs_error_t on_rtcp_ps_feedback(char* data, int nb_data); - srs_error_t on_rtcp_rr(char* data, int nb_data); + srs_error_t on_rtcp_sr(SrsRtcpSR* rtcp); + srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp); public: void request_keyframe(uint32_t ssrc); // interface ISrsHourGlass @@ -443,6 +439,9 @@ public: srs_error_t on_dtls(char* data, int nb_data); srs_error_t on_rtp(char* data, int nb_data); srs_error_t on_rtcp(char* data, int nb_data); +private: + srs_error_t dispatch_rtcp(SrsRtcpCommon* rtcp); +public: srs_error_t on_rtcp_feedback(char* buf, int nb_buf); void set_hijacker(ISrsRtcConnectionHijacker* h); public: diff --git a/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp index b2f7c03ba..1fa16714a 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp @@ -427,6 +427,7 @@ public: SrsRtcpCompound(); virtual ~SrsRtcpCompound(); + // TODO: FIXME: Should rename it to pop(), because it's not a GET method. SrsRtcpCommon* get_next_rtcp(); srs_error_t add_rtcp(SrsRtcpCommon *rtcp); void clear(); diff --git a/trunk/src/protocol/srs_protocol_utility.cpp b/trunk/src/protocol/srs_protocol_utility.cpp index 69b5bc3bd..e5f1b28ba 100644 --- a/trunk/src/protocol/srs_protocol_utility.cpp +++ b/trunk/src/protocol/srs_protocol_utility.cpp @@ -379,20 +379,6 @@ srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int s return err; } -string srs_join_vector_string(vector& vs, string separator) -{ - string str = ""; - - for (int i = 0; i < (int)vs.size(); i++) { - str += vs.at(i); - if (i != (int)vs.size() - 1) { - str += separator; - } - } - - return str; -} - bool srs_is_ipv4(string domain) { for (int i = 0; i < (int)domain.length(); i++) { diff --git a/trunk/src/protocol/srs_protocol_utility.hpp b/trunk/src/protocol/srs_protocol_utility.hpp index aaf907ec3..84e07bcc7 100644 --- a/trunk/src/protocol/srs_protocol_utility.hpp +++ b/trunk/src/protocol/srs_protocol_utility.hpp @@ -34,6 +34,7 @@ #include #include #include +#include #include @@ -108,7 +109,20 @@ extern std::string srs_generate_rtmp_url(std::string server, int port, std::stri extern srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL); // join string in vector with indicated separator -extern std::string srs_join_vector_string(std::vector& vs, std::string separator); +template +std::string srs_join_vector_string(std::vector& vs, std::string separator) +{ + std::stringstream ss; + + for (int i = 0; i < (int)vs.size(); i++) { + ss << vs.at(i); + if (i != (int)vs.size() - 1) { + ss << separator; + } + } + + return ss.str(); +} // Whether domain is an IPv4 address. extern bool srs_is_ipv4(std::string domain);