From ebcaef43c6162753453e4d74cf1eb73ceda3b26f Mon Sep 17 00:00:00 2001 From: Winlin Date: Mon, 18 Aug 2025 13:43:27 -0600 Subject: [PATCH] RTMP: Support RTMPS server. v7.0.56 (#4443) This PR is extracted by AI from #3949 to support RTMPS server in SRS. Run SRS with RTMPS support: ```bash ./objs/srs -c conf/rtmps.conf ``` Publish RTMPS stream by FFmpeg: ```bash ffmpeg -re -i doc/source.flv -c copy -f flv rtmps://localhost:1443/live/livetream ``` Play RTMPS stream by ffplay: ```bash ffplay rtmps://localhost:1443/live/livetream ``` Below work is done by AI: * [AI: Extract RTMP transport for RTMPS.](https://github.com/ossrs/srs/pull/4443/commits/79481114648b58b3fe9fe00d2e9ac7b365679095) * [AI: Extract RTMPS transport.](https://github.com/ossrs/srs/pull/4443/commits/a669cbba8925950993ed1a0787b610abc8a13bca) --------- Co-authored-by: john Co-authored-by: OSSRS-AI --- trunk/conf/full.conf | 27 +++- trunk/conf/rtmps.conf | 12 ++ trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_config.cpp | 103 ++++++++++++++- trunk/src/app/srs_app_config.hpp | 9 ++ trunk/src/app/srs_app_conn.cpp | 69 ++++++---- trunk/src/app/srs_app_rtmp_conn.cpp | 118 +++++++++++++++--- trunk/src/app/srs_app_rtmp_conn.hpp | 55 +++++++- trunk/src/app/srs_app_server.cpp | 18 ++- trunk/src/app/srs_app_server.hpp | 4 + trunk/src/core/srs_core_version7.hpp | 2 +- trunk/src/kernel/srs_kernel_error.hpp | 8 +- .../src/protocol/srs_protocol_http_client.cpp | 42 +++---- 13 files changed, 390 insertions(+), 78 deletions(-) create mode 100644 trunk/conf/rtmps.conf diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 61d0c0606..a0e03b219 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -146,7 +146,7 @@ inotify_auto_reload off; auto_reload_for_docker on; ############################################################################################# -# RTMP sections +# RTMP/RTMPS sections ############################################################################################# # the rtmp listen ports, split by space, each listen entry is <[ip:]port> # for example, 192.168.1.100:1935 10.10.10.100:1935 @@ -161,6 +161,27 @@ listen 1935; # default: 60000 chunk_size 60000; +# the rtmps server section +rtmps { + # Whether rtmps is enabled. + # Overwrite by env SRS_RTMPS_ENABLED + # default: off + enabled on; + # The rtmps listen port + # Overwrite by env SRS_RTMPS_LISTEN + listen 1443; + # The SSL private key file, generated by: + # openssl genrsa -out server.key 2048 + # Overwrite by env SRS_RTMPS_KEY + # default: ./conf/server.key + key ./conf/server.key; + # The SSL public cert file, generated by: + # openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CA/ST=Toronto/L=Toronto/O=Me/OU=Me/CN=ossrs.io" + # Overwrite by env SRS_RTMPS_CERT + # default: ./conf/server.crt + cert ./conf/server.crt; +} + ############################################################################################# # HTTP sections ############################################################################################# @@ -235,7 +256,7 @@ http_api { # default: ./conf/server.key key ./conf/server.key; # The SSL public cert file, generated by: - # openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CN/ST=Beijing/L=Beijing/O=Me/OU=Me/CN=ossrs.net" + # openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CA/ST=Toronto/L=Toronto/O=Me/OU=Me/CN=ossrs.io" # Overwrite by env SRS_HTTP_API_HTTPS_CERT # default: ./conf/server.crt cert ./conf/server.crt; @@ -289,7 +310,7 @@ http_server { # default: ./conf/server.key key ./conf/server.key; # The SSL public cert file, generated by: - # openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CN/ST=Beijing/L=Beijing/O=Me/OU=Me/CN=ossrs.net" + # openssl req -new -x509 -key server.key -out server.crt -days 3650 -subj "/C=CA/ST=Toronto/L=Toronto/O=Me/OU=Me/CN=ossrs.io" # Overwrite by env SRS_HTTP_SERVER_HTTPS_CERT # default: ./conf/server.crt cert ./conf/server.crt; diff --git a/trunk/conf/rtmps.conf b/trunk/conf/rtmps.conf new file mode 100644 index 000000000..3191b72cb --- /dev/null +++ b/trunk/conf/rtmps.conf @@ -0,0 +1,12 @@ +listen 1935; +max_connections 1000; +daemon off; +srs_log_tank console; +rtmps { + enabled on; + listen 1443; + key ./conf/server.key; + cert ./conf/server.crt; +} +vhost __defaultVhost__ { +} diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 4a94677f5..8775868f9 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-08-18, Merge [#4443](https://github.com/ossrs/srs/pull/4443): Support RTMPS server. v7.0.56 (#4443) * v7.0, 2025-08-16, Merge [#4441](https://github.com/ossrs/srs/pull/4441): fix err memory leak in rtc to rtmp bridge. v7.0.55 (#4441) * v7.0, 2025-08-14, Merge [#4161](https://github.com/ossrs/srs/pull/4161): fix hls & dash segments cleanup. v7.0.54 (#4161) * v7.0, 2025-08-13, Merge [#4225](https://github.com/ossrs/srs/pull/4225): issue #4223: remove hls_acodec and hls_vcodec config. v7.0.53 (#4225) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index dc9d8a035..9e7461d0d 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2366,7 +2366,7 @@ srs_error_t SrsConfig::check_normal_config() for (int i = 0; i < (int)root->directives.size(); i++) { SrsConfDirective *conf = root->at(i); std::string n = conf->name; - if (n != "listen" && n != "pid" && n != "chunk_size" && n != "ff_log_dir" && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_level_v2" && n != "srs_log_file" && n != "max_connections" && n != "daemon" && n != "heartbeat" && n != "tencentcloud_apm" && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" && n != "http_server" && n != "stream_caster" && n != "rtc_server" && n != "srt_server" && n != "utc_time" && n != "work_dir" && n != "asprocess" && n != "server_id" && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" && n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate" && n != "query_latest_version" && n != "first_wait_for_qlv" && n != "threads" && n != "circuit_breaker" && n != "is_full" && n != "in_docker" && n != "tencentcloud_cls" && n != "exporter" && n != "rtsp_server") { + if (n != "listen" && n != "pid" && n != "chunk_size" && n != "ff_log_dir" && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_level_v2" && n != "srs_log_file" && n != "max_connections" && n != "daemon" && n != "heartbeat" && n != "tencentcloud_apm" && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" && n != "http_server" && n != "stream_caster" && n != "rtc_server" && n != "srt_server" && n != "utc_time" && n != "work_dir" && n != "asprocess" && n != "server_id" && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" && n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate" && n != "query_latest_version" && n != "first_wait_for_qlv" && n != "threads" && n != "circuit_breaker" && n != "is_full" && n != "in_docker" && n != "tencentcloud_cls" && n != "exporter" && n != "rtsp_server" && n != "rtmps") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str()); } } @@ -2452,6 +2452,15 @@ srs_error_t SrsConfig::check_normal_config() } } } + if (true) { + SrsConfDirective *conf = root->get("rtmps"); + for (int i = 0; conf && i < (int)conf->directives.size(); i++) { + string n = conf->at(i)->name; + if (n != "enabled" && n != "listen" && n != "key" && n != "cert") { + return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtmps.%s", n.c_str()); + } + } + } if (true) { SrsConfDirective *conf = root->get("exporter"); for (int i = 0; conf && i < (int)conf->directives.size(); i++) { @@ -8999,3 +9008,95 @@ SrsConfDirective *SrsConfig::get_stats_disk_device() return conf; } + +SrsConfDirective *SrsConfig::get_rtmps() +{ + SrsConfDirective *conf = root->get("rtmps"); + if (!conf) { + return NULL; + } + + return conf; +} + +bool SrsConfig::get_rtmps_enabled() +{ + SRS_OVERWRITE_BY_ENV_BOOL("srs.rtmps.enabled"); // SRS_RTMPS_ENABLED + + static bool DEFAULT = false; + + SrsConfDirective *conf = get_rtmps(); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf) { + return DEFAULT; + } + + return SRS_CONF_PREFER_FALSE(conf->arg0()); +} + +vector SrsConfig::get_rtmps_listen() +{ + if (!srs_getenv("srs.rtmps.listen").empty()) { // SRS_RTMPS_LISTEN + return srs_string_split(srs_getenv("srs.rtmps.listen"), " "); + } + + std::vector ports; + + SrsConfDirective *conf = get_rtmps(); + if (!conf) { + return ports; + } + + conf = conf->get("listen"); + if (!conf) { + return ports; + } + + for (int i = 0; i < (int)conf->args.size(); i++) { + ports.push_back(conf->args.at(i)); + } + + return ports; +} + +string SrsConfig::get_rtmps_ssl_key() +{ + SRS_OVERWRITE_BY_ENV_STRING("srs.rtmps.key"); // SRS_RTMPS_KEY + + static string DEFAULT = "./conf/server.key"; + + SrsConfDirective *conf = get_rtmps(); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("key"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} + +string SrsConfig::get_rtmps_ssl_cert() +{ + SRS_OVERWRITE_BY_ENV_STRING("srs.rtmps.cert"); // SRS_RTMPS_CERT + + static string DEFAULT = "./conf/server.crt"; + + SrsConfDirective *conf = get_rtmps(); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("cert"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 9299f8401..890a31045 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -1129,6 +1129,15 @@ public: virtual std::string get_https_stream_listen(); virtual std::string get_https_stream_ssl_key(); virtual std::string get_https_stream_ssl_cert(); + // rtmps section +private: + SrsConfDirective *get_rtmps(); + +public: + virtual bool get_rtmps_enabled(); + virtual std::vector get_rtmps_listen(); + virtual std::string get_rtmps_ssl_key(); + virtual std::string get_rtmps_ssl_cert(); public: // Get whether vhost enabled http stream diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index f652452b3..a3960dd41 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -728,16 +728,16 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file) // TODO: Setup callback, see SSL_set_ex_data and SSL_set_info_callback if ((ssl = SSL_new(ssl_ctx)) == NULL) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "SSL_new ssl"); + return srs_error_new(ERROR_TLS_HANDSHAKE, "SSL_new ssl"); } if ((bio_in = BIO_new(BIO_s_mem())) == NULL) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_new in"); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_new in"); } if ((bio_out = BIO_new(BIO_s_mem())) == NULL) { BIO_free(bio_in); - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_new out"); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_new out"); } SSL_set_bio(ssl, bio_in, bio_out); @@ -751,15 +751,15 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file) // Setup the key and cert file for server. if ((r0 = SSL_use_certificate_chain_file(ssl, crt_file.c_str())) != 1) { - return srs_error_new(ERROR_HTTPS_KEY_CRT, "use cert %s", crt_file.c_str()); + return srs_error_new(ERROR_TLS_KEY_CRT, "use cert %s", crt_file.c_str()); } if ((r0 = SSL_use_RSAPrivateKey_file(ssl, key_file.c_str(), SSL_FILETYPE_PEM)) != 1) { - return srs_error_new(ERROR_HTTPS_KEY_CRT, "use key %s", key_file.c_str()); + return srs_error_new(ERROR_TLS_KEY_CRT, "use key %s", key_file.c_str()); } if ((r0 = SSL_check_private_key(ssl)) != 1) { - return srs_error_new(ERROR_HTTPS_KEY_CRT, "check key %s with cert %s", + return srs_error_new(ERROR_TLS_KEY_CRT, "check key %s with cert %s", key_file.c_str(), crt_file.c_str()); } srs_info("ssl: use key %s and cert %s", key_file.c_str(), crt_file.c_str()); @@ -774,40 +774,40 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file) if ((r0 = BIO_write(bio_in, buf, nn)) <= 0) { // TODO: 0 or -1 maybe block, use BIO_should_retry to check. - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn); } r0 = SSL_do_handshake(ssl); r1 = SSL_get_error(ssl, r0); ERR_clear_error(); if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); + return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); } if ((size = BIO_get_mem_data(bio_out, &data)) > 0) { // OK, reset it for the next write. if ((r0 = BIO_reset(bio_in)) != 1) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0); } break; } } - srs_info("https: ClientHello done"); + srs_info("tls: ClientHello done"); // Send ServerHello, Certificate, Server Key Exchange, Server Hello Done size = BIO_get_mem_data(bio_out, &data); if (!data || size <= 0) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake data=%p, size=%d", data, size); + return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake data=%p, size=%d", data, size); } if ((err = transport->write(data, size, NULL)) != srs_success) { return srs_error_wrap(err, "handshake: write data=%p, size=%d", data, size); } if ((r0 = BIO_reset(bio_out)) != 1) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0); } - srs_info("https: ServerHello done"); + srs_info("tls: ServerHello done"); // Receive Client Key Exchange, Change Cipher Spec, Encrypted Handshake Message while (true) { @@ -819,7 +819,7 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file) if ((r0 = BIO_write(bio_in, buf, nn)) <= 0) { // TODO: 0 or -1 maybe block, use BIO_should_retry to check. - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn); } r0 = SSL_do_handshake(ssl); @@ -830,33 +830,33 @@ srs_error_t SrsSslConnection::handshake(string key_file, string crt_file) } if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); + return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); } if ((size = BIO_get_mem_data(bio_out, &data)) > 0) { // OK, reset it for the next write. if ((r0 = BIO_reset(bio_in)) != 1) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0); } break; } } - srs_info("https: Client done"); + srs_info("tls: Client done"); // Send New Session Ticket, Change Cipher Spec, Encrypted Handshake Message size = BIO_get_mem_data(bio_out, &data); if (!data || size <= 0) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake data=%p, size=%d", data, size); + return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake data=%p, size=%d", data, size); } if ((err = transport->write(data, size, NULL)) != srs_success) { return srs_error_wrap(err, "handshake: write data=%p, size=%d", data, size); } if ((r0 = BIO_reset(bio_out)) != 1) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0); } - srs_info("https: Server done"); + srs_info("tls: Server done"); return err; } @@ -874,7 +874,22 @@ srs_utime_t SrsSslConnection::get_recv_timeout() srs_error_t SrsSslConnection::read_fully(void *buf, size_t size, ssize_t *nread) { - return transport->read_fully(buf, size, nread); + srs_error_t err = srs_success; + ssize_t nb = 0; + void *p = buf; + while (nb < size) { + ssize_t once_nb = 0; + if ((err = read((char *)p + nb, size - nb, &once_nb)) != srs_success) { + return srs_error_wrap(err, "tls: read"); + } + nb += once_nb; + } + + if (nread) { + *nread = nb; + } + + return err; } int64_t SrsSslConnection::get_recv_bytes() @@ -916,20 +931,20 @@ srs_error_t SrsSslConnection::read(void *plaintext, size_t nn_plaintext, ssize_t // Read the cipher from SSL. ssize_t nn = 0; if ((err = transport->read(cipher.get(), nn_cipher, &nn)) != srs_success) { - return srs_error_wrap(err, "https: read"); + return srs_error_wrap(err, "tls: read"); } int r0 = BIO_write(bio_in, cipher.get(), nn); if (r0 <= 0) { // TODO: 0 or -1 maybe block, use BIO_should_retry to check. - return srs_error_new(ERROR_HTTPS_READ, "BIO_write r0=%d, cipher=%p, size=%d", r0, cipher.get(), nn); + return srs_error_new(ERROR_TLS_READ, "BIO_write r0=%d, cipher=%p, size=%d", r0, cipher.get(), nn); } continue; } // Fail for error. if (r0 <= 0) { - return srs_error_new(ERROR_HTTPS_READ, "SSL_read r0=%d, r1=%d, r2=%d, r3=%d", + return srs_error_new(ERROR_TLS_READ, "SSL_read r0=%d, r1=%d, r2=%d, r3=%d", r0, r1, r2, r3); } } @@ -955,7 +970,7 @@ srs_error_t SrsSslConnection::write(void *plaintext, size_t nn_plaintext, ssize_ int r1 = SSL_get_error(ssl, r0); ERR_clear_error(); if (r0 <= 0) { - return srs_error_new(ERROR_HTTPS_WRITE, "https: write data=%p, size=%d, r0=%d, r1=%d", p, left, r0, r1); + return srs_error_new(ERROR_TLS_WRITE, "tls: write data=%p, size=%d, r0=%d, r1=%d", p, left, r0, r1); } // Move p to the next writing position. @@ -967,10 +982,10 @@ srs_error_t SrsSslConnection::write(void *plaintext, size_t nn_plaintext, ssize_ uint8_t *data = NULL; int size = BIO_get_mem_data(bio_out, &data); if ((err = transport->write(data, size, NULL)) != srs_success) { - return srs_error_wrap(err, "https: write data=%p, size=%d", data, size); + return srs_error_wrap(err, "tls: write data=%p, size=%d", data, size); } if ((r0 = BIO_reset(bio_out)) != 1) { - return srs_error_new(ERROR_HTTPS_WRITE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_WRITE, "BIO_reset r0=%d", r0); } } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 7dc49ed6f..b577ca74c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -92,15 +92,97 @@ SrsClientInfo::~SrsClientInfo() srs_freep(res); } -SrsRtmpConn::SrsRtmpConn(SrsServer *svr, srs_netfd_t c, string cip, int cport) +SrsRtmpTransport::SrsRtmpTransport(srs_netfd_t c) +{ + stfd_ = c; + skt_ = new SrsTcpConnection(c); +} + +SrsRtmpTransport::~SrsRtmpTransport() +{ + srs_freep(skt_); +} + +srs_netfd_t SrsRtmpTransport::fd() +{ + return stfd_; +} + +ISrsProtocolReadWriter *SrsRtmpTransport::io() +{ + return skt_; +} + +srs_error_t SrsRtmpTransport::handshake() +{ + return srs_success; +} + +const char *SrsRtmpTransport::transport_type() +{ + return "plaintext"; +} + +srs_error_t SrsRtmpTransport::set_socket_buffer(srs_utime_t buffer_v) +{ + return skt_->set_socket_buffer(buffer_v); +} + +srs_error_t SrsRtmpTransport::set_tcp_nodelay(bool v) +{ + return skt_->set_tcp_nodelay(v); +} + +int64_t SrsRtmpTransport::get_recv_bytes() +{ + return skt_->get_recv_bytes(); +} + +int64_t SrsRtmpTransport::get_send_bytes() +{ + return skt_->get_send_bytes(); +} + +SrsRtmpsTransport::SrsRtmpsTransport(srs_netfd_t c) : SrsRtmpTransport(c) +{ + ssl_ = new SrsSslConnection(skt_); +} + +SrsRtmpsTransport::~SrsRtmpsTransport() +{ + srs_freep(ssl_); +} + +ISrsProtocolReadWriter *SrsRtmpsTransport::io() +{ + return ssl_; +} + +srs_error_t SrsRtmpsTransport::handshake() +{ + string crt_file = _srs_config->get_rtmps_ssl_cert(); + string key_file = _srs_config->get_rtmps_ssl_key(); + srs_error_t err = ssl_->handshake(key_file, crt_file); + if (err != srs_success) { + return srs_error_wrap(err, "ssl handshake"); + } + + return srs_success; +} + +const char *SrsRtmpsTransport::transport_type() +{ + return "ssl"; +} + +SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip, int cport) { // Create a identify for this client. _srs_context->set_id(_srs_context->generate_id()); server = svr; - stfd = c; - skt = new SrsTcpConnection(c); + transport_ = transport; manager = svr; ip = cip; port = cport; @@ -113,11 +195,11 @@ SrsRtmpConn::SrsRtmpConn(SrsServer *svr, srs_netfd_t c, string cip, int cport) trd = new SrsSTCoroutine("rtmp", this, _srs_context->get_id()); kbps = new SrsNetworkKbps(); - kbps->set_io(skt, skt); + kbps->set_io(transport_->io(), transport_->io()); delta_ = new SrsNetworkDelta(); - delta_->set_io(skt, skt); + delta_->set_io(transport_->io(), transport_->io()); - rtmp = new SrsRtmpServer(skt); + rtmp = new SrsRtmpServer(transport_->io()); refer = new SrsRefer(); security = new SrsSecurity(); duration = 0; @@ -149,7 +231,7 @@ SrsRtmpConn::~SrsRtmpConn() srs_freep(kbps); srs_freep(delta_); - srs_freep(skt); + srs_freep(transport_); srs_freep(info); srs_freep(rtmp); @@ -186,12 +268,16 @@ srs_error_t SrsRtmpConn::do_cycle() #endif #ifdef SRS_APM - srs_trace("RTMP client ip=%s:%d, fd=%d, trace=%s, span=%s", ip.c_str(), port, srs_netfd_fileno(stfd), + srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d, trace=%s, span=%s", transport_->transport_type(), ip.c_str(), port, srs_netfd_fileno(transport_->fd()), span_main_->format_trace_id(), span_main_->format_span_id()); #else - srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd)); + srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d", transport_->transport_type(), ip.c_str(), port, srs_netfd_fileno(transport_->fd())); #endif + if ((err = transport_->handshake()) != srs_success) { + return srs_error_wrap(err, "transport handshake"); + } + rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); @@ -321,7 +407,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost) mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); mw_sleep = _srs_config->get_mw_sleep(req->vhost); - skt->set_socket_buffer(mw_sleep); + transport_->set_socket_buffer(mw_sleep); return err; } @@ -359,7 +445,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_realtime(string vhost) mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); mw_sleep = _srs_config->get_mw_sleep(req->vhost); - skt->set_socket_buffer(mw_sleep); + transport_->set_socket_buffer(mw_sleep); return err; } @@ -415,7 +501,7 @@ srs_error_t SrsRtmpConn::service_cycle() } // get the ip which client connected. - std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd)); + std::string local_ip = srs_get_local_ip(srs_netfd_fileno(transport_->fd())); // set chunk size to larger. // set the chunk size before any larger response greater than 128, @@ -807,7 +893,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr source, SrsLiveC // when mw_sleep changed, resize the socket send buffer. mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); mw_sleep = _srs_config->get_mw_sleep(req->vhost); - skt->set_socket_buffer(mw_sleep); + transport_->set_socket_buffer(mw_sleep); // initialize the send_min_interval send_min_interval = _srs_config->get_send_min_interval(req->vhost); @@ -947,7 +1033,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSharedPtr source) if ((err = acquire_err) == srs_success) { // use isolate thread to recv, // @see: https://github.com/ossrs/srs/issues/237 - SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id()); + SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(transport_->fd()), 0, this, source, _srs_context->get_id()); err = do_publishing(source, &rtrd); rtrd.stop(); } @@ -1326,7 +1412,7 @@ void SrsRtmpConn::set_sock_options() if (nvalue != tcp_nodelay) { tcp_nodelay = nvalue; - srs_error_t err = skt->set_tcp_nodelay(tcp_nodelay); + srs_error_t err = transport_->set_tcp_nodelay(tcp_nodelay); if (err != srs_success) { srs_warn("ignore err %s", srs_error_desc(err).c_str()); srs_freep(err); @@ -1463,7 +1549,7 @@ void SrsRtmpConn::http_hooks_on_close() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - SrsHttpHooks::on_close(url, req, skt->get_send_bytes(), skt->get_recv_bytes()); + SrsHttpHooks::on_close(url, req, transport_->get_send_bytes(), transport_->get_recv_bytes()); } } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 762f6117c..5f54ffc9c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -40,6 +40,7 @@ class SrsCommonMessage; class SrsPacket; class SrsNetworkDelta; class ISrsApmSpan; +class SrsSslConnection; // The simple rtmp client for SRS. class SrsSimpleRtmpClient : public SrsBasicRtmpClient @@ -70,7 +71,54 @@ public: virtual ~SrsClientInfo(); }; -// The client provides the main logic control for RTMP clients. +// The base transport layer for RTMP connections over plain TCP. +class SrsRtmpTransport +{ +protected: + srs_netfd_t stfd_; + SrsTcpConnection *skt_; + +public: + SrsRtmpTransport(srs_netfd_t c); + virtual ~SrsRtmpTransport(); + +public: + // Get the file descriptor for logging and identification + virtual srs_netfd_t fd(); + // Get the appropriate I/O interface (TCP) + virtual ISrsProtocolReadWriter *io(); + // Perform handshake (no-op for plain RTMP) + virtual srs_error_t handshake(); + // Get transport type description for logging + virtual const char *transport_type(); + // Set socket buffer size + virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); + // Set TCP nodelay option + virtual srs_error_t set_tcp_nodelay(bool v); + // Get network statistics + virtual int64_t get_recv_bytes(); + virtual int64_t get_send_bytes(); +}; + +// The SSL/TLS transport layer for RTMPS connections. +class SrsRtmpsTransport : public SrsRtmpTransport +{ +private: + SrsSslConnection *ssl_; + +public: + SrsRtmpsTransport(srs_netfd_t c); + virtual ~SrsRtmpsTransport(); + +public: + // Get the appropriate I/O interface (SSL) + virtual ISrsProtocolReadWriter *io(); + // Perform SSL handshake + virtual srs_error_t handshake(); + // Get transport type description for logging + virtual const char *transport_type(); +}; + class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsReloadHandler, public ISrsCoroutineHandler, public ISrsExpire { // For the thread to directly access any field of connection. @@ -106,8 +154,7 @@ private: SrsClientInfo *info; private: - srs_netfd_t stfd; - SrsTcpConnection *skt; + SrsRtmpTransport *transport_; // Each connection start a green thread, // when thread stop, the connection will be delete by server. SrsCoroutine *trd; @@ -128,7 +175,7 @@ private: ISrsApmSpan *span_client_; public: - SrsRtmpConn(SrsServer *svr, srs_netfd_t c, std::string cip, int port); + SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, std::string cip, int port); virtual ~SrsRtmpConn(); // Interface ISrsResource. public: diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index dbd17423a..b81ec6e3f 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -340,6 +340,7 @@ SrsServer::SrsServer() ppid = ::getppid(); rtmp_listener_ = new SrsMultipleTcpListeners(this); + rtmps_listener_ = new SrsMultipleTcpListeners(this); api_listener_ = new SrsTcpListener(this); apis_listener_ = new SrsTcpListener(this); http_listener_ = new SrsTcpListener(this); @@ -400,6 +401,7 @@ void SrsServer::destroy() srs_freep(latest_version_); srs_freep(conn_manager); srs_freep(rtmp_listener_); + srs_freep(rtmps_listener_); srs_freep(api_listener_); srs_freep(apis_listener_); srs_freep(http_listener_); @@ -422,6 +424,7 @@ void SrsServer::dispose() // Destroy all listeners. rtmp_listener_->close(); + rtmps_listener_->close(); api_listener_->close(); apis_listener_->close(); http_listener_->close(); @@ -456,6 +459,7 @@ void SrsServer::gracefully_dispose() // Destroy all listeners. rtmp_listener_->close(); + rtmps_listener_->close(); api_listener_->close(); apis_listener_->close(); http_listener_->close(); @@ -596,6 +600,14 @@ srs_error_t SrsServer::listen() return srs_error_wrap(err, "rtmp listen"); } + // Create RTMPS listeners. + if (_srs_config->get_rtmps_enabled()) { + rtmps_listener_->add(_srs_config->get_rtmps_listen())->set_label("RTMPS"); + if ((err = rtmps_listener_->listen()) != srs_success) { + return srs_error_wrap(err, "rtmps listen"); + } + } + // Create HTTP API listener. if (_srs_config->get_http_api_enabled()) { if (reuse_api_over_server_) { @@ -1292,7 +1304,11 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf // Create resource by normal listeners. if (!resource) { if (listener == rtmp_listener_) { - resource = new SrsRtmpConn(this, stfd2, ip, port); + SrsRtmpTransport *transport = new SrsRtmpTransport(stfd2); + resource = new SrsRtmpConn(this, transport, ip, port); + } else if (listener == rtmps_listener_) { + SrsRtmpTransport *transport = new SrsRtmpsTransport(stfd2); + resource = new SrsRtmpConn(this, transport, ip, port); } else if (listener == api_listener_ || listener == apis_listener_) { string key = listener == apis_listener_ ? _srs_config->get_https_api_ssl_key() : ""; string cert = listener == apis_listener_ ? _srs_config->get_https_api_ssl_cert() : ""; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 76f1a27c3..415e2fd1f 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -41,6 +41,8 @@ class SrsMultipleTcpListeners; class SrsHttpFlvListener; class SrsUdpCasterListener; class SrsGbListener; +class SrsRtmpTransport; +class SrsRtmpsTransport; // Convert signal to io, // @see: st-1.9/docs/notes.html @@ -126,6 +128,8 @@ private: bool reuse_rtc_over_server_; // RTMP stream listeners, over TCP. SrsMultipleTcpListeners *rtmp_listener_; + // RTMPS stream listeners, over TCP. + SrsMultipleTcpListeners *rtmps_listener_; // HTTP API listener, over TCP. Please note that it might reuse with stream listener. SrsTcpListener *api_listener_; // HTTPS API listener, over TCP. Please note that it might reuse with stream listener. diff --git a/trunk/src/core/srs_core_version7.hpp b/trunk/src/core/srs_core_version7.hpp index 2b813c107..1b8835518 100644 --- a/trunk/src/core/srs_core_version7.hpp +++ b/trunk/src/core/srs_core_version7.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 7 #define VERSION_MINOR 0 -#define VERSION_REVISION 55 +#define VERSION_REVISION 56 #endif \ No newline at end of file diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index bff4a6edd..dfa1b588e 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -314,10 +314,10 @@ XX(ERROR_BASE64_DECODE, 4039, "Base64Decode", "Failed to decode the BASE64 content") \ XX(ERROR_HTTP_STREAM_EOF, 4040, "HttpStreamEof", "HTTP stream is EOF") \ XX(ERROR_HTTPS_NOT_SUPPORTED, 4041, "HttpsNotSupported", "HTTPS is not supported") \ - XX(ERROR_HTTPS_HANDSHAKE, 4042, "HttpsHandshake", "Failed to do handshake for HTTPS") \ - XX(ERROR_HTTPS_READ, 4043, "HttpsRead", "Failed to read data from HTTPS stream") \ - XX(ERROR_HTTPS_WRITE, 4044, "HttpsWrite", "Failed to write data to HTTPS stream") \ - XX(ERROR_HTTPS_KEY_CRT, 4045, "HttpsSslFile", "Failed to load SSL key or crt file for HTTPS") \ + XX(ERROR_TLS_HANDSHAKE, 4042, "TlsHandshake", "Failed to do tls handshake") \ + XX(ERROR_TLS_READ, 4043, "TlsRead", "TLS read data failed") \ + XX(ERROR_TLS_WRITE, 4044, "TlsWrite", "TLS write data failed") \ + XX(ERROR_TLS_KEY_CRT, 4045, "TlsSslFile", "Failed to load SSL key or crt file") \ XX(ERROR_GB_SIP_HEADER, 4046, "GbHeaderCallId", "Missing field of SIP header for GB28181") \ XX(ERROR_GB_SIP_MESSAGE, 4047, "GbHeaderCallId", "Invalid SIP message for GB28181") \ XX(ERROR_GB_PS_HEADER, 4048, "GbPsHeader", "Invalid PS header for GB28181") \ diff --git a/trunk/src/protocol/srs_protocol_http_client.cpp b/trunk/src/protocol/srs_protocol_http_client.cpp index dfc2ee994..ded94415a 100644 --- a/trunk/src/protocol/srs_protocol_http_client.cpp +++ b/trunk/src/protocol/srs_protocol_http_client.cpp @@ -71,16 +71,16 @@ srs_error_t SrsSslClient::handshake(const std::string &host) // TODO: Setup callback, see SSL_set_ex_data and SSL_set_info_callback if ((ssl = SSL_new(ssl_ctx)) == NULL) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "SSL_new ssl"); + return srs_error_new(ERROR_TLS_HANDSHAKE, "SSL_new ssl"); } if ((bio_in = BIO_new(BIO_s_mem())) == NULL) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_new in"); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_new in"); } if ((bio_out = BIO_new(BIO_s_mem())) == NULL) { BIO_free(bio_in); - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_new out"); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_new out"); } SSL_set_bio(ssl, bio_in, bio_out); @@ -98,22 +98,22 @@ srs_error_t SrsSslClient::handshake(const std::string &host) int r1 = SSL_get_error(ssl, r0); ERR_clear_error(); if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); + return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); } uint8_t *data = NULL; int size = BIO_get_mem_data(bio_out, &data); if (!data || size <= 0) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake data=%p, size=%d", data, size); + return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake data=%p, size=%d", data, size); } if ((err = transport->write(data, size, NULL)) != srs_success) { return srs_error_wrap(err, "handshake: write data=%p, size=%d", data, size); } if ((r0 = BIO_reset(bio_out)) != 1) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0); } - srs_info("https: ClientHello done"); + srs_info("tls: ClientHello done"); // Receive ServerHello, Certificate, Server Key Exchange, Server Hello Done while (true) { @@ -125,36 +125,36 @@ srs_error_t SrsSslClient::handshake(const std::string &host) if ((r0 = BIO_write(bio_in, buf, nn)) <= 0) { // TODO: 0 or -1 maybe block, use BIO_should_retry to check. - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn); } r0 = SSL_do_handshake(ssl); r1 = SSL_get_error(ssl, r0); ERR_clear_error(); if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); + return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); } if ((size = BIO_get_mem_data(bio_out, &data)) > 0) { // OK, reset it for the next write. if ((r0 = BIO_reset(bio_in)) != 1) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0); } break; } } - srs_info("https: ServerHello done"); + srs_info("tls: ServerHello done"); // Send Client Key Exchange, Change Cipher Spec, Encrypted Handshake Message if ((err = transport->write(data, size, NULL)) != srs_success) { return srs_error_wrap(err, "handshake: write data=%p, size=%d", data, size); } if ((r0 = BIO_reset(bio_out)) != 1) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_reset r0=%d", r0); } - srs_info("https: Client done"); + srs_info("tls: Client done"); // Receive New Session Ticket, Change Cipher Spec, Encrypted Handshake Message while (true) { @@ -166,7 +166,7 @@ srs_error_t SrsSslClient::handshake(const std::string &host) if ((r0 = BIO_write(bio_in, buf, nn)) <= 0) { // TODO: 0 or -1 maybe block, use BIO_should_retry to check. - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn); + return srs_error_new(ERROR_TLS_HANDSHAKE, "BIO_write r0=%d, data=%p, size=%d", r0, buf, nn); } r0 = SSL_do_handshake(ssl); @@ -177,11 +177,11 @@ srs_error_t SrsSslClient::handshake(const std::string &host) } if (r0 != -1 || r1 != SSL_ERROR_WANT_READ) { - return srs_error_new(ERROR_HTTPS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); + return srs_error_new(ERROR_TLS_HANDSHAKE, "handshake r0=%d, r1=%d", r0, r1); } } - srs_info("https: Server done"); + srs_info("tls: Server done"); return err; } @@ -222,14 +222,14 @@ srs_error_t SrsSslClient::read(void *plaintext, size_t nn_plaintext, ssize_t *nr int r0 = BIO_write(bio_in, cipher.get(), nn); if (r0 <= 0) { // TODO: 0 or -1 maybe block, use BIO_should_retry to check. - return srs_error_new(ERROR_HTTPS_READ, "BIO_write r0=%d, cipher=%p, size=%d", r0, cipher.get(), nn); + return srs_error_new(ERROR_TLS_READ, "BIO_write r0=%d, cipher=%p, size=%d", r0, cipher.get(), nn); } continue; } // Fail for error. if (r0 <= 0) { - return srs_error_new(ERROR_HTTPS_READ, "SSL_read r0=%d, r1=%d, r2=%d, r3=%d", + return srs_error_new(ERROR_TLS_READ, "SSL_read r0=%d, r1=%d, r2=%d, r3=%d", r0, r1, r2, r3); } } @@ -245,7 +245,7 @@ srs_error_t SrsSslClient::write(void *plaintext, size_t nn_plaintext, ssize_t *n int r1 = SSL_get_error(ssl, r0); ERR_clear_error(); if (r0 <= 0) { - return srs_error_new(ERROR_HTTPS_WRITE, "https: write data=%p, size=%d, r0=%d, r1=%d", p, left, r0, r1); + return srs_error_new(ERROR_TLS_WRITE, "tls: write data=%p, size=%d, r0=%d, r1=%d", p, left, r0, r1); } // Move p to the next writing position. @@ -257,10 +257,10 @@ srs_error_t SrsSslClient::write(void *plaintext, size_t nn_plaintext, ssize_t *n uint8_t *data = NULL; int size = BIO_get_mem_data(bio_out, &data); if ((err = transport->write(data, size, NULL)) != srs_success) { - return srs_error_wrap(err, "https: write data=%p, size=%d", data, size); + return srs_error_wrap(err, "tls: write data=%p, size=%d", data, size); } if ((r0 = BIO_reset(bio_out)) != 1) { - return srs_error_new(ERROR_HTTPS_WRITE, "BIO_reset r0=%d", r0); + return srs_error_new(ERROR_TLS_WRITE, "BIO_reset r0=%d", r0); } }