2
0

server.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067
  1. #!/usr/bin/python
  2. '''
  3. The MIT License (MIT)
  4. Copyright (c) 2013-2016 SRS(ossrs)
  5. Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. this software and associated documentation files (the "Software"), to deal in
  7. the Software without restriction, including without limitation the rights to
  8. use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. the Software, and to permit persons to whom the Software is furnished to do so,
  10. subject to the following conditions:
  11. The above copyright notice and this permission notice shall be included in all
  12. copies or substantial portions of the Software.
  13. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  15. FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  16. COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  17. IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  18. CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  19. '''
  20. """
  21. the api-server is a default demo server for srs to call
  22. when srs get some event, for example, when client connect
  23. to srs, srs can invoke the http api of the api-server
  24. """
  25. import sys
  26. # reload sys model to enable the getdefaultencoding method.
  27. reload(sys)
  28. # set the default encoding to utf-8
  29. # using exec to set the encoding, to avoid error in IDE.
  30. exec("sys.setdefaultencoding('utf-8')")
  31. assert sys.getdefaultencoding().lower() == "utf-8"
  32. import os, json, time, datetime, cherrypy, threading, urllib2, shlex, subprocess
  33. import cherrypy.process.plugins
  34. # simple log functions.
  35. def trace(msg):
  36. date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  37. print "[%s][trace] %s"%(date, msg)
  38. # enable crossdomain access for js-client
  39. # define the following method:
  40. # def OPTIONS(self, *args, **kwargs)
  41. # enable_crossdomain()
  42. # invoke this method to enable js to request crossdomain.
  43. def enable_crossdomain():
  44. cherrypy.response.headers["Access-Control-Allow-Origin"] = "*"
  45. cherrypy.response.headers["Access-Control-Allow-Methods"] = "GET, POST, HEAD, PUT, DELETE"
  46. # generate allow headers for crossdomain.
  47. allow_headers = ["Cache-Control", "X-Proxy-Authorization", "X-Requested-With", "Content-Type"]
  48. cherrypy.response.headers["Access-Control-Allow-Headers"] = ",".join(allow_headers)
  49. # error codes definition
  50. class Error:
  51. # ok, success, completed.
  52. success = 0
  53. # error when parse json
  54. system_parse_json = 100
  55. # request action invalid
  56. request_invalid_action = 200
  57. # cdn node not exists
  58. cdn_node_not_exists = 201
  59. '''
  60. handle the clients requests: connect/disconnect vhost/app.
  61. '''
  62. class RESTClients(object):
  63. exposed = True
  64. def GET(self):
  65. enable_crossdomain()
  66. clients = {}
  67. return json.dumps(clients)
  68. '''
  69. for SRS hook: on_connect/on_close
  70. on_connect:
  71. when client connect to vhost/app, call the hook,
  72. the request in the POST data string is a object encode by json:
  73. {
  74. "action": "on_connect",
  75. "client_id": 1985,
  76. "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
  77. "tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a",
  78. "pageUrl": "http://www.test.com/live.html"
  79. }
  80. on_close:
  81. when client close/disconnect to vhost/app/stream, call the hook,
  82. the request in the POST data string is a object encode by json:
  83. {
  84. "action": "on_close",
  85. "client_id": 1985,
  86. "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
  87. "send_bytes": 10240, "recv_bytes": 10240
  88. }
  89. if valid, the hook must return HTTP code 200(Stauts OK) and response
  90. an int value specifies the error code(0 corresponding to success):
  91. 0
  92. '''
  93. def POST(self):
  94. enable_crossdomain()
  95. # return the error code in str
  96. code = Error.success
  97. req = cherrypy.request.body.read()
  98. trace("post to clients, req=%s"%(req))
  99. try:
  100. json_req = json.loads(req)
  101. except Exception, ex:
  102. code = Error.system_parse_json
  103. trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
  104. return json.dumps({"code": int(code), "data": None})
  105. action = json_req["action"]
  106. if action == "on_connect":
  107. code = self.__on_connect(json_req)
  108. elif action == "on_close":
  109. code = self.__on_close(json_req)
  110. else:
  111. trace("invalid request action: %s"%(json_req["action"]))
  112. code = Error.request_invalid_action
  113. return json.dumps({"code": int(code), "data": None})
  114. def OPTIONS(self, *args, **kwargs):
  115. enable_crossdomain()
  116. def __on_connect(self, req):
  117. code = Error.success
  118. trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, pageUrl=%s"%(
  119. req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["pageUrl"]
  120. ))
  121. # TODO: process the on_connect event
  122. return code
  123. def __on_close(self, req):
  124. code = Error.success
  125. trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, send_bytes=%s, recv_bytes=%s"%(
  126. req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["send_bytes"], req["recv_bytes"]
  127. ))
  128. # TODO: process the on_close event
  129. return code
  130. '''
  131. handle the streams requests: publish/unpublish stream.
  132. '''
  133. class RESTStreams(object):
  134. exposed = True
  135. def GET(self):
  136. enable_crossdomain()
  137. streams = {}
  138. return json.dumps(streams)
  139. '''
  140. for SRS hook: on_publish/on_unpublish
  141. on_publish:
  142. when client(encoder) publish to vhost/app/stream, call the hook,
  143. the request in the POST data string is a object encode by json:
  144. {
  145. "action": "on_publish",
  146. "client_id": 1985,
  147. "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
  148. "stream": "livestream", "param":"?token=xxx&salt=yyy"
  149. }
  150. on_unpublish:
  151. when client(encoder) stop publish to vhost/app/stream, call the hook,
  152. the request in the POST data string is a object encode by json:
  153. {
  154. "action": "on_unpublish",
  155. "client_id": 1985,
  156. "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
  157. "stream": "livestream", "param":"?token=xxx&salt=yyy"
  158. }
  159. if valid, the hook must return HTTP code 200(Stauts OK) and response
  160. an int value specifies the error code(0 corresponding to success):
  161. 0
  162. '''
  163. def POST(self):
  164. enable_crossdomain()
  165. # return the error code in str
  166. code = Error.success
  167. req = cherrypy.request.body.read()
  168. trace("post to streams, req=%s"%(req))
  169. try:
  170. json_req = json.loads(req)
  171. except Exception, ex:
  172. code = Error.system_parse_json
  173. trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
  174. return json.dumps({"code": int(code), "data": None})
  175. action = json_req["action"]
  176. if action == "on_publish":
  177. code = self.__on_publish(json_req)
  178. elif action == "on_unpublish":
  179. code = self.__on_unpublish(json_req)
  180. else:
  181. trace("invalid request action: %s"%(json_req["action"]))
  182. code = Error.request_invalid_action
  183. return json.dumps({"code": int(code), "data": None})
  184. def OPTIONS(self, *args, **kwargs):
  185. enable_crossdomain()
  186. def __on_publish(self, req):
  187. code = Error.success
  188. trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
  189. req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
  190. ))
  191. # TODO: process the on_publish event
  192. return code
  193. def __on_unpublish(self, req):
  194. code = Error.success
  195. trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
  196. req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
  197. ))
  198. # TODO: process the on_unpublish event
  199. return code
  200. '''
  201. handle the dvrs requests: dvr stream.
  202. '''
  203. class RESTDvrs(object):
  204. exposed = True
  205. def GET(self):
  206. enable_crossdomain()
  207. dvrs = {}
  208. return json.dumps(dvrs)
  209. '''
  210. for SRS hook: on_dvr
  211. on_dvr:
  212. when srs reap a dvr file, call the hook,
  213. the request in the POST data string is a object encode by json:
  214. {
  215. "action": "on_dvr",
  216. "client_id": 1985,
  217. "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
  218. "stream": "livestream", "param":"?token=xxx&salt=yyy",
  219. "cwd": "/usr/local/srs",
  220. "file": "./objs/nginx/html/live/livestream.1420254068776.flv"
  221. }
  222. if valid, the hook must return HTTP code 200(Stauts OK) and response
  223. an int value specifies the error code(0 corresponding to success):
  224. 0
  225. '''
  226. def POST(self):
  227. enable_crossdomain()
  228. # return the error code in str
  229. code = Error.success
  230. req = cherrypy.request.body.read()
  231. trace("post to dvrs, req=%s"%(req))
  232. try:
  233. json_req = json.loads(req)
  234. except Exception, ex:
  235. code = Error.system_parse_json
  236. trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
  237. return json.dumps({"code": int(code), "data": None})
  238. action = json_req["action"]
  239. if action == "on_dvr":
  240. code = self.__on_dvr(json_req)
  241. else:
  242. trace("invalid request action: %s"%(json_req["action"]))
  243. code = Error.request_invalid_action
  244. return json.dumps({"code": int(code), "data": None})
  245. def OPTIONS(self, *args, **kwargs):
  246. enable_crossdomain()
  247. def __on_dvr(self, req):
  248. code = Error.success
  249. trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, cwd=%s, file=%s"%(
  250. req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"],
  251. req["cwd"], req["file"]
  252. ))
  253. # TODO: process the on_dvr event
  254. return code
  255. '''
  256. handle the hls proxy requests: hls stream.
  257. '''
  258. class RESTProxy(object):
  259. exposed = True
  260. '''
  261. for SRS hook: on_hls_notify
  262. on_hls_notify:
  263. when srs reap a ts file of hls, call this hook,
  264. used to push file to cdn network, by get the ts file from cdn network.
  265. so we use HTTP GET and use the variable following:
  266. [app], replace with the app.
  267. [stream], replace with the stream.
  268. [param], replace with the param.
  269. [ts_url], replace with the ts url.
  270. ignore any return data of server.
  271. '''
  272. def GET(self, *args, **kwargs):
  273. enable_crossdomain()
  274. url = "http://" + "/".join(args);
  275. print "start to proxy url: %s"%url
  276. f = None
  277. try:
  278. f = urllib2.urlopen(url)
  279. f.read()
  280. except:
  281. print "error proxy url: %s"%url
  282. finally:
  283. if f: f.close()
  284. print "completed proxy url: %s"%url
  285. return url
  286. '''
  287. handle the hls requests: hls stream.
  288. '''
  289. class RESTHls(object):
  290. exposed = True
  291. '''
  292. for SRS hook: on_hls_notify
  293. on_hls_notify:
  294. when srs reap a ts file of hls, call this hook,
  295. used to push file to cdn network, by get the ts file from cdn network.
  296. so we use HTTP GET and use the variable following:
  297. [app], replace with the app.
  298. [stream], replace with the stream.
  299. [param], replace with the param.
  300. [ts_url], replace with the ts url.
  301. ignore any return data of server.
  302. '''
  303. def GET(self, *args, **kwargs):
  304. enable_crossdomain()
  305. hls = {
  306. "args": args,
  307. "kwargs": kwargs
  308. }
  309. return json.dumps(hls)
  310. '''
  311. for SRS hook: on_hls
  312. on_hls:
  313. when srs reap a dvr file, call the hook,
  314. the request in the POST data string is a object encode by json:
  315. {
  316. "action": "on_dvr",
  317. "client_id": 1985,
  318. "ip": "192.168.1.10",
  319. "vhost": "video.test.com",
  320. "app": "live",
  321. "stream": "livestream", "param":"?token=xxx&salt=yyy",
  322. "duration": 9.68, // in seconds
  323. "cwd": "/usr/local/srs",
  324. "file": "./objs/nginx/html/live/livestream.1420254068776-100.ts",
  325. "seq_no": 100
  326. }
  327. if valid, the hook must return HTTP code 200(Stauts OK) and response
  328. an int value specifies the error code(0 corresponding to success):
  329. 0
  330. '''
  331. def POST(self):
  332. enable_crossdomain()
  333. # return the error code in str
  334. code = Error.success
  335. req = cherrypy.request.body.read()
  336. trace("post to hls, req=%s"%(req))
  337. try:
  338. json_req = json.loads(req)
  339. except Exception, ex:
  340. code = Error.system_parse_json
  341. trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
  342. return json.dumps({"code": int(code), "data": None})
  343. action = json_req["action"]
  344. if action == "on_hls":
  345. code = self.__on_hls(json_req)
  346. else:
  347. trace("invalid request action: %s"%(json_req["action"]))
  348. code = Error.request_invalid_action
  349. return json.dumps({"code": int(code), "data": None})
  350. def OPTIONS(self, *args, **kwargs):
  351. enable_crossdomain()
  352. def __on_hls(self, req):
  353. code = Error.success
  354. trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%s, cwd=%s, file=%s, seq_no=%s"%(
  355. req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"], req["duration"],
  356. req["cwd"], req["file"], req["seq_no"]
  357. ))
  358. # TODO: process the on_hls event
  359. return code
  360. '''
  361. handle the sessions requests: client play/stop stream
  362. '''
  363. class RESTSessions(object):
  364. exposed = True
  365. def GET(self):
  366. enable_crossdomain()
  367. sessions = {}
  368. return json.dumps(sessions)
  369. '''
  370. for SRS hook: on_play/on_stop
  371. on_play:
  372. when client(encoder) publish to vhost/app/stream, call the hook,
  373. the request in the POST data string is a object encode by json:
  374. {
  375. "action": "on_play",
  376. "client_id": 1985,
  377. "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
  378. "stream": "livestream", "param":"?token=xxx&salt=yyy",
  379. "pageUrl": "http://www.test.com/live.html"
  380. }
  381. on_stop:
  382. when client(encoder) stop publish to vhost/app/stream, call the hook,
  383. the request in the POST data string is a object encode by json:
  384. {
  385. "action": "on_stop",
  386. "client_id": 1985,
  387. "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
  388. "stream": "livestream", "param":"?token=xxx&salt=yyy"
  389. }
  390. if valid, the hook must return HTTP code 200(Stauts OK) and response
  391. an int value specifies the error code(0 corresponding to success):
  392. 0
  393. '''
  394. def POST(self):
  395. enable_crossdomain()
  396. # return the error code in str
  397. code = Error.success
  398. req = cherrypy.request.body.read()
  399. trace("post to sessions, req=%s"%(req))
  400. try:
  401. json_req = json.loads(req)
  402. except Exception, ex:
  403. code = Error.system_parse_json
  404. trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
  405. return json.dumps({"code": int(code), "data": None})
  406. action = json_req["action"]
  407. if action == "on_play":
  408. code = self.__on_play(json_req)
  409. elif action == "on_stop":
  410. code = self.__on_stop(json_req)
  411. else:
  412. trace("invalid request action: %s"%(json_req["action"]))
  413. code = Error.request_invalid_action
  414. return json.dumps({"code": int(code), "data": None})
  415. def OPTIONS(self, *args, **kwargs):
  416. enable_crossdomain()
  417. def __on_play(self, req):
  418. code = Error.success
  419. trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, pageUrl=%s"%(
  420. req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"], req["pageUrl"]
  421. ))
  422. # TODO: process the on_play event
  423. return code
  424. def __on_stop(self, req):
  425. code = Error.success
  426. trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
  427. req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
  428. ))
  429. # TODO: process the on_stop event
  430. return code
  431. global_arm_server_id = os.getpid();
  432. class ArmServer:
  433. def __init__(self):
  434. global global_arm_server_id
  435. global_arm_server_id += 1
  436. self.id = str(global_arm_server_id)
  437. self.ip = None
  438. self.device_id = None
  439. self.summaries = None
  440. self.devices = None
  441. self.public_ip = cherrypy.request.remote.ip
  442. self.heartbeat = time.time()
  443. self.clients = 0
  444. def dead(self):
  445. dead_time_seconds = 20
  446. if time.time() - self.heartbeat > dead_time_seconds:
  447. return True
  448. return False
  449. def json_dump(self):
  450. data = {}
  451. data["id"] = self.id
  452. data["ip"] = self.ip
  453. data["device_id"] = self.device_id
  454. data["summaries"] = self.summaries
  455. data["devices"] = self.devices
  456. data["public_ip"] = self.public_ip
  457. data["heartbeat"] = self.heartbeat
  458. data["heartbeat_h"] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(self.heartbeat))
  459. data["api"] = "http://%s:1985/api/v1/summaries"%(self.ip)
  460. data["console"] = "http://ossrs.net/console/ng_index.html#/summaries?host=%s&port=1985"%(self.ip)
  461. return data
  462. '''
  463. the server list
  464. '''
  465. class RESTServers(object):
  466. exposed = True
  467. def __init__(self):
  468. self.__nodes = []
  469. self.__last_update = datetime.datetime.now();
  470. self.__lock = threading.Lock()
  471. def __get_node(self, device_id):
  472. for node in self.__nodes:
  473. if node.device_id == device_id:
  474. return node
  475. return None
  476. def __refresh_nodes(self):
  477. while len(self.__nodes) > 0:
  478. has_dead_node = False
  479. for node in self.__nodes:
  480. if node.dead():
  481. self.__nodes.remove(node)
  482. has_dead_node = True
  483. if not has_dead_node:
  484. break
  485. '''
  486. post to update server ip.
  487. request body: the new raspberry-pi server ip. TODO: FIXME: more info.
  488. '''
  489. def POST(self):
  490. enable_crossdomain()
  491. try:
  492. self.__lock.acquire()
  493. req = cherrypy.request.body.read()
  494. trace("post to nodes, req=%s"%(req))
  495. try:
  496. json_req = json.loads(req)
  497. except Exception, ex:
  498. code = Error.system_parse_json
  499. trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
  500. return json.dumps({"code":code, "data": None})
  501. device_id = json_req["device_id"]
  502. node = self.__get_node(device_id)
  503. if node is None:
  504. node = ArmServer()
  505. self.__nodes.append(node)
  506. node.ip = json_req["ip"]
  507. if "summaries" in json_req:
  508. node.summaries = json_req["summaries"]
  509. if "devices" in json_req:
  510. node.devices = json_req["devices"]
  511. node.device_id = device_id
  512. node.public_ip = cherrypy.request.remote.ip
  513. node.heartbeat = time.time()
  514. return json.dumps({"code":Error.success, "data": {"id":node.id}})
  515. finally:
  516. self.__lock.release()
  517. '''
  518. get all servers which report to this api-server.
  519. '''
  520. def GET(self, id=None):
  521. enable_crossdomain()
  522. try:
  523. self.__lock.acquire()
  524. self.__refresh_nodes()
  525. data = []
  526. for node in self.__nodes:
  527. if id == None or node.id == str(id) or node.device_id == str(id):
  528. data.append(node.json_dump())
  529. return json.dumps(data)
  530. finally:
  531. self.__lock.release()
  532. def DELETE(self, id):
  533. enable_crossdomain()
  534. raise cherrypy.HTTPError(405, "Not allowed.")
  535. def PUT(self, id):
  536. enable_crossdomain()
  537. raise cherrypy.HTTPError(405, "Not allowed.")
  538. def OPTIONS(self, *args, **kwargs):
  539. enable_crossdomain()
  540. global_chat_id = os.getpid();
  541. '''
  542. the chat streams, public chat room.
  543. '''
  544. class RESTChats(object):
  545. exposed = True
  546. global_id = 100
  547. def __init__(self):
  548. # object fields:
  549. # id: an int value indicates the id of user.
  550. # username: a str indicates the user name.
  551. # url: a str indicates the url of user stream.
  552. # agent: a str indicates the agent of user.
  553. # join_date: a number indicates the join timestamp in seconds.
  554. # join_date_str: a str specifies the formated friendly time.
  555. # heatbeat: a number indicates the heartbeat timestamp in seconds.
  556. # vcodec: a dict indicates the video codec info.
  557. # acodec: a dict indicates the audio codec info.
  558. self.__chats = [];
  559. self.__chat_lock = threading.Lock();
  560. # dead time in seconds, if exceed, remove the chat.
  561. self.__dead_time = 15;
  562. '''
  563. get the rtmp url of chat object. None if overflow.
  564. '''
  565. def get_url_by_index(self, index):
  566. index = int(index)
  567. if index is None or index >= len(self.__chats):
  568. return None;
  569. return self.__chats[index]["url"];
  570. def GET(self):
  571. enable_crossdomain()
  572. try:
  573. self.__chat_lock.acquire();
  574. chats = [];
  575. copy = self.__chats[:];
  576. for chat in copy:
  577. if time.time() - chat["heartbeat"] > self.__dead_time:
  578. self.__chats.remove(chat);
  579. continue;
  580. chats.append({
  581. "id": chat["id"],
  582. "username": chat["username"],
  583. "url": chat["url"],
  584. "join_date_str": chat["join_date_str"],
  585. "heartbeat": chat["heartbeat"],
  586. });
  587. finally:
  588. self.__chat_lock.release();
  589. return json.dumps({"code":0, "data": {"now": time.time(), "chats": chats}})
  590. def POST(self):
  591. enable_crossdomain()
  592. req = cherrypy.request.body.read()
  593. chat = json.loads(req)
  594. global global_chat_id;
  595. chat["id"] = global_chat_id
  596. global_chat_id += 1
  597. chat["join_date"] = time.time();
  598. chat["heartbeat"] = time.time();
  599. chat["join_date_str"] = time.strftime("%Y-%m-%d %H:%M:%S");
  600. try:
  601. self.__chat_lock.acquire();
  602. self.__chats.append(chat)
  603. finally:
  604. self.__chat_lock.release();
  605. trace("create chat success, id=%s"%(chat["id"]))
  606. return json.dumps({"code":0, "data": chat["id"]})
  607. def DELETE(self, id):
  608. enable_crossdomain()
  609. try:
  610. self.__chat_lock.acquire();
  611. for chat in self.__chats:
  612. if str(id) != str(chat["id"]):
  613. continue
  614. self.__chats.remove(chat)
  615. trace("delete chat success, id=%s"%(id))
  616. return json.dumps({"code":0, "data": None})
  617. finally:
  618. self.__chat_lock.release();
  619. raise cherrypy.HTTPError(405, "Not allowed.")
  620. def PUT(self, id):
  621. enable_crossdomain()
  622. try:
  623. self.__chat_lock.acquire();
  624. for chat in self.__chats:
  625. if str(id) != str(chat["id"]):
  626. continue
  627. chat["heartbeat"] = time.time();
  628. trace("heartbeat chat success, id=%s"%(id))
  629. return json.dumps({"code":0, "data": None})
  630. finally:
  631. self.__chat_lock.release();
  632. raise cherrypy.HTTPError(405, "Not allowed.")
  633. def OPTIONS(self, *args, **kwargs):
  634. enable_crossdomain()
  635. '''
  636. the snapshot api,
  637. to start a snapshot when encoder start publish stream,
  638. stop the snapshot worker when stream finished.
  639. '''
  640. class RESTSnapshots(object):
  641. exposed = True
  642. def __init__(self):
  643. pass
  644. def POST(self):
  645. enable_crossdomain()
  646. # return the error code in str
  647. code = Error.success
  648. req = cherrypy.request.body.read()
  649. trace("post to streams, req=%s"%(req))
  650. try:
  651. json_req = json.loads(req)
  652. except Exception, ex:
  653. code = Error.system_parse_json
  654. trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
  655. return json.dumps({"code": int(code), "data": None})
  656. action = json_req["action"]
  657. if action == "on_publish":
  658. code = worker.snapshot_create(json_req)
  659. elif action == "on_unpublish":
  660. code = worker.snapshot_destroy(json_req)
  661. else:
  662. trace("invalid request action: %s"%(json_req["action"]))
  663. code = Error.request_invalid_action
  664. return json.dumps({"code": int(code), "data": None})
  665. def OPTIONS(self, *args, **kwargs):
  666. enable_crossdomain()
  667. # HTTP RESTful path.
  668. class Root(object):
  669. exposed = True
  670. def __init__(self):
  671. self.api = Api()
  672. def GET(self):
  673. enable_crossdomain();
  674. return json.dumps({"code":Error.success, "urls":{"api":"the api root"}})
  675. def OPTIONS(self, *args, **kwargs):
  676. enable_crossdomain();
  677. # HTTP RESTful path.
  678. class Api(object):
  679. exposed = True
  680. def __init__(self):
  681. self.v1 = V1()
  682. def GET(self):
  683. enable_crossdomain();
  684. return json.dumps({"code":Error.success,
  685. "urls": {
  686. "v1": "the api version 1.0"
  687. }
  688. });
  689. def OPTIONS(self, *args, **kwargs):
  690. enable_crossdomain();
  691. # HTTP RESTful path. to access as:
  692. # http://127.0.0.1:8085/api/v1/clients
  693. class V1(object):
  694. exposed = True
  695. def __init__(self):
  696. self.clients = RESTClients()
  697. self.streams = RESTStreams()
  698. self.sessions = RESTSessions()
  699. self.dvrs = RESTDvrs()
  700. self.hls = RESTHls()
  701. self.proxy = RESTProxy()
  702. self.chats = RESTChats()
  703. self.servers = RESTServers()
  704. self.snapshots = RESTSnapshots()
  705. def GET(self):
  706. enable_crossdomain();
  707. return json.dumps({"code":Error.success, "urls":{
  708. "clients": "for srs http callback, to handle the clients requests: connect/disconnect vhost/app.",
  709. "streams": "for srs http callback, to handle the streams requests: publish/unpublish stream.",
  710. "sessions": "for srs http callback, to handle the sessions requests: client play/stop stream",
  711. "dvrs": "for srs http callback, to handle the dvr requests: dvr stream.",
  712. "chats": "for srs demo meeting, the chat streams, public chat room.",
  713. "servers": {
  714. "summary": "for srs raspberry-pi and meeting demo",
  715. "GET": "get the current raspberry-pi servers info",
  716. "POST ip=node_ip&device_id=device_id": "the new raspberry-pi server info."
  717. }
  718. }});
  719. def OPTIONS(self, *args, **kwargs):
  720. enable_crossdomain();
  721. '''
  722. main code start.
  723. '''
  724. # donot support use this module as library.
  725. if __name__ != "__main__":
  726. raise Exception("embed not support")
  727. # check the user options
  728. if len(sys.argv) <= 1:
  729. print "SRS api callback server, Copyright (c) 2013-2016 SRS(ossrs)"
  730. print "Usage: python %s <port>"%(sys.argv[0])
  731. print " port: the port to listen at."
  732. print "For example:"
  733. print " python %s 8085"%(sys.argv[0])
  734. print ""
  735. print "See also: https://github.com/ossrs/srs"
  736. sys.exit(1)
  737. # parse port from user options.
  738. port = int(sys.argv[1])
  739. static_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "static-dir"))
  740. trace("api server listen at port: %s, static_dir: %s"%(port, static_dir))
  741. discard = open("/dev/null", "rw")
  742. '''
  743. create process by specifies command.
  744. @param command the command str to start the process.
  745. @param stdout_fd an int fd specifies the stdout fd.
  746. @param stderr_fd an int fd specifies the stderr fd.
  747. @param log_file a file object specifies the additional log to write to. ignore if None.
  748. @return a Popen object created by subprocess.Popen().
  749. '''
  750. def create_process(command, stdout_fd, stderr_fd):
  751. # log the original command
  752. msg = "process start command: %s"%(command);
  753. # to avoid shell injection, directly use the command, no need to filter.
  754. args = shlex.split(str(command));
  755. process = subprocess.Popen(args, stdout=stdout_fd, stderr=stderr_fd);
  756. return process;
  757. '''
  758. isolate thread for srs worker, to do some job in background,
  759. for example, to snapshot thumbnail of RTMP stream.
  760. '''
  761. class SrsWorker(cherrypy.process.plugins.SimplePlugin):
  762. def __init__(self, bus):
  763. cherrypy.process.plugins.SimplePlugin.__init__(self, bus);
  764. self.__snapshots = {}
  765. def start(self):
  766. print "srs worker thread started"
  767. def stop(self):
  768. print "srs worker thread stopped"
  769. def main(self):
  770. for url in self.__snapshots:
  771. snapshot = self.__snapshots[url]
  772. diff = time.time() - snapshot['timestamp']
  773. process = snapshot['process']
  774. # aborted.
  775. if process is not None and snapshot['abort']:
  776. process.kill()
  777. process.poll()
  778. del self.__snapshots[url]
  779. print 'abort snapshot %s'%snapshot['cmd']
  780. break
  781. # how many snapshots to output.
  782. vframes = 5
  783. # the expire in seconds for ffmpeg to snapshot.
  784. expire = 1
  785. # the timeout to kill ffmpeg.
  786. kill_ffmpeg_timeout = 30 * expire
  787. # the ffmpeg binary path
  788. ffmpeg = "./objs/ffmpeg/bin/ffmpeg"
  789. # the best url for thumbnail.
  790. besturl = os.path.join(static_dir, "%s/%s-best.png"%(snapshot['app'], snapshot['stream']))
  791. # the lambda to generate the thumbnail with index.
  792. lgo = lambda dir, app, stream, index: os.path.join(dir, "%s/%s-%03d.png"%(app, stream, index))
  793. # the output for snapshot command
  794. output = os.path.join(static_dir, "%s/%s-%%03d.png"%(snapshot['app'], snapshot['stream']))
  795. # the ffmepg command to snapshot
  796. cmd = '%s -i %s -vf fps=1 -vcodec png -f image2 -an -y -vframes %s -y %s'%(ffmpeg, url, vframes, output)
  797. # already snapshoted and not expired.
  798. if process is not None and diff < expire:
  799. continue
  800. # terminate the active process
  801. if process is not None:
  802. # the poll will set the process.returncode
  803. process.poll()
  804. # None incidates the process hasn't terminate yet.
  805. if process.returncode is not None:
  806. # process terminated with error.
  807. if process.returncode != 0:
  808. print 'process terminated with error=%s, cmd=%s'%(process.returncode, snapshot['cmd'])
  809. # process terminated normally.
  810. else:
  811. # guess the best one.
  812. bestsize = 0
  813. for i in range(0, vframes):
  814. output = lgo(static_dir, snapshot['app'], snapshot['stream'], i + 1)
  815. fsize = os.path.getsize(output)
  816. if bestsize < fsize:
  817. os.system("rm -f '%s'"%besturl)
  818. os.system("ln -sf '%s' '%s'"%(output, besturl))
  819. bestsize = fsize
  820. print 'the best thumbnail is %s'%besturl
  821. else:
  822. # wait for process to terminate, timeout is N*expire.
  823. if diff < kill_ffmpeg_timeout:
  824. continue
  825. # kill the process when user cancel.
  826. else:
  827. process.kill()
  828. print 'kill the process %s'%snapshot['cmd']
  829. # create new process to snapshot.
  830. print 'snapshot by: %s'%cmd
  831. process = create_process(cmd, discard.fileno(), discard.fileno())
  832. snapshot['process'] = process
  833. snapshot['cmd'] = cmd
  834. snapshot['timestamp'] = time.time()
  835. pass;
  836. # {"action":"on_publish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"}
  837. # ffmpeg -i rtmp://127.0.0.1:1935/live?vhost=dev/stream -vf fps=1 -vcodec png -f image2 -an -y -vframes 3 -y static-dir/live/livestream-%03d.png
  838. def snapshot_create(self, req):
  839. url = "rtmp://127.0.0.1/%s...vhost...%s/%s"%(req['app'], req['vhost'], req['stream'])
  840. if url in self.__snapshots:
  841. print 'ignore exists %s'%url
  842. return Error.success
  843. req['process'] = None
  844. req['abort'] = False
  845. req['timestamp'] = time.time()
  846. self.__snapshots[url] = req
  847. return Error.success
  848. # {"action":"on_unpublish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"}
  849. def snapshot_destroy(self, req):
  850. url = "rtmp://127.0.0.1/%s...vhost...%s/%s"%(req['app'], req['vhost'], req['stream'])
  851. if url in self.__snapshots:
  852. snapshot = self.__snapshots[url]
  853. snapshot['abort'] = True
  854. return Error.success
  855. # subscribe the plugin to cherrypy.
  856. worker = SrsWorker(cherrypy.engine)
  857. worker.subscribe();
  858. # disable the autoreloader to make it more simple.
  859. cherrypy.engine.autoreload.unsubscribe();
  860. # cherrypy config.
  861. conf = {
  862. 'global': {
  863. 'server.shutdown_timeout': 3,
  864. 'server.socket_host': '0.0.0.0',
  865. 'server.socket_port': port,
  866. 'tools.encode.on': True,
  867. 'tools.staticdir.on': True,
  868. 'tools.encode.encoding': "utf-8",
  869. #'server.thread_pool': 2, # single thread server.
  870. },
  871. '/': {
  872. 'tools.staticdir.dir': static_dir,
  873. 'tools.staticdir.index': "index.html",
  874. # for cherrypy RESTful api support
  875. 'request.dispatch': cherrypy.dispatch.MethodDispatcher()
  876. }
  877. }
  878. # start cherrypy web engine
  879. trace("start cherrypy server")
  880. root = Root()
  881. cherrypy.quickstart(root, '/', conf)