vp9_multi_thread.c

/*
 *  Copyright (c) 2017 The WebM project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include <assert.h>

#include "vp9/encoder/vp9_encoder.h"
#include "vp9/encoder/vp9_ethread.h"
#include "vp9/encoder/vp9_multi_thread.h"
#include "vp9/encoder/vp9_temporal_filter.h"
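
// Returns the next job (JobNode) from the given tile's job queue, or NULL
// when that queue is exhausted. Queue access is guarded by the tile's job
// mutex when CONFIG_MULTITHREAD is enabled.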
void *vp9_enc_grp_get_next_job(MultiThreadHandle *multi_thread_ctxt,
                               int tile_id) {
  RowMTInfo *row_mt_info;
  JobQueueHandle *job_queue_hdl = NULL;
  void *next = NULL;
  JobNode *job_info = NULL;
#if CONFIG_MULTITHREAD
  pthread_mutex_t *mutex_handle = NULL;
#endif

  row_mt_info = (RowMTInfo *)(&multi_thread_ctxt->row_mt_info[tile_id]);
  job_queue_hdl = (JobQueueHandle *)&row_mt_info->job_queue_hdl;
#if CONFIG_MULTITHREAD
  mutex_handle = &row_mt_info->job_mutex;
#endif

  // lock the mutex for queue access
#if CONFIG_MULTITHREAD
  pthread_mutex_lock(mutex_handle);
#endif
  next = job_queue_hdl->next;
  if (NULL != next) {
    JobQueue *job_queue = (JobQueue *)next;
    job_info = &job_queue->job_info;
    // Update the next job in the queue
    job_queue_hdl->next = job_queue->next;
    job_queue_hdl->num_jobs_acquired++;
  }
#if CONFIG_MULTITHREAD
  pthread_mutex_unlock(mutex_handle);
#endif

  return job_info;
}
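
// Allocates and initializes the per-row RD threshold factors used when
// adaptive_rd_thresh_row_mt is enabled: one entry per (SB row, block size,
// mode) combination, initialized to RD_THRESH_INIT_FACT.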
void vp9_row_mt_alloc_rd_thresh(VP9_COMP *const cpi,
                                TileDataEnc *const this_tile) {
  VP9_COMMON *const cm = &cpi->common;
  const int sb_rows =
      (mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2) + 1;
  int i;

  this_tile->row_base_thresh_freq_fact =
      (int *)vpx_calloc(sb_rows * BLOCK_SIZES * MAX_MODES,
                        sizeof(*(this_tile->row_base_thresh_freq_fact)));
  for (i = 0; i < sb_rows * BLOCK_SIZES * MAX_MODES; i++)
    this_tile->row_base_thresh_freq_fact[i] = RD_THRESH_INIT_FACT;
}
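
// Allocates the row-MT resources: the job queue (sized for the largest
// stage), per-tile mutexes, row sync memory and, when adaptive RD
// thresholding is enabled, the per-row threshold buffers.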
void vp9_row_mt_mem_alloc(VP9_COMP *cpi) {
  struct VP9Common *cm = &cpi->common;
  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  int tile_row, tile_col;
  const int tile_cols = 1 << cm->log2_tile_cols;
  const int tile_rows = 1 << cm->log2_tile_rows;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  int jobs_per_tile_col, total_jobs;

  // Allocate memory that is large enough for all row_mt stages. First pass
  // uses 16x16 block size.
  jobs_per_tile_col = VPXMAX(cm->mb_rows, sb_rows);
  // Calculate the total number of jobs
  total_jobs = jobs_per_tile_col * tile_cols;

  multi_thread_ctxt->allocated_tile_cols = tile_cols;
  multi_thread_ctxt->allocated_tile_rows = tile_rows;
  multi_thread_ctxt->allocated_vert_unit_rows = jobs_per_tile_col;

  multi_thread_ctxt->job_queue =
      (JobQueue *)vpx_memalign(32, total_jobs * sizeof(JobQueue));

#if CONFIG_MULTITHREAD
  // Create mutex for each tile
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    RowMTInfo *row_mt_info = &multi_thread_ctxt->row_mt_info[tile_col];
    pthread_mutex_init(&row_mt_info->job_mutex, NULL);
  }
#endif

  // Allocate memory for row based multi-threading
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    TileDataEnc *this_tile = &cpi->tile_data[tile_col];
    vp9_row_mt_sync_mem_alloc(&this_tile->row_mt_sync, cm, jobs_per_tile_col);
    if (cpi->sf.adaptive_rd_thresh_row_mt) {
      if (this_tile->row_base_thresh_freq_fact != NULL) {
        vpx_free(this_tile->row_base_thresh_freq_fact);
        this_tile->row_base_thresh_freq_fact = NULL;
      }
      vp9_row_mt_alloc_rd_thresh(cpi, this_tile);
    }
  }

  // Assign the sync pointer of tile row zero for every tile row > 0
  for (tile_row = 1; tile_row < tile_rows; tile_row++) {
    for (tile_col = 0; tile_col < tile_cols; tile_col++) {
      TileDataEnc *this_tile =
          &cpi->tile_data[tile_row * tile_cols + tile_col];
      TileDataEnc *this_col_tile = &cpi->tile_data[tile_col];
      this_tile->row_mt_sync = this_col_tile->row_mt_sync;
    }
  }

  // Calculate the number of vertical units in the given tile row
  for (tile_row = 0; tile_row < tile_rows; tile_row++) {
    TileDataEnc *this_tile = &cpi->tile_data[tile_row * tile_cols];
    TileInfo *tile_info = &this_tile->tile_info;
    multi_thread_ctxt->num_tile_vert_sbs[tile_row] =
        get_num_vert_units(*tile_info, MI_BLOCK_SIZE_LOG2);
  }
}
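
// Releases everything allocated by vp9_row_mt_mem_alloc: the job queue,
// per-tile mutexes, row sync memory and RD threshold buffers.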
void vp9_row_mt_mem_dealloc(VP9_COMP *cpi) {
  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  int tile_col;
#if CONFIG_MULTITHREAD
  int tile_row;
#endif

  // Deallocate memory for job queue
  if (multi_thread_ctxt->job_queue) vpx_free(multi_thread_ctxt->job_queue);

#if CONFIG_MULTITHREAD
  // Destroy mutex for each tile
  for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
       tile_col++) {
    RowMTInfo *row_mt_info = &multi_thread_ctxt->row_mt_info[tile_col];
    if (row_mt_info) pthread_mutex_destroy(&row_mt_info->job_mutex);
  }
#endif

  // Free row based multi-threading sync memory
  for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
       tile_col++) {
    TileDataEnc *this_tile = &cpi->tile_data[tile_col];
    vp9_row_mt_sync_mem_dealloc(&this_tile->row_mt_sync);
  }

#if CONFIG_MULTITHREAD
  for (tile_row = 0; tile_row < multi_thread_ctxt->allocated_tile_rows;
       tile_row++) {
    for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
         tile_col++) {
      TileDataEnc *this_tile =
          &cpi->tile_data[tile_row * multi_thread_ctxt->allocated_tile_cols +
                          tile_col];
      if (this_tile->row_base_thresh_freq_fact != NULL) {
        vpx_free(this_tile->row_base_thresh_freq_fact);
        this_tile->row_base_thresh_freq_fact = NULL;
      }
    }
  }
#endif
}
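
// Resets each tile column's row sync state (cur_col = -1 for every row) and
// clears its first-pass statistics.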
void vp9_multi_thread_tile_init(VP9_COMP *cpi) {
  VP9_COMMON *const cm = &cpi->common;
  const int tile_cols = 1 << cm->log2_tile_cols;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  int i;

  for (i = 0; i < tile_cols; i++) {
    TileDataEnc *this_tile = &cpi->tile_data[i];
    int jobs_per_tile_col = cpi->oxcf.pass == 1 ? cm->mb_rows : sb_rows;
    // Initialize cur_col to -1 for all rows.
    memset(this_tile->row_mt_sync.cur_col, -1,
           sizeof(*this_tile->row_mt_sync.cur_col) * jobs_per_tile_col);
    vp9_zero(this_tile->fp_data);
    this_tile->fp_data.image_data_start_row = INVALID_ROW;
  }
}
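
// Maps each worker thread to a starting tile column in round-robin order,
// wrapping back to tile 0 when there are more workers than tile columns.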
void vp9_assign_tile_to_thread(MultiThreadHandle *multi_thread_ctxt,
                               int tile_cols, int num_workers) {
  int tile_id = 0;
  int i;

  // Allocating the threads for the tiles
  for (i = 0; i < num_workers; i++) {
    multi_thread_ctxt->thread_id_to_tile_id[i] = tile_id++;
    if (tile_id == tile_cols) tile_id = 0;
  }
}
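
// Returns the number of jobs still to be acquired from the given tile's
// queue, taking the tile's job mutex around the read when CONFIG_MULTITHREAD
// is enabled.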
int vp9_get_job_queue_status(MultiThreadHandle *multi_thread_ctxt,
                             int cur_tile_id) {
  RowMTInfo *row_mt_info;
  JobQueueHandle *job_queue_hndl;
#if CONFIG_MULTITHREAD
  pthread_mutex_t *mutex;
#endif
  int num_jobs_remaining;

  row_mt_info = &multi_thread_ctxt->row_mt_info[cur_tile_id];
  job_queue_hndl = &row_mt_info->job_queue_hdl;
#if CONFIG_MULTITHREAD
  mutex = &row_mt_info->job_mutex;
#endif

#if CONFIG_MULTITHREAD
  pthread_mutex_lock(mutex);
#endif
  num_jobs_remaining =
      multi_thread_ctxt->jobs_per_tile_col - job_queue_hndl->num_jobs_acquired;
#if CONFIG_MULTITHREAD
  pthread_mutex_unlock(mutex);
#endif

  return (num_jobs_remaining);
}
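
// Builds the per-tile-column linked job lists for the requested stage
// (encode, first pass or ARNR temporal filtering) and resets each worker's
// per-tile completion status.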
void vp9_prepare_job_queue(VP9_COMP *cpi, JOB_TYPE job_type) {
  VP9_COMMON *const cm = &cpi->common;
  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  JobQueue *job_queue = multi_thread_ctxt->job_queue;
  const int tile_cols = 1 << cm->log2_tile_cols;
  int job_row_num, jobs_per_tile, jobs_per_tile_col = 0, total_jobs;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  int tile_col, i;

  switch (job_type) {
    case ENCODE_JOB: jobs_per_tile_col = sb_rows; break;
    case FIRST_PASS_JOB: jobs_per_tile_col = cm->mb_rows; break;
    case ARNR_JOB:
      jobs_per_tile_col = ((cm->mi_rows + TF_ROUND) >> TF_SHIFT);
      break;
    default: assert(0);
  }

  total_jobs = jobs_per_tile_col * tile_cols;

  multi_thread_ctxt->jobs_per_tile_col = jobs_per_tile_col;
  // memset the entire job queue buffer to zero
  memset(job_queue, 0, total_jobs * sizeof(JobQueue));

  // Job queue preparation
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    RowMTInfo *tile_ctxt = &multi_thread_ctxt->row_mt_info[tile_col];
    JobQueue *job_queue_curr, *job_queue_temp;
    int tile_row = 0;

    tile_ctxt->job_queue_hdl.next = (void *)job_queue;
    tile_ctxt->job_queue_hdl.num_jobs_acquired = 0;

    job_queue_curr = job_queue;
    job_queue_temp = job_queue;

    // loop over all the vertical rows
    for (job_row_num = 0, jobs_per_tile = 0; job_row_num < jobs_per_tile_col;
         job_row_num++, jobs_per_tile++) {
      job_queue_curr->job_info.vert_unit_row_num = job_row_num;
      job_queue_curr->job_info.tile_col_id = tile_col;
      job_queue_curr->job_info.tile_row_id = tile_row;
      job_queue_curr->next = (void *)(job_queue_temp + 1);
      job_queue_curr = ++job_queue_temp;

      if (ENCODE_JOB == job_type) {
        if (jobs_per_tile >=
            multi_thread_ctxt->num_tile_vert_sbs[tile_row] - 1) {
          tile_row++;
          jobs_per_tile = -1;
        }
      }
    }

    // Set the last pointer to NULL
    job_queue_curr += -1;
    job_queue_curr->next = (void *)NULL;

    // Move to the next tile
    job_queue += jobs_per_tile_col;
  }

  for (i = 0; i < cpi->num_workers; i++) {
    EncWorkerData *thread_data;
    thread_data = &cpi->tile_thr_data[i];
    thread_data->thread_id = i;

    for (tile_col = 0; tile_col < tile_cols; tile_col++)
      thread_data->tile_completion_status[tile_col] = 0;
  }
}
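
// Checks whether all tiles have been processed. Returns 1 if every tile's
// job queue is exhausted; otherwise returns 0 and updates *cur_tile_id to
// the tile with the most jobs remaining (the least processed tile).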
int vp9_get_tiles_proc_status(MultiThreadHandle *multi_thread_ctxt,
                              int *tile_completion_status, int *cur_tile_id,
                              int tile_cols) {
  int tile_col;
  int tile_id = -1;  // Stores the tile ID with minimum proc done
  int max_num_jobs_remaining = 0;
  int num_jobs_remaining;

  // Mark the completion to avoid check in the loop
  tile_completion_status[*cur_tile_id] = 1;

  // Check for the status of all the tiles
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    if (tile_completion_status[tile_col] == 0) {
      num_jobs_remaining =
          vp9_get_job_queue_status(multi_thread_ctxt, tile_col);
      // Mark the completion to avoid checks during future switches across
      // tiles
      if (num_jobs_remaining == 0) tile_completion_status[tile_col] = 1;
      if (num_jobs_remaining > max_num_jobs_remaining) {
        max_num_jobs_remaining = num_jobs_remaining;
        tile_id = tile_col;
      }
    }
  }

  if (-1 == tile_id) {
    return 1;
  } else {
    // Update the cur ID to the next tile ID that will be processed,
    // which will be the least processed tile
    *cur_tile_id = tile_id;
    return 0;
  }
}