diff --git a/README.md b/README.md
index 97840fc31..514bc2e78 100755
--- a/README.md
+++ b/README.md
@@ -240,6 +240,7 @@ Supported operating systems and hardware:
* 2013-10-17, Created.
## History
+* v1.0, 2014-06-21, support edge token traverse, fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129
* v1.0, 2014-06-19, add connections count to api summaries. 0.9.127
* v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126
* v1.0, 2014-06-18, add network bytes to api summaries. 0.9.125
diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf
index 43f32c5e1..15b401484 100644
--- a/trunk/conf/full.conf
+++ b/trunk/conf/full.conf
@@ -124,8 +124,15 @@ vhost same.edge.srs.com {
# @remark user can specifies multiple origin for error backup, by space,
# for example, 192.168.1.100:1935 192.168.1.101:1935 192.168.1.102:1935
origin 127.0.0.1:1935 localhost:1935;
+ # for edge, whether open the token traverse mode,
+ # if token traverse on, all connections of edge will forward to origin to check(auth),
+ # it's very important for the edge to do the token auth.
+ # the better way is use http callback to do the token auth by the edge,
+ # but if user prefer origin check(auth), the token_traverse if better solution.
+ # default: off
+ token_traverse off;
}
-# vhost for edge, chnage vhost.
+# vhost for edge, change vhost.
vhost change.edge.srs.com {
mode remote;
# TODO: FIXME: support extra params.
diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp
index fb1c92570..307eb4531 100644
--- a/trunk/src/app/srs_app_config.cpp
+++ b/trunk/src/app/srs_app_config.cpp
@@ -509,7 +509,8 @@ int SrsConfig::reload()
//
// always support reload without additional code:
// chunk_size, ff_log_dir, max_connections,
- // bandcheck, http_hooks, heartbeat
+ // bandcheck, http_hooks, heartbeat,
+ // token_traverse
// merge config: listen
if (!srs_directive_equals(root->get("listen"), old_root->get("listen"))) {
@@ -1987,6 +1988,22 @@ SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
return conf->get("origin");
}
+bool SrsConfig::get_vhost_edge_token_traverse(std::string vhost)
+{
+ SrsConfDirective* conf = get_vhost(vhost);
+
+ if (!conf) {
+ return false;
+ }
+
+ conf = conf->get("token_traverse");
+ if (!conf || conf->arg0() != "on") {
+ return false;
+ }
+
+ return true;
+}
+
SrsConfDirective* SrsConfig::get_transcode(string vhost, string scope)
{
SrsConfDirective* conf = get_vhost(vhost);
diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp
index e58cd2c5f..a2d99dd68 100644
--- a/trunk/src/app/srs_app_config.hpp
+++ b/trunk/src/app/srs_app_config.hpp
@@ -214,6 +214,7 @@ public:
virtual bool get_vhost_is_edge(std::string vhost);
virtual bool get_vhost_is_edge(SrsConfDirective* vhost);
virtual SrsConfDirective* get_vhost_edge_origin(std::string vhost);
+ virtual bool get_vhost_edge_token_traverse(std::string vhost);
// vhost transcode section
public:
virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope);
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index 55412e019..dbb5f867a 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -24,6 +24,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include
#include
+#include
+#include
+#include
using namespace std;
@@ -45,6 +48,8 @@ using namespace std;
#include
#include
#include
+#include
+#include
// when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish,
@@ -63,6 +68,9 @@ using namespace std;
// if timeout, close the connection.
#define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL)
+// when edge timeout, retry next.
+#define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL)
+
SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd)
: SrsConnection(srs_server, client_stfd)
{
@@ -290,6 +298,16 @@ int SrsRtmpConn::stream_service_cycle()
}
srs_info("set chunk_size=%d success", chunk_size);
+ // do token traverse before serve it.
+ bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
+ bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
+ if (vhost_is_edge && edge_traverse) {
+ if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
+ srs_warn("token auth failed, ret=%d", ret);
+ return ret;
+ }
+ }
+
// find a source to serve.
SrsSource* source = NULL;
if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) {
@@ -297,8 +315,6 @@ int SrsRtmpConn::stream_service_cycle()
}
srs_assert(source != NULL);
- bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
-
// check publish available
// for edge, never check it, for edge use proxy mode.
if (!vhost_is_edge) {
@@ -846,6 +862,122 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg
return ret;
}
+int SrsRtmpConn::check_edge_token_traverse_auth()
+{
+ int ret = ERROR_SUCCESS;
+
+ srs_assert(req);
+
+ st_netfd_t stsock = NULL;
+ SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
+ for (int i = 0; i < (int)conf->args.size(); i++) {
+ if ((ret = connect_server(i, &stsock)) == ERROR_SUCCESS) {
+ break;
+ }
+ }
+ if (ret != ERROR_SUCCESS) {
+ srs_warn("token traverse connect failed. ret=%d", ret);
+ return ret;
+ }
+
+ srs_assert(stsock);
+ SrsSocket* io = new SrsSocket(stsock);
+ SrsRtmpClient* client = new SrsRtmpClient(io);
+
+ ret = do_token_traverse_auth(io, client);
+
+ srs_freep(client);
+ srs_freep(io);
+ srs_close_stfd(stsock);
+
+ return ret;
+}
+
+// TODO: FIXME: refine the connect server serials functions.
+int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock)
+{
+ int ret = ERROR_SUCCESS;
+
+ SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
+ srs_assert(conf);
+
+ // select the origin.
+ std::string server = conf->args.at(origin_index % conf->args.size());
+ origin_index = (origin_index + 1) % conf->args.size();
+
+ std::string s_port = RTMP_DEFAULT_PORT;
+ int port = ::atoi(RTMP_DEFAULT_PORT);
+ size_t pos = server.find(":");
+ if (pos != std::string::npos) {
+ s_port = server.substr(pos + 1);
+ server = server.substr(0, pos);
+ port = ::atoi(s_port.c_str());
+ }
+
+ // connect to server.
+ std::string ip = srs_dns_resolve(server);
+ if (ip.empty()) {
+ ret = ERROR_SYSTEM_IP_INVALID;
+ srs_error("dns resolve server error, ip empty. ret=%d", ret);
+ return ret;
+ }
+
+ // open socket.
+ // TODO: FIXME: extract utility method
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ if(sock == -1){
+ ret = ERROR_SOCKET_CREATE;
+ srs_error("create socket error. ret=%d", ret);
+ return ret;
+ }
+
+ st_netfd_t stsock = st_netfd_open_socket(sock);
+ if(stsock == NULL){
+ ret = ERROR_ST_OPEN_SOCKET;
+ srs_error("st_netfd_open_socket failed. ret=%d", ret);
+ return ret;
+ }
+
+ sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = inet_addr(ip.c_str());
+
+ if (st_connect(stsock, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US) == -1){
+ ret = ERROR_ST_CONNECT;
+ srs_close_stfd(stsock);
+ srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
+ return ret;
+ }
+ srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port);
+
+ *pstsock = stsock;
+ return ret;
+}
+
+int SrsRtmpConn::do_token_traverse_auth(SrsSocket* io, SrsRtmpClient* client)
+{
+ int ret = ERROR_SUCCESS;
+
+ srs_assert(client);
+
+ client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
+ client->set_send_timeout(SRS_SEND_TIMEOUT_US);
+
+ if ((ret = client->handshake()) != ERROR_SUCCESS) {
+ srs_error("handshake with server failed. ret=%d", ret);
+ return ret;
+ }
+ if ((ret = client->connect_app(req->app, req->tcUrl, req)) != ERROR_SUCCESS) {
+ srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret);
+ return ret;
+ }
+
+ srs_trace("edge token auth ok, tcUrl=%s", req->tcUrl.c_str());
+
+ return ret;
+}
+
int SrsRtmpConn::http_hooks_on_connect()
{
int ret = ERROR_SUCCESS;
diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp
index 2566f2b30..3fd77a047 100644
--- a/trunk/src/app/srs_app_rtmp_conn.hpp
+++ b/trunk/src/app/srs_app_rtmp_conn.hpp
@@ -47,6 +47,7 @@ class SrsHttpHooks;
#endif
class SrsBandwidth;
class SrsKbps;
+class SrsRtmpClient;
/**
* the client provides the main logic control for RTMP clients.
@@ -90,6 +91,10 @@ private:
virtual int flash_publish(SrsSource* source);
virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
+private:
+ virtual int check_edge_token_traverse_auth();
+ virtual int connect_server(int origin_index, st_netfd_t* pstsock);
+ virtual int do_token_traverse_auth(SrsSocket* io, SrsRtmpClient* client);
private:
virtual int http_hooks_on_connect();
virtual void http_hooks_on_close();
diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp
index bd6dcaaa8..7870166f6 100644
--- a/trunk/src/core/srs_core.hpp
+++ b/trunk/src/core/srs_core.hpp
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR "0"
#define VERSION_MINOR "9"
-#define VERSION_REVISION "128"
+#define VERSION_REVISION "129"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info.
#define RTMP_SIG_SRS_KEY "SRS"