RTMP: Support RTMPS server. v7.0.56 (#4443)

This PR is extracted by AI from #3949 to support RTMPS server in SRS.

Run SRS with RTMPS support:

```bash
./objs/srs -c conf/rtmps.conf
```

Publish RTMPS stream by FFmpeg:

```bash
ffmpeg -re -i doc/source.flv -c copy -f flv rtmps://localhost:1443/live/livetream
```

Play RTMPS stream by ffplay:

```bash
ffplay rtmps://localhost:1443/live/livetream
```

Below work is done by AI:

* [AI: Extract RTMP transport for
RTMPS.](7948111464)
* [AI: Extract RTMPS
transport.](a669cbba89)

---------

Co-authored-by: john <hondaxiao@tencent.com>
Co-authored-by: OSSRS-AI <winlinam@gmail.com>
This commit is contained in:
Winlin 2025-08-18 13:43:27 -06:00 committed by winlin
parent 8cd4616147
commit ebcaef43c6
13 changed files with 390 additions and 78 deletions

View File

@ -146,7 +146,7 @@ inotify_auto_reload off;
auto_reload_for_docker on;
#############################################################################################
# RTMP sections
# RTMP/RTMPS sections
#############################################################################################
# the rtmp listen ports, split by space, each listen entry is <[ip:]port>
# for example, 192.168.1.100:1935 10.10.10.100:1935
@ -161,6 +161,27 @@ listen 1935;
# default: 60000
chunk_size 60000;
# the rtmps server section
rtmps {
# Whether rtmps is enabled.
# Overwrite by env SRS_RTMPS_ENABLED
# default: off
enabled on;
# The rtmps listen port
# Overwrite by env SRS_RTMPS_LISTEN
listen 1443;
# The SSL private key file, generated by:
# openssl genrsa -out server.key 2048
# Overwrite by env SRS_RTMPS_KEY
# default: ./conf/server.key
key ./conf/server.key;
# The SSL public cert file, generated by:
# openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CA/ST=Toronto/L=Toronto/O=Me/OU=Me/CN=ossrs.io"
# Overwrite by env SRS_RTMPS_CERT
# default: ./conf/server.crt
cert ./conf/server.crt;
}
#############################################################################################
# HTTP sections
#############################################################################################
@ -235,7 +256,7 @@ http_api {
# default: ./conf/server.key
key ./conf/server.key;
# The SSL public cert file, generated by:
# openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CN/ST=Beijing/L=Beijing/O=Me/OU=Me/CN=ossrs.net"
# openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CA/ST=Toronto/L=Toronto/O=Me/OU=Me/CN=ossrs.io"
# Overwrite by env SRS_HTTP_API_HTTPS_CERT
# default: ./conf/server.crt
cert ./conf/server.crt;
@ -289,7 +310,7 @@ http_server {
# default: ./conf/server.key
key ./conf/server.key;
# The SSL public cert file, generated by:
# openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CN/ST=Beijing/L=Beijing/O=Me/OU=Me/CN=ossrs.net"
# openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CA/ST=Toronto/L=Toronto/O=Me/OU=Me/CN=ossrs.io"
# Overwrite by env SRS_HTTP_SERVER_HTTPS_CERT
# default: ./conf/server.crt
cert ./conf/server.crt;

12
trunk/conf/rtmps.conf Normal file
View File

@ -0,0 +1,12 @@
listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;
rtmps {
enabled on;
listen 1443;
key ./conf/server.key;
cert ./conf/server.crt;
}
vhost __defaultVhost__ {
}

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-08-18, Merge [#4443](https://github.com/ossrs/srs/pull/4443): Support RTMPS server. v7.0.56 (#4443)
* v7.0, 2025-08-16, Merge [#4441](https://github.com/ossrs/srs/pull/4441): fix err memory leak in rtc to rtmp bridge. v7.0.55 (#4441)
* v7.0, 2025-08-14, Merge [#4161](https://github.com/ossrs/srs/pull/4161): fix hls & dash segments cleanup. v7.0.54 (#4161)
* v7.0, 2025-08-13, Merge [#4225](https://github.com/ossrs/srs/pull/4225): issue #4223: remove hls_acodec and hls_vcodec config. v7.0.53 (#4225)

View File

@ -2366,7 +2366,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int i = 0; i < (int)root->directives.size(); i++) {
SrsConfDirective *conf = root->at(i);
std::string n = conf->name;
if (n != "listen" && n != "pid" && n != "chunk_size" && n != "ff_log_dir" && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_level_v2" && n != "srs_log_file" && n != "max_connections" && n != "daemon" && n != "heartbeat" && n != "tencentcloud_apm" && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" && n != "http_server" && n != "stream_caster" && n != "rtc_server" && n != "srt_server" && n != "utc_time" && n != "work_dir" && n != "asprocess" && n != "server_id" && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" && n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate" && n != "query_latest_version" && n != "first_wait_for_qlv" && n != "threads" && n != "circuit_breaker" && n != "is_full" && n != "in_docker" && n != "tencentcloud_cls" && n != "exporter" && n != "rtsp_server") {
if (n != "listen" && n != "pid" && n != "chunk_size" && n != "ff_log_dir" && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_level_v2" && n != "srs_log_file" && n != "max_connections" && n != "daemon" && n != "heartbeat" && n != "tencentcloud_apm" && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" && n != "http_server" && n != "stream_caster" && n != "rtc_server" && n != "srt_server" && n != "utc_time" && n != "work_dir" && n != "asprocess" && n != "server_id" && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" && n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate" && n != "query_latest_version" && n != "first_wait_for_qlv" && n != "threads" && n != "circuit_breaker" && n != "is_full" && n != "in_docker" && n != "tencentcloud_cls" && n != "exporter" && n != "rtsp_server" && n != "rtmps") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str());
}
}
@ -2452,6 +2452,15 @@ srs_error_t SrsConfig::check_normal_config()
}
}
}
if (true) {
SrsConfDirective *conf = root->get("rtmps");
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name;
if (n != "enabled" && n != "listen" && n != "key" && n != "cert") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtmps.%s", n.c_str());
}
}
}
if (true) {
SrsConfDirective *conf = root->get("exporter");
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
@ -8999,3 +9008,95 @@ SrsConfDirective *SrsConfig::get_stats_disk_device()
return conf;
}
SrsConfDirective *SrsConfig::get_rtmps()
{
SrsConfDirective *conf = root->get("rtmps");
if (!conf) {
return NULL;
}
return conf;
}
bool SrsConfig::get_rtmps_enabled()
{
SRS_OVERWRITE_BY_ENV_BOOL("srs.rtmps.enabled"); // SRS_RTMPS_ENABLED
static bool DEFAULT = false;
SrsConfDirective *conf = get_rtmps();
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf) {
return DEFAULT;
}
return SRS_CONF_PREFER_FALSE(conf->arg0());
}
vector<string> SrsConfig::get_rtmps_listen()
{
if (!srs_getenv("srs.rtmps.listen").empty()) { // SRS_RTMPS_LISTEN
return srs_string_split(srs_getenv("srs.rtmps.listen"), " ");
}
std::vector<string> ports;
SrsConfDirective *conf = get_rtmps();
if (!conf) {
return ports;
}
conf = conf->get("listen");
if (!conf) {
return ports;
}
for (int i = 0; i < (int)conf->args.size(); i++) {
ports.push_back(conf->args.at(i));
}
return ports;
}
string SrsConfig::get_rtmps_ssl_key()
{
SRS_OVERWRITE_BY_ENV_STRING("srs.rtmps.key"); // SRS_RTMPS_KEY
static string DEFAULT = "./conf/server.key";
SrsConfDirective *conf = get_rtmps();
if (!conf) {
return DEFAULT;
}
conf = conf->get("key");
if (!conf) {
return DEFAULT;
}
return conf->arg0();
}
string SrsConfig::get_rtmps_ssl_cert()
{
SRS_OVERWRITE_BY_ENV_STRING("srs.rtmps.cert"); // SRS_RTMPS_CERT
static string DEFAULT = "./conf/server.crt";
SrsConfDirective *conf = get_rtmps();
if (!conf) {
return DEFAULT;
}
conf = conf->get("cert");
if (!conf) {
return DEFAULT;
}
return conf->arg0();
}

View File

@ -1129,6 +1129,15 @@ public:
virtual std::string get_https_stream_listen();
virtual std::string get_https_stream_ssl_key();
virtual std::string get_https_stream_ssl_cert();
// rtmps section
private:
SrsConfDirective *get_rtmps();
public:
virtual bool get_rtmps_enabled();
virtual std::vector<std::string> get_rtmps_listen();
virtual std::string get_rtmps_ssl_key();
virtual std::string get_rtmps_ssl_cert();
public:
// Get whether vhost enabled http stream

View File

@ -728,16 +728,16 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file)
// TODO: Setup callback, see SSL_set_ex_data and SSL_set_info_callback
if ((ssl = SSL_new(ssl_ctx)) == NULL) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "SSL_new ssl");
return srs_error_new(ERROR_TLS_HANDSHAKE, "SSL_new ssl");
}
if ((bio_in = BIO_new(BIO_s_mem())) == NULL) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_new in");
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_new in");
}
if ((bio_out = BIO_new(BIO_s_mem())) == NULL) {
BIO_free(bio_in);
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_new out");
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_new out");
}
SSL_set_bio(ssl, bio_in, bio_out);
@ -751,15 +751,15 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file)
// Setup the key and cert file for server.
if ((r0 = SSL_use_certificate_chain_file(ssl, crt_file.c_str())) != 1) {
return srs_error_new(ERROR_HTTPS_KEY_CRT, "use cert %s", crt_file.c_str());
return srs_error_new(ERROR_TLS_KEY_CRT, "use cert %s", crt_file.c_str());
}
if ((r0 = SSL_use_RSAPrivateKey_file(ssl, key_file.c_str(), SSL_FILETYPE_PEM)) != 1) {
return srs_error_new(ERROR_HTTPS_KEY_CRT, "use key %s", key_file.c_str());
return srs_error_new(ERROR_TLS_KEY_CRT, "use key %s", key_file.c_str());
}
if ((r0 = SSL_check_private_key(ssl)) != 1) {
return srs_error_new(ERROR_HTTPS_KEY_CRT, "check key %s with cert %s",
return srs_error_new(ERROR_TLS_KEY_CRT, "check key %s with cert %s",
key_file.c_str(), crt_file.c_str());
}
srs_info("ssl: use key %s and cert %s", key_file.c_str(), crt_file.c_str());
@ -774,40 +774,40 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file)
if ((r0 = BIO_write(bio_in, buf, nn)) <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn);
}
r0 = SSL_do_handshake(ssl);
r1 = SSL_get_error(ssl, r0);
ERR_clear_error();
if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
}
if ((size = BIO_get_mem_data(bio_out, &data)) > 0) {
// OK, reset it for the next write.
if ((r0 = BIO_reset(bio_in)) != 1) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0);
}
break;
}
}
srs_info("https: ClientHello done");
srs_info("tls: ClientHello done");
// Send ServerHello, Certificate, Server Key Exchange, Server Hello Done
size = BIO_get_mem_data(bio_out, &data);
if (!data || size <= 0) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake data=%p, size=%d", data, size);
return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake data=%p, size=%d", data, size);
}
if ((err = transport->write(data, size, NULL)) != srs_success) {
return srs_error_wrap(err, "handshake: write data=%p, size=%d", data, size);
}
if ((r0 = BIO_reset(bio_out)) != 1) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0);
}
srs_info("https: ServerHello done");
srs_info("tls: ServerHello done");
// Receive Client Key Exchange, Change Cipher Spec, Encrypted Handshake Message
while (true) {
@ -819,7 +819,7 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file)
if ((r0 = BIO_write(bio_in, buf, nn)) <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn);
}
r0 = SSL_do_handshake(ssl);
@ -830,33 +830,33 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file)
}
if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
}
if ((size = BIO_get_mem_data(bio_out, &data)) > 0) {
// OK, reset it for the next write.
if ((r0 = BIO_reset(bio_in)) != 1) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0);
}
break;
}
}
srs_info("https: Client done");
srs_info("tls: Client done");
// Send New Session Ticket, Change Cipher Spec, Encrypted Handshake Message
size = BIO_get_mem_data(bio_out, &data);
if (!data || size <= 0) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake data=%p, size=%d", data, size);
return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake data=%p, size=%d", data, size);
}
if ((err = transport->write(data, size, NULL)) != srs_success) {
return srs_error_wrap(err, "handshake: write data=%p, size=%d", data, size);
}
if ((r0 = BIO_reset(bio_out)) != 1) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0);
}
srs_info("https: Server done");
srs_info("tls: Server done");
return err;
}
@ -874,7 +874,22 @@ srs_utime_t SrsSslConnection::get_recv_timeout()
srs_error_t SrsSslConnection::read_fully(void *buf, size_t size, ssize_t *nread)
{
return transport->read_fully(buf, size, nread);
srs_error_t err = srs_success;
ssize_t nb = 0;
void *p = buf;
while (nb < size) {
ssize_t once_nb = 0;
if ((err = read((char *)p + nb, size - nb, &once_nb)) != srs_success) {
return srs_error_wrap(err, "tls: read");
}
nb += once_nb;
}
if (nread) {
*nread = nb;
}
return err;
}
int64_t SrsSslConnection::get_recv_bytes()
@ -916,20 +931,20 @@ srs_error_t SrsSslConnection::read(void *plaintext, size_t nn_plaintext, ssize_t
// Read the cipher from SSL.
ssize_t nn = 0;
if ((err = transport->read(cipher.get(), nn_cipher, &nn)) != srs_success) {
return srs_error_wrap(err, "https: read");
return srs_error_wrap(err, "tls: read");
}
int r0 = BIO_write(bio_in, cipher.get(), nn);
if (r0 <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_HTTPS_READ, "BIO_write r0=%d, cipher=%p, size=%d", r0, cipher.get(), nn);
return srs_error_new(ERROR_TLS_READ, "BIO_write r0=%d, cipher=%p, size=%d", r0, cipher.get(), nn);
}
continue;
}
// Fail for error.
if (r0 <= 0) {
return srs_error_new(ERROR_HTTPS_READ, "SSL_read r0=%d, r1=%d, r2=%d, r3=%d",
return srs_error_new(ERROR_TLS_READ, "SSL_read r0=%d, r1=%d, r2=%d, r3=%d",
r0, r1, r2, r3);
}
}
@ -955,7 +970,7 @@ srs_error_t SrsSslConnection::write(void *plaintext, size_t nn_plaintext, ssize_
int r1 = SSL_get_error(ssl, r0);
ERR_clear_error();
if (r0 <= 0) {
return srs_error_new(ERROR_HTTPS_WRITE, "https: write data=%p, size=%d, r0=%d, r1=%d", p, left, r0, r1);
return srs_error_new(ERROR_TLS_WRITE, "tls: write data=%p, size=%d, r0=%d, r1=%d", p, left, r0, r1);
}
// Move p to the next writing position.
@ -967,10 +982,10 @@ srs_error_t SrsSslConnection::write(void *plaintext, size_t nn_plaintext, ssize_
uint8_t *data = NULL;
int size = BIO_get_mem_data(bio_out, &data);
if ((err = transport->write(data, size, NULL)) != srs_success) {
return srs_error_wrap(err, "https: write data=%p, size=%d", data, size);
return srs_error_wrap(err, "tls: write data=%p, size=%d", data, size);
}
if ((r0 = BIO_reset(bio_out)) != 1) {
return srs_error_new(ERROR_HTTPS_WRITE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_WRITE, "BIO_reset r0=%d", r0);
}
}

View File

@ -92,15 +92,97 @@ SrsClientInfo::~SrsClientInfo()
srs_freep(res);
}
SrsRtmpConn::SrsRtmpConn(SrsServer *svr, srs_netfd_t c, string cip, int cport)
SrsRtmpTransport::SrsRtmpTransport(srs_netfd_t c)
{
stfd_ = c;
skt_ = new SrsTcpConnection(c);
}
SrsRtmpTransport::~SrsRtmpTransport()
{
srs_freep(skt_);
}
srs_netfd_t SrsRtmpTransport::fd()
{
return stfd_;
}
ISrsProtocolReadWriter *SrsRtmpTransport::io()
{
return skt_;
}
srs_error_t SrsRtmpTransport::handshake()
{
return srs_success;
}
const char *SrsRtmpTransport::transport_type()
{
return "plaintext";
}
srs_error_t SrsRtmpTransport::set_socket_buffer(srs_utime_t buffer_v)
{
return skt_->set_socket_buffer(buffer_v);
}
srs_error_t SrsRtmpTransport::set_tcp_nodelay(bool v)
{
return skt_->set_tcp_nodelay(v);
}
int64_t SrsRtmpTransport::get_recv_bytes()
{
return skt_->get_recv_bytes();
}
int64_t SrsRtmpTransport::get_send_bytes()
{
return skt_->get_send_bytes();
}
SrsRtmpsTransport::SrsRtmpsTransport(srs_netfd_t c) : SrsRtmpTransport(c)
{
ssl_ = new SrsSslConnection(skt_);
}
SrsRtmpsTransport::~SrsRtmpsTransport()
{
srs_freep(ssl_);
}
ISrsProtocolReadWriter *SrsRtmpsTransport::io()
{
return ssl_;
}
srs_error_t SrsRtmpsTransport::handshake()
{
string crt_file = _srs_config->get_rtmps_ssl_cert();
string key_file = _srs_config->get_rtmps_ssl_key();
srs_error_t err = ssl_->handshake(key_file, crt_file);
if (err != srs_success) {
return srs_error_wrap(err, "ssl handshake");
}
return srs_success;
}
const char *SrsRtmpsTransport::transport_type()
{
return "ssl";
}
SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip, int cport)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());
server = svr;
stfd = c;
skt = new SrsTcpConnection(c);
transport_ = transport;
manager = svr;
ip = cip;
port = cport;
@ -113,11 +195,11 @@ SrsRtmpConn::SrsRtmpConn(SrsServer *svr, srs_netfd_t c, string cip, int cport)
trd = new SrsSTCoroutine("rtmp", this, _srs_context->get_id());
kbps = new SrsNetworkKbps();
kbps->set_io(skt, skt);
kbps->set_io(transport_->io(), transport_->io());
delta_ = new SrsNetworkDelta();
delta_->set_io(skt, skt);
delta_->set_io(transport_->io(), transport_->io());
rtmp = new SrsRtmpServer(skt);
rtmp = new SrsRtmpServer(transport_->io());
refer = new SrsRefer();
security = new SrsSecurity();
duration = 0;
@ -149,7 +231,7 @@ SrsRtmpConn::~SrsRtmpConn()
srs_freep(kbps);
srs_freep(delta_);
srs_freep(skt);
srs_freep(transport_);
srs_freep(info);
srs_freep(rtmp);
@ -186,12 +268,16 @@ srs_error_t SrsRtmpConn::do_cycle()
#endif
#ifdef SRS_APM
srs_trace("RTMP client ip=%s:%d, fd=%d, trace=%s, span=%s", ip.c_str(), port, srs_netfd_fileno(stfd),
srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d, trace=%s, span=%s", transport_->transport_type(), ip.c_str(), port, srs_netfd_fileno(transport_->fd()),
span_main_->format_trace_id(), span_main_->format_span_id());
#else
srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d", transport_->transport_type(), ip.c_str(), port, srs_netfd_fileno(transport_->fd()));
#endif
if ((err = transport_->handshake()) != srs_success) {
return srs_error_wrap(err, "transport handshake");
}
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
@ -321,7 +407,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost)
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
skt->set_socket_buffer(mw_sleep);
transport_->set_socket_buffer(mw_sleep);
return err;
}
@ -359,7 +445,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_realtime(string vhost)
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
skt->set_socket_buffer(mw_sleep);
transport_->set_socket_buffer(mw_sleep);
return err;
}
@ -415,7 +501,7 @@ srs_error_t SrsRtmpConn::service_cycle()
}
// get the ip which client connected.
std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd));
std::string local_ip = srs_get_local_ip(srs_netfd_fileno(transport_->fd()));
// set chunk size to larger.
// set the chunk size before any larger response greater than 128,
@ -807,7 +893,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveC
// when mw_sleep changed, resize the socket send buffer.
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
skt->set_socket_buffer(mw_sleep);
transport_->set_socket_buffer(mw_sleep);
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
@ -947,7 +1033,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSharedPtr<SrsLiveSource> source)
if ((err = acquire_err) == srs_success) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(transport_->fd()), 0, this, source, _srs_context->get_id());
err = do_publishing(source, &rtrd);
rtrd.stop();
}
@ -1326,7 +1412,7 @@ void SrsRtmpConn::set_sock_options()
if (nvalue != tcp_nodelay) {
tcp_nodelay = nvalue;
srs_error_t err = skt->set_tcp_nodelay(tcp_nodelay);
srs_error_t err = transport_->set_tcp_nodelay(tcp_nodelay);
if (err != srs_success) {
srs_warn("ignore err %s", srs_error_desc(err).c_str());
srs_freep(err);
@ -1463,7 +1549,7 @@ void SrsRtmpConn::http_hooks_on_close()
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_close(url, req, skt->get_send_bytes(), skt->get_recv_bytes());
SrsHttpHooks::on_close(url, req, transport_->get_send_bytes(), transport_->get_recv_bytes());
}
}

View File

@ -40,6 +40,7 @@ class SrsCommonMessage;
class SrsPacket;
class SrsNetworkDelta;
class ISrsApmSpan;
class SrsSslConnection;
// The simple rtmp client for SRS.
class SrsSimpleRtmpClient : public SrsBasicRtmpClient
@ -70,7 +71,54 @@ public:
virtual ~SrsClientInfo();
};
// The client provides the main logic control for RTMP clients.
// The base transport layer for RTMP connections over plain TCP.
class SrsRtmpTransport
{
protected:
srs_netfd_t stfd_;
SrsTcpConnection *skt_;
public:
SrsRtmpTransport(srs_netfd_t c);
virtual ~SrsRtmpTransport();
public:
// Get the file descriptor for logging and identification
virtual srs_netfd_t fd();
// Get the appropriate I/O interface (TCP)
virtual ISrsProtocolReadWriter *io();
// Perform handshake (no-op for plain RTMP)
virtual srs_error_t handshake();
// Get transport type description for logging
virtual const char *transport_type();
// Set socket buffer size
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
// Set TCP nodelay option
virtual srs_error_t set_tcp_nodelay(bool v);
// Get network statistics
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
};
// The SSL/TLS transport layer for RTMPS connections.
class SrsRtmpsTransport : public SrsRtmpTransport
{
private:
SrsSslConnection *ssl_;
public:
SrsRtmpsTransport(srs_netfd_t c);
virtual ~SrsRtmpsTransport();
public:
// Get the appropriate I/O interface (SSL)
virtual ISrsProtocolReadWriter *io();
// Perform SSL handshake
virtual srs_error_t handshake();
// Get transport type description for logging
virtual const char *transport_type();
};
class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsReloadHandler, public ISrsCoroutineHandler, public ISrsExpire
{
// For the thread to directly access any field of connection.
@ -106,8 +154,7 @@ private:
SrsClientInfo *info;
private:
srs_netfd_t stfd;
SrsTcpConnection *skt;
SrsRtmpTransport *transport_;
// Each connection start a green thread,
// when thread stop, the connection will be delete by server.
SrsCoroutine *trd;
@ -128,7 +175,7 @@ private:
ISrsApmSpan *span_client_;
public:
SrsRtmpConn(SrsServer *svr, srs_netfd_t c, std::string cip, int port);
SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, std::string cip, int port);
virtual ~SrsRtmpConn();
// Interface ISrsResource.
public:

View File

@ -340,6 +340,7 @@ SrsServer::SrsServer()
ppid = ::getppid();
rtmp_listener_ = new SrsMultipleTcpListeners(this);
rtmps_listener_ = new SrsMultipleTcpListeners(this);
api_listener_ = new SrsTcpListener(this);
apis_listener_ = new SrsTcpListener(this);
http_listener_ = new SrsTcpListener(this);
@ -400,6 +401,7 @@ void SrsServer::destroy()
srs_freep(latest_version_);
srs_freep(conn_manager);
srs_freep(rtmp_listener_);
srs_freep(rtmps_listener_);
srs_freep(api_listener_);
srs_freep(apis_listener_);
srs_freep(http_listener_);
@ -422,6 +424,7 @@ void SrsServer::dispose()
// Destroy all listeners.
rtmp_listener_->close();
rtmps_listener_->close();
api_listener_->close();
apis_listener_->close();
http_listener_->close();
@ -456,6 +459,7 @@ void SrsServer::gracefully_dispose()
// Destroy all listeners.
rtmp_listener_->close();
rtmps_listener_->close();
api_listener_->close();
apis_listener_->close();
http_listener_->close();
@ -596,6 +600,14 @@ srs_error_t SrsServer::listen()
return srs_error_wrap(err, "rtmp listen");
}
// Create RTMPS listeners.
if (_srs_config->get_rtmps_enabled()) {
rtmps_listener_->add(_srs_config->get_rtmps_listen())->set_label("RTMPS");
if ((err = rtmps_listener_->listen()) != srs_success) {
return srs_error_wrap(err, "rtmps listen");
}
}
// Create HTTP API listener.
if (_srs_config->get_http_api_enabled()) {
if (reuse_api_over_server_) {
@ -1292,7 +1304,11 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf
// Create resource by normal listeners.
if (!resource) {
if (listener == rtmp_listener_) {
resource = new SrsRtmpConn(this, stfd2, ip, port);
SrsRtmpTransport *transport = new SrsRtmpTransport(stfd2);
resource = new SrsRtmpConn(this, transport, ip, port);
} else if (listener == rtmps_listener_) {
SrsRtmpTransport *transport = new SrsRtmpsTransport(stfd2);
resource = new SrsRtmpConn(this, transport, ip, port);
} else if (listener == api_listener_ || listener == apis_listener_) {
string key = listener == apis_listener_ ? _srs_config->get_https_api_ssl_key() : "";
string cert = listener == apis_listener_ ? _srs_config->get_https_api_ssl_cert() : "";

View File

@ -41,6 +41,8 @@ class SrsMultipleTcpListeners;
class SrsHttpFlvListener;
class SrsUdpCasterListener;
class SrsGbListener;
class SrsRtmpTransport;
class SrsRtmpsTransport;
// Convert signal to io,
// @see: st-1.9/docs/notes.html
@ -126,6 +128,8 @@ private:
bool reuse_rtc_over_server_;
// RTMP stream listeners, over TCP.
SrsMultipleTcpListeners *rtmp_listener_;
// RTMPS stream listeners, over TCP.
SrsMultipleTcpListeners *rtmps_listener_;
// HTTP API listener, over TCP. Please note that it might reuse with stream listener.
SrsTcpListener *api_listener_;
// HTTPS API listener, over TCP. Please note that it might reuse with stream listener.

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 55
#define VERSION_REVISION 56
#endif

View File

@ -314,10 +314,10 @@
XX(ERROR_BASE64_DECODE, 4039, "Base64Decode", "Failed to decode the BASE64 content") \
XX(ERROR_HTTP_STREAM_EOF, 4040, "HttpStreamEof", "HTTP stream is EOF") \
XX(ERROR_HTTPS_NOT_SUPPORTED, 4041, "HttpsNotSupported", "HTTPS is not supported") \
XX(ERROR_HTTPS_HANDSHAKE, 4042, "HttpsHandshake", "Failed to do handshake for HTTPS") \
XX(ERROR_HTTPS_READ, 4043, "HttpsRead", "Failed to read data from HTTPS stream") \
XX(ERROR_HTTPS_WRITE, 4044, "HttpsWrite", "Failed to write data to HTTPS stream") \
XX(ERROR_HTTPS_KEY_CRT, 4045, "HttpsSslFile", "Failed to load SSL key or crt file for HTTPS") \
XX(ERROR_TLS_HANDSHAKE, 4042, "TlsHandshake", "Failed to do tls handshake") \
XX(ERROR_TLS_READ, 4043, "TlsRead", "TLS read data failed") \
XX(ERROR_TLS_WRITE, 4044, "TlsWrite", "TLS write data failed") \
XX(ERROR_TLS_KEY_CRT, 4045, "TlsSslFile", "Failed to load SSL key or crt file") \
XX(ERROR_GB_SIP_HEADER, 4046, "GbHeaderCallId", "Missing field of SIP header for GB28181") \
XX(ERROR_GB_SIP_MESSAGE, 4047, "GbHeaderCallId", "Invalid SIP message for GB28181") \
XX(ERROR_GB_PS_HEADER, 4048, "GbPsHeader", "Invalid PS header for GB28181") \

View File

@ -71,16 +71,16 @@ srs_error_t SrsSslClient::handshake(const std::string &host)
// TODO: Setup callback, see SSL_set_ex_data and SSL_set_info_callback
if ((ssl = SSL_new(ssl_ctx)) == NULL) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "SSL_new ssl");
return srs_error_new(ERROR_TLS_HANDSHAKE, "SSL_new ssl");
}
if ((bio_in = BIO_new(BIO_s_mem())) == NULL) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_new in");
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_new in");
}
if ((bio_out = BIO_new(BIO_s_mem())) == NULL) {
BIO_free(bio_in);
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_new out");
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_new out");
}
SSL_set_bio(ssl, bio_in, bio_out);
@ -98,22 +98,22 @@ srs_error_t SrsSslClient::handshake(const std::string &host)
int r1 = SSL_get_error(ssl, r0);
ERR_clear_error();
if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
}
uint8_t *data = NULL;
int size = BIO_get_mem_data(bio_out, &data);
if (!data || size <= 0) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake data=%p, size=%d", data, size);
return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake data=%p, size=%d", data, size);
}
if ((err = transport->write(data, size, NULL)) != srs_success) {
return srs_error_wrap(err, "handshake: write data=%p, size=%d", data, size);
}
if ((r0 = BIO_reset(bio_out)) != 1) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0);
}
srs_info("https: ClientHello done");
srs_info("tls: ClientHello done");
// Receive ServerHello, Certificate, Server Key Exchange, Server Hello Done
while (true) {
@ -125,36 +125,36 @@ srs_error_t SrsSslClient::handshake(const std::string &host)
if ((r0 = BIO_write(bio_in, buf, nn)) <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn);
}
r0 = SSL_do_handshake(ssl);
r1 = SSL_get_error(ssl, r0);
ERR_clear_error();
if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
}
if ((size = BIO_get_mem_data(bio_out, &data)) > 0) {
// OK, reset it for the next write.
if ((r0 = BIO_reset(bio_in)) != 1) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0);
}
break;
}
}
srs_info("https: ServerHello done");
srs_info("tls: ServerHello done");
// Send Client Key Exchange, Change Cipher Spec, Encrypted Handshake Message
if ((err = transport->write(data, size, NULL)) != srs_success) {
return srs_error_wrap(err, "handshake: write data=%p, size=%d", data, size);
}
if ((r0 = BIO_reset(bio_out)) != 1) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0);
}
srs_info("https: Client done");
srs_info("tls: Client done");
// Receive New Session Ticket, Change Cipher Spec, Encrypted Handshake Message
while (true) {
@ -166,7 +166,7 @@ srs_error_t SrsSslClient::handshake(const std::string &host)
if ((r0 = BIO_write(bio_in, buf, nn)) <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn);
return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn);
}
r0 = SSL_do_handshake(ssl);
@ -177,11 +177,11 @@ srs_error_t SrsSslClient::handshake(const std::string &host)
}
if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) {
return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1);
}
}
srs_info("https: Server done");
srs_info("tls: Server done");
return err;
}
@ -222,14 +222,14 @@ srs_error_t SrsSslClient::read(void *plaintext, size_t nn_plaintext, ssize_t *nr
int r0 = BIO_write(bio_in, cipher.get(), nn);
if (r0 <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_HTTPS_READ, "BIO_write r0=%d, cipher=%p, size=%d", r0, cipher.get(), nn);
return srs_error_new(ERROR_TLS_READ, "BIO_write r0=%d, cipher=%p, size=%d", r0, cipher.get(), nn);
}
continue;
}
// Fail for error.
if (r0 <= 0) {
return srs_error_new(ERROR_HTTPS_READ, "SSL_read r0=%d, r1=%d, r2=%d, r3=%d",
return srs_error_new(ERROR_TLS_READ, "SSL_read r0=%d, r1=%d, r2=%d, r3=%d",
r0, r1, r2, r3);
}
}
@ -245,7 +245,7 @@ srs_error_t SrsSslClient::write(void *plaintext, size_t nn_plaintext, ssize_t *n
int r1 = SSL_get_error(ssl, r0);
ERR_clear_error();
if (r0 <= 0) {
return srs_error_new(ERROR_HTTPS_WRITE, "https: write data=%p, size=%d, r0=%d, r1=%d", p, left, r0, r1);
return srs_error_new(ERROR_TLS_WRITE, "tls: write data=%p, size=%d, r0=%d, r1=%d", p, left, r0, r1);
}
// Move p to the next writing position.
@ -257,10 +257,10 @@ srs_error_t SrsSslClient::write(void *plaintext, size_t nn_plaintext, ssize_t *n
uint8_t *data = NULL;
int size = BIO_get_mem_data(bio_out, &data);
if ((err = transport->write(data, size, NULL)) != srs_success) {
return srs_error_wrap(err, "https: write data=%p, size=%d", data, size);
return srs_error_wrap(err, "tls: write data=%p, size=%d", data, size);
}
if ((r0 = BIO_reset(bio_out)) != 1) {
return srs_error_new(ERROR_HTTPS_WRITE, "BIO_reset r0=%d", r0);
return srs_error_new(ERROR_TLS_WRITE, "BIO_reset r0=%d", r0);
}
}