winlin

support snapshot by http hooks for #152.

@@ -36,7 +36,8 @@ reload(sys) @@ -36,7 +36,8 @@ reload(sys)
36 exec("sys.setdefaultencoding('utf-8')") 36 exec("sys.setdefaultencoding('utf-8')")
37 assert sys.getdefaultencoding().lower() == "utf-8" 37 assert sys.getdefaultencoding().lower() == "utf-8"
38 38
39 -import os, json, time, datetime, cherrypy, threading, urllib2 39 +import os, json, time, datetime, cherrypy, threading, urllib2, shlex, subprocess
  40 +import cherrypy.process.plugins
40 41
41 # simple log functions. 42 # simple log functions.
42 def trace(msg): 43 def trace(msg):
@@ -769,6 +770,46 @@ class RESTChats(object): @@ -769,6 +770,46 @@ class RESTChats(object):
769 def OPTIONS(self, *args, **kwargs): 770 def OPTIONS(self, *args, **kwargs):
770 enable_crossdomain() 771 enable_crossdomain()
771 772
  773 +'''
  774 +the snapshot api,
  775 +to start a snapshot when encoder start publish stream,
  776 +stop the snapshot worker when stream finished.
  777 +'''
  778 +class RESTSnapshots(object):
  779 + exposed = True
  780 +
  781 + def __init__(self):
  782 + pass
  783 +
  784 + def POST(self):
  785 + enable_crossdomain()
  786 +
  787 + # return the error code in str
  788 + code = Error.success
  789 +
  790 + req = cherrypy.request.body.read()
  791 + trace("post to streams, req=%s"%(req))
  792 + try:
  793 + json_req = json.loads(req)
  794 + except Exception, ex:
  795 + code = Error.system_parse_json
  796 + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
  797 + return str(code)
  798 +
  799 + action = json_req["action"]
  800 + if action == "on_publish":
  801 + code = worker.snapshot_create(json_req)
  802 + elif action == "on_unpublish":
  803 + code = worker.snapshot_destroy(json_req)
  804 + else:
  805 + trace("invalid request action: %s"%(json_req["action"]))
  806 + code = Error.request_invalid_action
  807 +
  808 + return str(code)
  809 +
  810 + def OPTIONS(self, *args, **kwargs):
  811 + enable_crossdomain()
  812 +
772 # HTTP RESTful path. 813 # HTTP RESTful path.
773 class Root(object): 814 class Root(object):
774 exposed = True 815 exposed = True
@@ -809,6 +850,7 @@ class V1(object): @@ -809,6 +850,7 @@ class V1(object):
809 self.proxy = RESTProxy() 850 self.proxy = RESTProxy()
810 self.chats = RESTChats() 851 self.chats = RESTChats()
811 self.servers = RESTServers() 852 self.servers = RESTServers()
  853 + self.snapshots = RESTSnapshots()
812 def GET(self): 854 def GET(self):
813 enable_crossdomain(); 855 enable_crossdomain();
814 return json.dumps({"code":Error.success, "urls":{ 856 return json.dumps({"code":Error.success, "urls":{
@@ -849,10 +891,118 @@ port = int(sys.argv[1]) @@ -849,10 +891,118 @@ port = int(sys.argv[1])
849 static_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "static-dir")) 891 static_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "static-dir"))
850 trace("api server listen at port: %s, static_dir: %s"%(port, static_dir)) 892 trace("api server listen at port: %s, static_dir: %s"%(port, static_dir))
851 893
  894 +
  895 +discard = open("/dev/null", "rw")
  896 +'''
  897 +create process by specifies command.
  898 +@param command the command str to start the process.
  899 +@param stdout_fd an int fd specifies the stdout fd.
  900 +@param stderr_fd an int fd specifies the stderr fd.
  901 +@param log_file a file object specifies the additional log to write to. ignore if None.
  902 +@return a Popen object created by subprocess.Popen().
  903 +'''
  904 +def create_process(command, stdout_fd, stderr_fd):
  905 + # log the original command
  906 + msg = "process start command: %s"%(command);
  907 +
  908 + # to avoid shell injection, directly use the command, no need to filter.
  909 + args = shlex.split(str(command));
  910 + process = subprocess.Popen(args, stdout=stdout_fd, stderr=stderr_fd);
  911 +
  912 + return process;
  913 +'''
  914 +isolate thread for srs worker, to do some job in background,
  915 +for example, to snapshot thumbnail of RTMP stream.
  916 +'''
  917 +class SrsWorker(cherrypy.process.plugins.SimplePlugin):
  918 + def __init__(self, bus):
  919 + cherrypy.process.plugins.SimplePlugin.__init__(self, bus);
  920 + self.__snapshots = {}
  921 +
  922 + def start(self):
  923 + print "srs worker thread started"
  924 +
  925 + def stop(self):
  926 + print "srs worker thread stopped"
  927 +
  928 + def main(self):
  929 + for url in self.__snapshots:
  930 + snapshot = self.__snapshots[url]
  931 +
  932 + diff = time.time() - snapshot['timestamp']
  933 + process = snapshot['process']
  934 +
  935 + # aborted.
  936 + if process is not None and snapshot['abort']:
  937 + process.kill()
  938 + process.poll()
  939 + del self.__snapshots[url]
  940 + print 'abort snapshot %s'%snapshot['cmd']
  941 + break
  942 +
  943 + # already snapshoted and not expired.
  944 + if process is not None and diff < 10:
  945 + continue
  946 +
  947 + # terminate the active process
  948 + if process is not None:
  949 + # the poll will set the process.returncode
  950 + process.poll()
  951 +
  952 + # None incidates the process hasn't terminate yet.
  953 + if process.returncode is not None:
  954 + # process terminated, check the returncode.
  955 + if process.returncode != 0:
  956 + print 'process terminated with error=%s, cmd=%s'%(process.returncode, snapshot['cmd'])
  957 + else:
  958 + # kill the process when user cancel.
  959 + process.kill()
  960 +
  961 + # create new process to snapshot.
  962 + ffmpeg = "./objs/ffmpeg/bin/ffmpeg"
  963 + output = os.path.join(static_dir, "%s-%s-%%3d.png"%(snapshot['app'], snapshot['stream']))
  964 + cmd = '%s -i %s -vf fps=1/6 -vcodec png -f image2 -an -y -vframes 3 -y %s'%(ffmpeg, url, output)
  965 + print 'snapshot by: %s'%cmd
  966 +
  967 + process = create_process(cmd, discard.fileno(), discard.fileno())
  968 + snapshot['process'] = process
  969 + snapshot['cmd'] = cmd
  970 + snapshot['timestamp'] = time.time()
  971 + pass;
  972 +
  973 + # {"action":"on_publish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"}
  974 + # ffmpeg -i rtmp://127.0.0.1:1935/live?vhost=dev/stream -vf fps=1/6 -vcodec png -f image2 -an -y -vframes 3 -y static-dir/live-livestream-%3d.png
  975 + def snapshot_create(self, req):
  976 + url = "rtmp://127.0.0.1/%s...vhost...%s/%s"%(req['app'], req['vhost'], req['stream'])
  977 + if url in self.__snapshots:
  978 + print 'ignore exists %s'%url
  979 + return Error.success
  980 +
  981 + req['process'] = None
  982 + req['abort'] = False
  983 + req['timestamp'] = time.time()
  984 + self.__snapshots[url] = req
  985 + return Error.success
  986 +
  987 + # {"action":"on_unpublish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"}
  988 + def snapshot_destroy(self, req):
  989 + url = "rtmp://127.0.0.1/%s...vhost...%s/%s"%(req['app'], req['vhost'], req['stream'])
  990 + if url in self.__snapshots:
  991 + snapshot = self.__snapshots[url]
  992 + snapshot['abort'] = True
  993 + return Error.success
  994 +
  995 +# subscribe the plugin to cherrypy.
  996 +worker = SrsWorker(cherrypy.engine)
  997 +worker.subscribe();
  998 +
  999 +# disable the autoreloader to make it more simple.
  1000 +cherrypy.engine.autoreload.unsubscribe();
  1001 +
852 # cherrypy config. 1002 # cherrypy config.
853 conf = { 1003 conf = {
854 'global': { 1004 'global': {
855 - 'server.shutdown_timeout': 1, 1005 + 'server.shutdown_timeout': 3,
856 'server.socket_host': '0.0.0.0', 1006 'server.socket_host': '0.0.0.0',
857 'server.socket_port': port, 1007 'server.socket_port': port,
858 'tools.encode.on': True, 1008 'tools.encode.on': True,