12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067 |
- #!/usr/bin/python
- '''
- The MIT License (MIT)
- Copyright (c) 2013-2016 SRS(ossrs)
- Permission is hereby granted, free of charge, to any person obtaining a copy of
- this software and associated documentation files (the "Software"), to deal in
- the Software without restriction, including without limitation the rights to
- use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- the Software, and to permit persons to whom the Software is furnished to do so,
- subject to the following conditions:
- The above copyright notice and this permission notice shall be included in all
- copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- '''
- """
- the api-server is a default demo server for srs to call
- when srs get some event, for example, when client connect
- to srs, srs can invoke the http api of the api-server
- """
- import sys
- # reload sys model to enable the getdefaultencoding method.
- reload(sys)
- # set the default encoding to utf-8
- # using exec to set the encoding, to avoid error in IDE.
- exec("sys.setdefaultencoding('utf-8')")
- assert sys.getdefaultencoding().lower() == "utf-8"
- import os, json, time, datetime, cherrypy, threading, urllib2, shlex, subprocess
- import cherrypy.process.plugins
- # simple log functions.
- def trace(msg):
- date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- print "[%s][trace] %s"%(date, msg)
- # enable crossdomain access for js-client
- # define the following method:
- # def OPTIONS(self, *args, **kwargs)
- # enable_crossdomain()
- # invoke this method to enable js to request crossdomain.
- def enable_crossdomain():
- cherrypy.response.headers["Access-Control-Allow-Origin"] = "*"
- cherrypy.response.headers["Access-Control-Allow-Methods"] = "GET, POST, HEAD, PUT, DELETE"
- # generate allow headers for crossdomain.
- allow_headers = ["Cache-Control", "X-Proxy-Authorization", "X-Requested-With", "Content-Type"]
- cherrypy.response.headers["Access-Control-Allow-Headers"] = ",".join(allow_headers)
- # error codes definition
- class Error:
- # ok, success, completed.
- success = 0
- # error when parse json
- system_parse_json = 100
- # request action invalid
- request_invalid_action = 200
- # cdn node not exists
- cdn_node_not_exists = 201
- '''
- handle the clients requests: connect/disconnect vhost/app.
- '''
- class RESTClients(object):
- exposed = True
- def GET(self):
- enable_crossdomain()
- clients = {}
- return json.dumps(clients)
- '''
- for SRS hook: on_connect/on_close
- on_connect:
- when client connect to vhost/app, call the hook,
- the request in the POST data string is a object encode by json:
- {
- "action": "on_connect",
- "client_id": 1985,
- "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
- "tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a",
- "pageUrl": "http://www.test.com/live.html"
- }
- on_close:
- when client close/disconnect to vhost/app/stream, call the hook,
- the request in the POST data string is a object encode by json:
- {
- "action": "on_close",
- "client_id": 1985,
- "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
- "send_bytes": 10240, "recv_bytes": 10240
- }
- if valid, the hook must return HTTP code 200(Stauts OK) and response
- an int value specifies the error code(0 corresponding to success):
- 0
- '''
- def POST(self):
- enable_crossdomain()
- # return the error code in str
- code = Error.success
- req = cherrypy.request.body.read()
- trace("post to clients, req=%s"%(req))
- try:
- json_req = json.loads(req)
- except Exception, ex:
- code = Error.system_parse_json
- trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
- return json.dumps({"code": int(code), "data": None})
- action = json_req["action"]
- if action == "on_connect":
- code = self.__on_connect(json_req)
- elif action == "on_close":
- code = self.__on_close(json_req)
- else:
- trace("invalid request action: %s"%(json_req["action"]))
- code = Error.request_invalid_action
- return json.dumps({"code": int(code), "data": None})
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain()
- def __on_connect(self, req):
- code = Error.success
- trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, pageUrl=%s"%(
- req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["pageUrl"]
- ))
- # TODO: process the on_connect event
- return code
- def __on_close(self, req):
- code = Error.success
- trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, send_bytes=%s, recv_bytes=%s"%(
- req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["send_bytes"], req["recv_bytes"]
- ))
- # TODO: process the on_close event
- return code
- '''
- handle the streams requests: publish/unpublish stream.
- '''
- class RESTStreams(object):
- exposed = True
- def GET(self):
- enable_crossdomain()
- streams = {}
- return json.dumps(streams)
- '''
- for SRS hook: on_publish/on_unpublish
- on_publish:
- when client(encoder) publish to vhost/app/stream, call the hook,
- the request in the POST data string is a object encode by json:
- {
- "action": "on_publish",
- "client_id": 1985,
- "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
- "stream": "livestream", "param":"?token=xxx&salt=yyy"
- }
- on_unpublish:
- when client(encoder) stop publish to vhost/app/stream, call the hook,
- the request in the POST data string is a object encode by json:
- {
- "action": "on_unpublish",
- "client_id": 1985,
- "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
- "stream": "livestream", "param":"?token=xxx&salt=yyy"
- }
- if valid, the hook must return HTTP code 200(Stauts OK) and response
- an int value specifies the error code(0 corresponding to success):
- 0
- '''
- def POST(self):
- enable_crossdomain()
- # return the error code in str
- code = Error.success
- req = cherrypy.request.body.read()
- trace("post to streams, req=%s"%(req))
- try:
- json_req = json.loads(req)
- except Exception, ex:
- code = Error.system_parse_json
- trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
- return json.dumps({"code": int(code), "data": None})
- action = json_req["action"]
- if action == "on_publish":
- code = self.__on_publish(json_req)
- elif action == "on_unpublish":
- code = self.__on_unpublish(json_req)
- else:
- trace("invalid request action: %s"%(json_req["action"]))
- code = Error.request_invalid_action
- return json.dumps({"code": int(code), "data": None})
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain()
- def __on_publish(self, req):
- code = Error.success
- trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
- req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
- ))
- # TODO: process the on_publish event
- return code
- def __on_unpublish(self, req):
- code = Error.success
- trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
- req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
- ))
- # TODO: process the on_unpublish event
- return code
- '''
- handle the dvrs requests: dvr stream.
- '''
- class RESTDvrs(object):
- exposed = True
- def GET(self):
- enable_crossdomain()
- dvrs = {}
- return json.dumps(dvrs)
- '''
- for SRS hook: on_dvr
- on_dvr:
- when srs reap a dvr file, call the hook,
- the request in the POST data string is a object encode by json:
- {
- "action": "on_dvr",
- "client_id": 1985,
- "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
- "stream": "livestream", "param":"?token=xxx&salt=yyy",
- "cwd": "/usr/local/srs",
- "file": "./objs/nginx/html/live/livestream.1420254068776.flv"
- }
- if valid, the hook must return HTTP code 200(Stauts OK) and response
- an int value specifies the error code(0 corresponding to success):
- 0
- '''
- def POST(self):
- enable_crossdomain()
- # return the error code in str
- code = Error.success
- req = cherrypy.request.body.read()
- trace("post to dvrs, req=%s"%(req))
- try:
- json_req = json.loads(req)
- except Exception, ex:
- code = Error.system_parse_json
- trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
- return json.dumps({"code": int(code), "data": None})
- action = json_req["action"]
- if action == "on_dvr":
- code = self.__on_dvr(json_req)
- else:
- trace("invalid request action: %s"%(json_req["action"]))
- code = Error.request_invalid_action
- return json.dumps({"code": int(code), "data": None})
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain()
- def __on_dvr(self, req):
- code = Error.success
- trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, cwd=%s, file=%s"%(
- req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"],
- req["cwd"], req["file"]
- ))
- # TODO: process the on_dvr event
- return code
- '''
- handle the hls proxy requests: hls stream.
- '''
- class RESTProxy(object):
- exposed = True
- '''
- for SRS hook: on_hls_notify
- on_hls_notify:
- when srs reap a ts file of hls, call this hook,
- used to push file to cdn network, by get the ts file from cdn network.
- so we use HTTP GET and use the variable following:
- [app], replace with the app.
- [stream], replace with the stream.
- [param], replace with the param.
- [ts_url], replace with the ts url.
- ignore any return data of server.
- '''
- def GET(self, *args, **kwargs):
- enable_crossdomain()
-
- url = "http://" + "/".join(args);
- print "start to proxy url: %s"%url
-
- f = None
- try:
- f = urllib2.urlopen(url)
- f.read()
- except:
- print "error proxy url: %s"%url
- finally:
- if f: f.close()
- print "completed proxy url: %s"%url
- return url
- '''
- handle the hls requests: hls stream.
- '''
- class RESTHls(object):
- exposed = True
- '''
- for SRS hook: on_hls_notify
- on_hls_notify:
- when srs reap a ts file of hls, call this hook,
- used to push file to cdn network, by get the ts file from cdn network.
- so we use HTTP GET and use the variable following:
- [app], replace with the app.
- [stream], replace with the stream.
- [param], replace with the param.
- [ts_url], replace with the ts url.
- ignore any return data of server.
- '''
- def GET(self, *args, **kwargs):
- enable_crossdomain()
- hls = {
- "args": args,
- "kwargs": kwargs
- }
- return json.dumps(hls)
- '''
- for SRS hook: on_hls
- on_hls:
- when srs reap a dvr file, call the hook,
- the request in the POST data string is a object encode by json:
- {
- "action": "on_dvr",
- "client_id": 1985,
- "ip": "192.168.1.10",
- "vhost": "video.test.com",
- "app": "live",
- "stream": "livestream", "param":"?token=xxx&salt=yyy",
- "duration": 9.68, // in seconds
- "cwd": "/usr/local/srs",
- "file": "./objs/nginx/html/live/livestream.1420254068776-100.ts",
- "seq_no": 100
- }
- if valid, the hook must return HTTP code 200(Stauts OK) and response
- an int value specifies the error code(0 corresponding to success):
- 0
- '''
- def POST(self):
- enable_crossdomain()
- # return the error code in str
- code = Error.success
- req = cherrypy.request.body.read()
- trace("post to hls, req=%s"%(req))
- try:
- json_req = json.loads(req)
- except Exception, ex:
- code = Error.system_parse_json
- trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
- return json.dumps({"code": int(code), "data": None})
- action = json_req["action"]
- if action == "on_hls":
- code = self.__on_hls(json_req)
- else:
- trace("invalid request action: %s"%(json_req["action"]))
- code = Error.request_invalid_action
- return json.dumps({"code": int(code), "data": None})
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain()
- def __on_hls(self, req):
- code = Error.success
- trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%s, cwd=%s, file=%s, seq_no=%s"%(
- req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"], req["duration"],
- req["cwd"], req["file"], req["seq_no"]
- ))
- # TODO: process the on_hls event
- return code
- '''
- handle the sessions requests: client play/stop stream
- '''
- class RESTSessions(object):
- exposed = True
- def GET(self):
- enable_crossdomain()
- sessions = {}
- return json.dumps(sessions)
- '''
- for SRS hook: on_play/on_stop
- on_play:
- when client(encoder) publish to vhost/app/stream, call the hook,
- the request in the POST data string is a object encode by json:
- {
- "action": "on_play",
- "client_id": 1985,
- "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
- "stream": "livestream", "param":"?token=xxx&salt=yyy",
- "pageUrl": "http://www.test.com/live.html"
- }
- on_stop:
- when client(encoder) stop publish to vhost/app/stream, call the hook,
- the request in the POST data string is a object encode by json:
- {
- "action": "on_stop",
- "client_id": 1985,
- "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
- "stream": "livestream", "param":"?token=xxx&salt=yyy"
- }
- if valid, the hook must return HTTP code 200(Stauts OK) and response
- an int value specifies the error code(0 corresponding to success):
- 0
- '''
- def POST(self):
- enable_crossdomain()
- # return the error code in str
- code = Error.success
- req = cherrypy.request.body.read()
- trace("post to sessions, req=%s"%(req))
- try:
- json_req = json.loads(req)
- except Exception, ex:
- code = Error.system_parse_json
- trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
- return json.dumps({"code": int(code), "data": None})
- action = json_req["action"]
- if action == "on_play":
- code = self.__on_play(json_req)
- elif action == "on_stop":
- code = self.__on_stop(json_req)
- else:
- trace("invalid request action: %s"%(json_req["action"]))
- code = Error.request_invalid_action
- return json.dumps({"code": int(code), "data": None})
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain()
- def __on_play(self, req):
- code = Error.success
- trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, pageUrl=%s"%(
- req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"], req["pageUrl"]
- ))
- # TODO: process the on_play event
- return code
- def __on_stop(self, req):
- code = Error.success
- trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
- req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
- ))
- # TODO: process the on_stop event
- return code
- global_arm_server_id = os.getpid();
- class ArmServer:
- def __init__(self):
- global global_arm_server_id
- global_arm_server_id += 1
-
- self.id = str(global_arm_server_id)
- self.ip = None
- self.device_id = None
- self.summaries = None
- self.devices = None
-
- self.public_ip = cherrypy.request.remote.ip
- self.heartbeat = time.time()
-
- self.clients = 0
-
- def dead(self):
- dead_time_seconds = 20
- if time.time() - self.heartbeat > dead_time_seconds:
- return True
- return False
-
- def json_dump(self):
- data = {}
- data["id"] = self.id
- data["ip"] = self.ip
- data["device_id"] = self.device_id
- data["summaries"] = self.summaries
- data["devices"] = self.devices
- data["public_ip"] = self.public_ip
- data["heartbeat"] = self.heartbeat
- data["heartbeat_h"] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(self.heartbeat))
- data["api"] = "http://%s:1985/api/v1/summaries"%(self.ip)
- data["console"] = "http://ossrs.net/console/ng_index.html#/summaries?host=%s&port=1985"%(self.ip)
- return data
-
- '''
- the server list
- '''
- class RESTServers(object):
- exposed = True
-
- def __init__(self):
- self.__nodes = []
-
- self.__last_update = datetime.datetime.now();
-
- self.__lock = threading.Lock()
-
- def __get_node(self, device_id):
- for node in self.__nodes:
- if node.device_id == device_id:
- return node
- return None
-
- def __refresh_nodes(self):
- while len(self.__nodes) > 0:
- has_dead_node = False
- for node in self.__nodes:
- if node.dead():
- self.__nodes.remove(node)
- has_dead_node = True
- if not has_dead_node:
- break
- '''
- post to update server ip.
- request body: the new raspberry-pi server ip. TODO: FIXME: more info.
- '''
- def POST(self):
- enable_crossdomain()
-
- try:
- self.__lock.acquire()
- req = cherrypy.request.body.read()
- trace("post to nodes, req=%s"%(req))
- try:
- json_req = json.loads(req)
- except Exception, ex:
- code = Error.system_parse_json
- trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
- return json.dumps({"code":code, "data": None})
-
- device_id = json_req["device_id"]
- node = self.__get_node(device_id)
- if node is None:
- node = ArmServer()
- self.__nodes.append(node)
-
- node.ip = json_req["ip"]
- if "summaries" in json_req:
- node.summaries = json_req["summaries"]
- if "devices" in json_req:
- node.devices = json_req["devices"]
- node.device_id = device_id
- node.public_ip = cherrypy.request.remote.ip
- node.heartbeat = time.time()
-
- return json.dumps({"code":Error.success, "data": {"id":node.id}})
- finally:
- self.__lock.release()
-
- '''
- get all servers which report to this api-server.
- '''
- def GET(self, id=None):
- enable_crossdomain()
-
- try:
- self.__lock.acquire()
-
- self.__refresh_nodes()
-
- data = []
- for node in self.__nodes:
- if id == None or node.id == str(id) or node.device_id == str(id):
- data.append(node.json_dump())
-
- return json.dumps(data)
- finally:
- self.__lock.release()
- def DELETE(self, id):
- enable_crossdomain()
- raise cherrypy.HTTPError(405, "Not allowed.")
- def PUT(self, id):
- enable_crossdomain()
- raise cherrypy.HTTPError(405, "Not allowed.")
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain()
- global_chat_id = os.getpid();
- '''
- the chat streams, public chat room.
- '''
- class RESTChats(object):
- exposed = True
- global_id = 100
-
- def __init__(self):
- # object fields:
- # id: an int value indicates the id of user.
- # username: a str indicates the user name.
- # url: a str indicates the url of user stream.
- # agent: a str indicates the agent of user.
- # join_date: a number indicates the join timestamp in seconds.
- # join_date_str: a str specifies the formated friendly time.
- # heatbeat: a number indicates the heartbeat timestamp in seconds.
- # vcodec: a dict indicates the video codec info.
- # acodec: a dict indicates the audio codec info.
- self.__chats = [];
- self.__chat_lock = threading.Lock();
- # dead time in seconds, if exceed, remove the chat.
- self.__dead_time = 15;
-
- '''
- get the rtmp url of chat object. None if overflow.
- '''
- def get_url_by_index(self, index):
- index = int(index)
- if index is None or index >= len(self.__chats):
- return None;
- return self.__chats[index]["url"];
- def GET(self):
- enable_crossdomain()
- try:
- self.__chat_lock.acquire();
- chats = [];
- copy = self.__chats[:];
- for chat in copy:
- if time.time() - chat["heartbeat"] > self.__dead_time:
- self.__chats.remove(chat);
- continue;
- chats.append({
- "id": chat["id"],
- "username": chat["username"],
- "url": chat["url"],
- "join_date_str": chat["join_date_str"],
- "heartbeat": chat["heartbeat"],
- });
- finally:
- self.__chat_lock.release();
-
- return json.dumps({"code":0, "data": {"now": time.time(), "chats": chats}})
-
- def POST(self):
- enable_crossdomain()
-
- req = cherrypy.request.body.read()
- chat = json.loads(req)
- global global_chat_id;
- chat["id"] = global_chat_id
- global_chat_id += 1
- chat["join_date"] = time.time();
- chat["heartbeat"] = time.time();
- chat["join_date_str"] = time.strftime("%Y-%m-%d %H:%M:%S");
- try:
- self.__chat_lock.acquire();
- self.__chats.append(chat)
- finally:
- self.__chat_lock.release();
- trace("create chat success, id=%s"%(chat["id"]))
-
- return json.dumps({"code":0, "data": chat["id"]})
- def DELETE(self, id):
- enable_crossdomain()
- try:
- self.__chat_lock.acquire();
- for chat in self.__chats:
- if str(id) != str(chat["id"]):
- continue
- self.__chats.remove(chat)
- trace("delete chat success, id=%s"%(id))
- return json.dumps({"code":0, "data": None})
- finally:
- self.__chat_lock.release();
- raise cherrypy.HTTPError(405, "Not allowed.")
- def PUT(self, id):
- enable_crossdomain()
- try:
- self.__chat_lock.acquire();
- for chat in self.__chats:
- if str(id) != str(chat["id"]):
- continue
- chat["heartbeat"] = time.time();
- trace("heartbeat chat success, id=%s"%(id))
- return json.dumps({"code":0, "data": None})
- finally:
- self.__chat_lock.release();
- raise cherrypy.HTTPError(405, "Not allowed.")
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain()
- '''
- the snapshot api,
- to start a snapshot when encoder start publish stream,
- stop the snapshot worker when stream finished.
- '''
- class RESTSnapshots(object):
- exposed = True
-
- def __init__(self):
- pass
- def POST(self):
- enable_crossdomain()
- # return the error code in str
- code = Error.success
- req = cherrypy.request.body.read()
- trace("post to streams, req=%s"%(req))
- try:
- json_req = json.loads(req)
- except Exception, ex:
- code = Error.system_parse_json
- trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
- return json.dumps({"code": int(code), "data": None})
- action = json_req["action"]
- if action == "on_publish":
- code = worker.snapshot_create(json_req)
- elif action == "on_unpublish":
- code = worker.snapshot_destroy(json_req)
- else:
- trace("invalid request action: %s"%(json_req["action"]))
- code = Error.request_invalid_action
- return json.dumps({"code": int(code), "data": None})
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain()
- # HTTP RESTful path.
- class Root(object):
- exposed = True
- def __init__(self):
- self.api = Api()
- def GET(self):
- enable_crossdomain();
- return json.dumps({"code":Error.success, "urls":{"api":"the api root"}})
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain();
- # HTTP RESTful path.
- class Api(object):
- exposed = True
- def __init__(self):
- self.v1 = V1()
- def GET(self):
- enable_crossdomain();
- return json.dumps({"code":Error.success,
- "urls": {
- "v1": "the api version 1.0"
- }
- });
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain();
- # HTTP RESTful path. to access as:
- # http://127.0.0.1:8085/api/v1/clients
- class V1(object):
- exposed = True
- def __init__(self):
- self.clients = RESTClients()
- self.streams = RESTStreams()
- self.sessions = RESTSessions()
- self.dvrs = RESTDvrs()
- self.hls = RESTHls()
- self.proxy = RESTProxy()
- self.chats = RESTChats()
- self.servers = RESTServers()
- self.snapshots = RESTSnapshots()
- def GET(self):
- enable_crossdomain();
- return json.dumps({"code":Error.success, "urls":{
- "clients": "for srs http callback, to handle the clients requests: connect/disconnect vhost/app.",
- "streams": "for srs http callback, to handle the streams requests: publish/unpublish stream.",
- "sessions": "for srs http callback, to handle the sessions requests: client play/stop stream",
- "dvrs": "for srs http callback, to handle the dvr requests: dvr stream.",
- "chats": "for srs demo meeting, the chat streams, public chat room.",
- "servers": {
- "summary": "for srs raspberry-pi and meeting demo",
- "GET": "get the current raspberry-pi servers info",
- "POST ip=node_ip&device_id=device_id": "the new raspberry-pi server info."
- }
- }});
- def OPTIONS(self, *args, **kwargs):
- enable_crossdomain();
- '''
- main code start.
- '''
- # donot support use this module as library.
- if __name__ != "__main__":
- raise Exception("embed not support")
- # check the user options
- if len(sys.argv) <= 1:
- print "SRS api callback server, Copyright (c) 2013-2016 SRS(ossrs)"
- print "Usage: python %s <port>"%(sys.argv[0])
- print " port: the port to listen at."
- print "For example:"
- print " python %s 8085"%(sys.argv[0])
- print ""
- print "See also: https://github.com/ossrs/srs"
- sys.exit(1)
- # parse port from user options.
- port = int(sys.argv[1])
- static_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "static-dir"))
- trace("api server listen at port: %s, static_dir: %s"%(port, static_dir))
- discard = open("/dev/null", "rw")
- '''
- create process by specifies command.
- @param command the command str to start the process.
- @param stdout_fd an int fd specifies the stdout fd.
- @param stderr_fd an int fd specifies the stderr fd.
- @param log_file a file object specifies the additional log to write to. ignore if None.
- @return a Popen object created by subprocess.Popen().
- '''
- def create_process(command, stdout_fd, stderr_fd):
- # log the original command
- msg = "process start command: %s"%(command);
- # to avoid shell injection, directly use the command, no need to filter.
- args = shlex.split(str(command));
- process = subprocess.Popen(args, stdout=stdout_fd, stderr=stderr_fd);
- return process;
- '''
- isolate thread for srs worker, to do some job in background,
- for example, to snapshot thumbnail of RTMP stream.
- '''
- class SrsWorker(cherrypy.process.plugins.SimplePlugin):
- def __init__(self, bus):
- cherrypy.process.plugins.SimplePlugin.__init__(self, bus);
- self.__snapshots = {}
- def start(self):
- print "srs worker thread started"
- def stop(self):
- print "srs worker thread stopped"
- def main(self):
- for url in self.__snapshots:
- snapshot = self.__snapshots[url]
-
- diff = time.time() - snapshot['timestamp']
- process = snapshot['process']
-
- # aborted.
- if process is not None and snapshot['abort']:
- process.kill()
- process.poll()
- del self.__snapshots[url]
- print 'abort snapshot %s'%snapshot['cmd']
- break
- # how many snapshots to output.
- vframes = 5
- # the expire in seconds for ffmpeg to snapshot.
- expire = 1
- # the timeout to kill ffmpeg.
- kill_ffmpeg_timeout = 30 * expire
- # the ffmpeg binary path
- ffmpeg = "./objs/ffmpeg/bin/ffmpeg"
- # the best url for thumbnail.
- besturl = os.path.join(static_dir, "%s/%s-best.png"%(snapshot['app'], snapshot['stream']))
- # the lambda to generate the thumbnail with index.
- lgo = lambda dir, app, stream, index: os.path.join(dir, "%s/%s-%03d.png"%(app, stream, index))
- # the output for snapshot command
- output = os.path.join(static_dir, "%s/%s-%%03d.png"%(snapshot['app'], snapshot['stream']))
- # the ffmepg command to snapshot
- cmd = '%s -i %s -vf fps=1 -vcodec png -f image2 -an -y -vframes %s -y %s'%(ffmpeg, url, vframes, output)
-
- # already snapshoted and not expired.
- if process is not None and diff < expire:
- continue
-
- # terminate the active process
- if process is not None:
- # the poll will set the process.returncode
- process.poll()
- # None incidates the process hasn't terminate yet.
- if process.returncode is not None:
- # process terminated with error.
- if process.returncode != 0:
- print 'process terminated with error=%s, cmd=%s'%(process.returncode, snapshot['cmd'])
- # process terminated normally.
- else:
- # guess the best one.
- bestsize = 0
- for i in range(0, vframes):
- output = lgo(static_dir, snapshot['app'], snapshot['stream'], i + 1)
- fsize = os.path.getsize(output)
- if bestsize < fsize:
- os.system("rm -f '%s'"%besturl)
- os.system("ln -sf '%s' '%s'"%(output, besturl))
- bestsize = fsize
- print 'the best thumbnail is %s'%besturl
- else:
- # wait for process to terminate, timeout is N*expire.
- if diff < kill_ffmpeg_timeout:
- continue
- # kill the process when user cancel.
- else:
- process.kill()
- print 'kill the process %s'%snapshot['cmd']
-
- # create new process to snapshot.
- print 'snapshot by: %s'%cmd
-
- process = create_process(cmd, discard.fileno(), discard.fileno())
- snapshot['process'] = process
- snapshot['cmd'] = cmd
- snapshot['timestamp'] = time.time()
- pass;
-
- # {"action":"on_publish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"}
- # ffmpeg -i rtmp://127.0.0.1:1935/live?vhost=dev/stream -vf fps=1 -vcodec png -f image2 -an -y -vframes 3 -y static-dir/live/livestream-%03d.png
- def snapshot_create(self, req):
- url = "rtmp://127.0.0.1/%s...vhost...%s/%s"%(req['app'], req['vhost'], req['stream'])
- if url in self.__snapshots:
- print 'ignore exists %s'%url
- return Error.success
-
- req['process'] = None
- req['abort'] = False
- req['timestamp'] = time.time()
- self.__snapshots[url] = req
- return Error.success
-
- # {"action":"on_unpublish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"}
- def snapshot_destroy(self, req):
- url = "rtmp://127.0.0.1/%s...vhost...%s/%s"%(req['app'], req['vhost'], req['stream'])
- if url in self.__snapshots:
- snapshot = self.__snapshots[url]
- snapshot['abort'] = True
- return Error.success
- # subscribe the plugin to cherrypy.
- worker = SrsWorker(cherrypy.engine)
- worker.subscribe();
- # disable the autoreloader to make it more simple.
- cherrypy.engine.autoreload.unsubscribe();
- # cherrypy config.
- conf = {
- 'global': {
- 'server.shutdown_timeout': 3,
- 'server.socket_host': '0.0.0.0',
- 'server.socket_port': port,
- 'tools.encode.on': True,
- 'tools.staticdir.on': True,
- 'tools.encode.encoding': "utf-8",
- #'server.thread_pool': 2, # single thread server.
- },
- '/': {
- 'tools.staticdir.dir': static_dir,
- 'tools.staticdir.index': "index.html",
- # for cherrypy RESTful api support
- 'request.dispatch': cherrypy.dispatch.MethodDispatcher()
- }
- }
- # start cherrypy web engine
- trace("start cherrypy server")
- root = Root()
- cherrypy.quickstart(root, '/', conf)
|