batch-processor.t 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one or more
  3. # contributor license agreements. See the NOTICE file distributed with
  4. # this work for additional information regarding copyright ownership.
  5. # The ASF licenses this file to You under the Apache License, Version 2.0
  6. # (the "License"); you may not use this file except in compliance with
  7. # the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #
  17. use t::APISIX 'no_plan';
  18. log_level('debug');
  19. repeat_each(1);
  20. no_long_string();
  21. no_root_location();
  22. run_tests;
  23. __DATA__
  24. === TEST 1: send invalid arguments for constructor
  25. --- config
  26. location /t {
  27. content_by_lua_block {
  28. local Batch = require("apisix.utils.batch-processor")
  29. local config = {
  30. max_retry_count = 2,
  31. batch_max_size = 1,
  32. retry_delay = 0,
  33. }
  34. local func_to_send = function(elements)
  35. return true
  36. end
  37. local log_buffer, err = Batch:new("", config)
  38. if log_buffer then
  39. log_buffer:push({hello='world'})
  40. ngx.say("done")
  41. end
  42. if not log_buffer then
  43. ngx.say("failed")
  44. end
  45. }
  46. }
  47. --- request
  48. GET /t
  49. --- response_body
  50. failed
  51. --- wait: 0.5
  52. === TEST 2: sanity
  53. --- config
  54. location /t {
  55. content_by_lua_block {
  56. local Batch = require("apisix.utils.batch-processor")
  57. local func_to_send = function(elements)
  58. return true
  59. end
  60. local config = {
  61. max_retry_count = 2,
  62. batch_max_size = 1,
  63. retry_delay = 0,
  64. }
  65. local log_buffer, err = Batch:new(func_to_send, config)
  66. if not log_buffer then
  67. ngx.say(err)
  68. end
  69. log_buffer:push({hello='world'})
  70. ngx.say("done")
  71. }
  72. }
  73. --- request
  74. GET /t
  75. --- response_body
  76. done
  77. --- error_log
  78. Batch Processor[log buffer] successfully processed the entries
  79. --- wait: 0.5
  80. === TEST 3: batch processor timeout exceeded
  81. --- config
  82. location /t {
  83. content_by_lua_block {
  84. local Batch = require("apisix.utils.batch-processor")
  85. local config = {
  86. max_retry_count = 2,
  87. batch_max_size = 2,
  88. retry_delay = 0,
  89. inactive_timeout = 1
  90. }
  91. local func_to_send = function(elements)
  92. return true
  93. end
  94. local log_buffer, err = Batch:new(func_to_send, config)
  95. if not log_buffer then
  96. ngx.say(err)
  97. end
  98. log_buffer:push({hello='world'})
  99. ngx.say("done")
  100. }
  101. }
  102. --- request
  103. GET /t
  104. --- response_body
  105. done
  106. --- error_log
  107. Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
  108. Batch Processor[log buffer] successfully processed the entries
  109. --- wait: 3
  110. === TEST 4: batch processor batch max size exceeded
  111. --- config
  112. location /t {
  113. content_by_lua_block {
  114. local Batch = require("apisix.utils.batch-processor")
  115. local config = {
  116. max_retry_count = 2,
  117. batch_max_size = 2,
  118. retry_delay = 0,
  119. }
  120. local func_to_send = function(elements)
  121. return true
  122. end
  123. local log_buffer, err = Batch:new(func_to_send, config)
  124. if not log_buffer then
  125. ngx.say(err)
  126. end
  127. log_buffer:push({hello='world'})
  128. log_buffer:push({hello='world'})
  129. ngx.say("done")
  130. }
  131. }
  132. --- request
  133. GET /t
  134. --- response_body
  135. done
  136. --- no_error_log
  137. Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
  138. --- error_log
  139. Batch Processor[log buffer] batch max size has exceeded
  140. Batch Processor[log buffer] successfully processed the entries
  141. --- wait: 1
  142. === TEST 5: first failed to process and second try success
  143. --- config
  144. location /t {
  145. content_by_lua_block {
  146. local Batch = require("apisix.utils.batch-processor")
  147. local core = require("apisix.core")
  148. local retry = false
  149. local config = {
  150. max_retry_count = 2,
  151. batch_max_size = 2,
  152. retry_delay = 0,
  153. }
  154. local func_to_send = function(elements)
  155. if not retry then
  156. retry = true
  157. return false
  158. end
  159. return true
  160. end
  161. local log_buffer, err = Batch:new(func_to_send, config)
  162. if not log_buffer then
  163. ngx.say(err)
  164. end
  165. log_buffer:push({hello='world'})
  166. log_buffer:push({hello='world'})
  167. ngx.say("done")
  168. }
  169. }
  170. --- request
  171. GET /t
  172. --- response_body
  173. done
  174. --- error_log
  175. Batch Processor[log buffer] failed to process entries
  176. Batch Processor[log buffer] successfully processed the entries
  177. --- wait: 0.5
  178. === TEST 6: Exceeding max retry count
  179. --- config
  180. location /t {
  181. content_by_lua_block {
  182. local Batch = require("apisix.utils.batch-processor")
  183. local config = {
  184. max_retry_count = 2,
  185. batch_max_size = 2,
  186. retry_delay = 0,
  187. }
  188. local func_to_send = function(elements)
  189. return false
  190. end
  191. local log_buffer, err = Batch:new(func_to_send, config)
  192. if not log_buffer then
  193. ngx.say(err)
  194. end
  195. log_buffer:push({hello='world'})
  196. log_buffer:push({hello='world'})
  197. ngx.say("done")
  198. }
  199. }
  200. --- request
  201. GET /t
  202. --- response_body
  203. done
  204. --- no_error_log
  205. Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
  206. --- error_log
  207. Batch Processor[log buffer] failed to process entries
  208. Batch Processor[log buffer] exceeded the max_retry_count
  209. --- wait: 0.5
  210. === TEST 7: two batches
  211. --- config
  212. location /t {
  213. content_by_lua_block {
  214. local Batch = require("apisix.utils.batch-processor")
  215. local core = require("apisix.core")
  216. local count = 0
  217. local config = {
  218. max_retry_count = 2,
  219. batch_max_size = 2,
  220. retry_delay = 0,
  221. }
  222. local func_to_send = function(elements)
  223. count = count + 1
  224. core.log.info("batch[", count , "] sent")
  225. return true
  226. end
  227. local log_buffer, err = Batch:new(func_to_send, config)
  228. if not log_buffer then
  229. ngx.say(err)
  230. end
  231. log_buffer:push({hello='world'})
  232. log_buffer:push({hello='world'})
  233. log_buffer:push({hello='world'})
  234. log_buffer:push({hello='world'})
  235. ngx.say("done")
  236. }
  237. }
  238. --- request
  239. GET /t
  240. --- response_body
  241. done
  242. --- no_error_log
  243. Batch Processor[log buffer] activating flush due to no activity
  244. --- error_log
  245. batch[1] sent
  246. batch[2] sent
  247. --- wait: 0.5
  248. === TEST 8: batch processor retry count 0 and fail processing
  249. --- config
  250. location /t {
  251. content_by_lua_block {
  252. local Batch = require("apisix.utils.batch-processor")
  253. local config = {
  254. max_retry_count = 0,
  255. batch_max_size = 2,
  256. retry_delay = 0,
  257. }
  258. local func_to_send = function(elements)
  259. return false
  260. end
  261. local log_buffer, err = Batch:new(func_to_send, config)
  262. if not log_buffer then
  263. ngx.say(err)
  264. end
  265. log_buffer:push({hello='world'})
  266. log_buffer:push({hello='world'})
  267. ngx.say("done")
  268. }
  269. }
  270. --- request
  271. GET /t
  272. --- response_body
  273. done
  274. --- no_error_log
  275. Batch Processor[log buffer] activating flush due to no activity
  276. --- error_log
  277. Batch Processor[log buffer] exceeded the max_retry_count
  278. --- wait: 0.5
  279. === TEST 9: batch processor timeout exceeded
  280. --- config
  281. location /t {
  282. content_by_lua_block {
  283. local Batch = require("apisix.utils.batch-processor")
  284. local config = {
  285. max_retry_count = 2,
  286. batch_max_size = 2,
  287. retry_delay = 0,
  288. buffer_duration = 60,
  289. inactive_timeout = 1,
  290. }
  291. local func_to_send = function(elements)
  292. return true
  293. end
  294. local log_buffer, err = Batch:new(func_to_send, config)
  295. if not log_buffer then
  296. ngx.say(err)
  297. end
  298. log_buffer:push({hello='world'})
  299. ngx.say("done")
  300. }
  301. }
  302. --- request
  303. GET /t
  304. --- response_body
  305. done
  306. --- error_log
  307. Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
  308. Batch Processor[log buffer] successfully processed the entries
  309. --- wait: 3
  310. === TEST 10: json encode and log elements
  311. --- config
  312. location /t {
  313. content_by_lua_block {
  314. local Batch = require("apisix.utils.batch-processor")
  315. local core = require("apisix.core")
  316. local config = {
  317. max_retry_count = 2,
  318. batch_max_size = 2,
  319. retry_delay = 0,
  320. }
  321. local func_to_send = function(elements)
  322. core.log.info(require("toolkit.json").encode(elements))
  323. return true
  324. end
  325. local log_buffer, err = Batch:new(func_to_send, config)
  326. if not log_buffer then
  327. ngx.say(err)
  328. end
  329. log_buffer:push({msg='1'})
  330. log_buffer:push({msg='2'})
  331. log_buffer:push({msg='3'})
  332. log_buffer:push({msg='4'})
  333. ngx.say("done")
  334. }
  335. }
  336. --- request
  337. GET /t
  338. --- response_body
  339. done
  340. --- no_error_log
  341. Batch Processor[log buffer] activating flush due to no activity
  342. --- error_log
  343. [{"msg":"1"},{"msg":"2"}]
  344. [{"msg":"3"},{"msg":"4"}]
  345. --- wait: 0.5
  346. === TEST 11: extend timer
  347. --- config
  348. location /t {
  349. content_by_lua_block {
  350. local Batch = require("apisix.utils.batch-processor")
  351. local core = require("apisix.core")
  352. local config = {
  353. max_retry_count = 1,
  354. batch_max_size = 3,
  355. retry_delay = 0,
  356. inactive_timeout = 1
  357. }
  358. local func_to_send = function(elements)
  359. core.log.info(require("toolkit.json").encode(elements))
  360. return true
  361. end
  362. local log_buffer, err = Batch:new(func_to_send, config)
  363. if not log_buffer then
  364. ngx.say(err)
  365. end
  366. log_buffer:push({msg='1'})
  367. ngx.sleep(0.3)
  368. log_buffer:push({msg='2'})
  369. log_buffer:push({msg='3'})
  370. log_buffer:push({msg='4'})
  371. ngx.say("done")
  372. }
  373. }
  374. --- request
  375. GET /t
  376. --- response_body
  377. done
  378. --- no_error_log
  379. Batch Processor[log buffer] activating flush due to no activity
  380. --- error_log
  381. Batch Processor[log buffer] extending buffer timer
  382. --- wait: 3
  383. === TEST 12: partially consumed entries
  384. --- config
  385. location /t {
  386. content_by_lua_block {
  387. local Batch = require("apisix.utils.batch-processor")
  388. local core = require("apisix.core")
  389. local config = {
  390. max_retry_count = 1,
  391. batch_max_size = 3,
  392. retry_delay = 0,
  393. inactive_timeout = 1
  394. }
  395. local func_to_send = function(elements)
  396. core.log.info(require("toolkit.json").encode(elements))
  397. return false, "error after consuming single entry", 2
  398. end
  399. local log_buffer, err = Batch:new(func_to_send, config)
  400. if not log_buffer then
  401. ngx.say(err)
  402. end
  403. log_buffer:push({msg='1'})
  404. log_buffer:push({msg='2'})
  405. log_buffer:push({msg='3'})
  406. log_buffer:push({msg='4'})
  407. ngx.say("done")
  408. }
  409. }
  410. --- request
  411. GET /t
  412. --- response_body
  413. done
  414. --- error_log
  415. [{"msg":"1"},{"msg":"2"},{"msg":"3"}]
  416. Batch Processor[log buffer] failed to process entries [2/3]: error after consuming single entry
  417. [{"msg":"2"},{"msg":"3"}]
  418. Batch Processor[log buffer] failed to process entries [1/2]: error after consuming single entry
  419. [{"msg":"4"}]
  420. --- wait: 2