QueryListenerWithMultiThreading.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. #!/usr/bin/python
  2. #coding=utf-8
  3. import threading
  4. import time
  5. import queue
  6. from kscore.session import get_session
  7. from threading import Lock
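'''
When an account has a large number of listeners and real servers, this
example script can be used to avoid the slowness of fetching the complete
listener list with one direct query.
Flow:
fetch the account's projects -> query the load balancers -> query the
listeners concurrently, one query per load balancer ID
A Semaphore is used to limit the concurrency.
A queue is used to coordinate the queued, multi-threaded work.
'''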
  16. sem = threading.Semaphore(20)
  17. lock = Lock()
  18. results = []
  19. class SlbThread (threading.Thread):
  20. def __init__(self, lbid, q):
  21. threading.Thread.__init__(self)
  22. self.lbid = lbid
  23. self.q = q
  24. def run(self):
  25. try:
  26. __hasNextPage = True
  27. __nextToken = 1
  28. __maxResults = 50
  29. while __hasNextPage:
  30. __param = {
  31. 'NextToken': __nextToken,
  32. 'MaxResults': __maxResults,
  33. 'Filter.1.Name': 'load-balancer-id',
  34. 'Filter.1.Value.1': self.lbid
  35. }
  36. __allListeners = slbClient.describe_listeners(**__param)
  37. '''
  38. 整合每页的获取结果
  39. '''
  40. lock.acquire(True)
  41. try:
  42. results.extend(__allListeners['ListenerSet'])
  43. finally:
  44. lock.release()
  45. if "NextToken" in __allListeners:
  46. '''
  47. 设置下一页
  48. '''
  49. __nextToken = __allListeners["NextToken"]
  50. else:
  51. '''
  52. 没有NextToken下自动跳出循环
  53. '''
  54. __hasNextPage = False
  55. finally:
  56. sem.release()
  57. self.q.task_done()
  58. if __name__ == "__main__":
  59. s = get_session()
  60. region = 'cn-shanghai-2'
  61. slbClient = s.create_client("slb", region, use_ssl=True)
  62. '''
  63. 获取项目制
  64. '''
  65. # IAM
  66. projects = []
  67. iam = s.create_client("iam", use_ssl=False)
  68. resp = iam.get_account_all_project_list()
  69. for item in resp["ListProjectResult"]["ProjectList"]:
  70. projects.append(item['ProjectId'])
  71. _param = {}
  72. count = 1
  73. for i in projects:
  74. key = "ProjectId." + str(count)
  75. _param.update({key: str(i)})
  76. count = count + 1
  77. print(count)
  78. '''
  79. 获取负载均衡
  80. '''
  81. allLbs = slbClient.describe_load_balancers(**_param)
  82. count = 0
  83. q = queue.Queue(len(allLbs["LoadBalancerDescriptions"]))
  84. old = time.time()
  85. for lb in allLbs["LoadBalancerDescriptions"]:
  86. q.put(lb["LoadBalancerId"])
  87. for lb in allLbs["LoadBalancerDescriptions"]:
  88. lb_id = lb["LoadBalancerId"]
  89. sem.acquire()
  90. print(lb_id, q)
  91. thread = SlbThread(lb_id, q)
  92. thread.start()
  93. q.join()
  94. print(time.time()-old)
  95. print(len(results))
  96. # for item in results:
  97. # print item['ListenerName']
  98. # print item['ListenerId']