From 33a282e576b89c61029a85b5bdbd84394d6860c3 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 20 Oct 2015 12:30:57 +0800 Subject: [PATCH] support snapshot by http hooks for #152. --- trunk/research/api-server/server.py | 154 +++++++++++++++++++++++++++- 1 file changed, 152 insertions(+), 2 deletions(-) diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index fdd0f0f15..3203de1ca 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -36,7 +36,8 @@ reload(sys) exec("sys.setdefaultencoding('utf-8')") assert sys.getdefaultencoding().lower() == "utf-8" -import os, json, time, datetime, cherrypy, threading, urllib2 +import os, json, time, datetime, cherrypy, threading, urllib2, shlex, subprocess +import cherrypy.process.plugins # simple log functions. def trace(msg): @@ -769,6 +770,46 @@ class RESTChats(object): def OPTIONS(self, *args, **kwargs): enable_crossdomain() +''' +the snapshot api, +to start a snapshot when encoder start publish stream, +stop the snapshot worker when stream finished. +''' +class RESTSnapshots(object): + exposed = True + + def __init__(self): + pass + + def POST(self): + enable_crossdomain() + + # return the error code in str + code = Error.success + + req = cherrypy.request.body.read() + trace("post to streams, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return str(code) + + action = json_req["action"] + if action == "on_publish": + code = worker.snapshot_create(json_req) + elif action == "on_unpublish": + code = worker.snapshot_destroy(json_req) + else: + trace("invalid request action: %s"%(json_req["action"])) + code = Error.request_invalid_action + + return str(code) + + def OPTIONS(self, *args, **kwargs): + enable_crossdomain() + # HTTP RESTful path. class Root(object): exposed = True @@ -809,6 +850,7 @@ class V1(object): self.proxy = RESTProxy() self.chats = RESTChats() self.servers = RESTServers() + self.snapshots = RESTSnapshots() def GET(self): enable_crossdomain(); return json.dumps({"code":Error.success, "urls":{ @@ -849,10 +891,118 @@ port = int(sys.argv[1]) static_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "static-dir")) trace("api server listen at port: %s, static_dir: %s"%(port, static_dir)) + +discard = open("/dev/null", "rw") +''' +create process by specifies command. +@param command the command str to start the process. +@param stdout_fd an int fd specifies the stdout fd. +@param stderr_fd an int fd specifies the stderr fd. +@param log_file a file object specifies the additional log to write to. ignore if None. +@return a Popen object created by subprocess.Popen(). +''' +def create_process(command, stdout_fd, stderr_fd): + # log the original command + msg = "process start command: %s"%(command); + + # to avoid shell injection, directly use the command, no need to filter. + args = shlex.split(str(command)); + process = subprocess.Popen(args, stdout=stdout_fd, stderr=stderr_fd); + + return process; +''' +isolate thread for srs worker, to do some job in background, +for example, to snapshot thumbnail of RTMP stream. +''' +class SrsWorker(cherrypy.process.plugins.SimplePlugin): + def __init__(self, bus): + cherrypy.process.plugins.SimplePlugin.__init__(self, bus); + self.__snapshots = {} + + def start(self): + print "srs worker thread started" + + def stop(self): + print "srs worker thread stopped" + + def main(self): + for url in self.__snapshots: + snapshot = self.__snapshots[url] + + diff = time.time() - snapshot['timestamp'] + process = snapshot['process'] + + # aborted. + if process is not None and snapshot['abort']: + process.kill() + process.poll() + del self.__snapshots[url] + print 'abort snapshot %s'%snapshot['cmd'] + break + + # already snapshoted and not expired. + if process is not None and diff < 10: + continue + + # terminate the active process + if process is not None: + # the poll will set the process.returncode + process.poll() + + # None incidates the process hasn't terminate yet. + if process.returncode is not None: + # process terminated, check the returncode. + if process.returncode != 0: + print 'process terminated with error=%s, cmd=%s'%(process.returncode, snapshot['cmd']) + else: + # kill the process when user cancel. + process.kill() + + # create new process to snapshot. + ffmpeg = "./objs/ffmpeg/bin/ffmpeg" + output = os.path.join(static_dir, "%s-%s-%%3d.png"%(snapshot['app'], snapshot['stream'])) + cmd = '%s -i %s -vf fps=1/6 -vcodec png -f image2 -an -y -vframes 3 -y %s'%(ffmpeg, url, output) + print 'snapshot by: %s'%cmd + + process = create_process(cmd, discard.fileno(), discard.fileno()) + snapshot['process'] = process + snapshot['cmd'] = cmd + snapshot['timestamp'] = time.time() + pass; + + # {"action":"on_publish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"} + # ffmpeg -i rtmp://127.0.0.1:1935/live?vhost=dev/stream -vf fps=1/6 -vcodec png -f image2 -an -y -vframes 3 -y static-dir/live-livestream-%3d.png + def snapshot_create(self, req): + url = "rtmp://127.0.0.1/%s...vhost...%s/%s"%(req['app'], req['vhost'], req['stream']) + if url in self.__snapshots: + print 'ignore exists %s'%url + return Error.success + + req['process'] = None + req['abort'] = False + req['timestamp'] = time.time() + self.__snapshots[url] = req + return Error.success + + # {"action":"on_unpublish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"} + def snapshot_destroy(self, req): + url = "rtmp://127.0.0.1/%s...vhost...%s/%s"%(req['app'], req['vhost'], req['stream']) + if url in self.__snapshots: + snapshot = self.__snapshots[url] + snapshot['abort'] = True + return Error.success + +# subscribe the plugin to cherrypy. +worker = SrsWorker(cherrypy.engine) +worker.subscribe(); + +# disable the autoreloader to make it more simple. +cherrypy.engine.autoreload.unsubscribe(); + # cherrypy config. conf = { 'global': { - 'server.shutdown_timeout': 1, + 'server.shutdown_timeout': 3, 'server.socket_host': '0.0.0.0', 'server.socket_port': port, 'tools.encode.on': True,