|
| 1 | +use rayon::{ |
| 2 | + prelude::ParallelIterator, |
| 3 | + slice::{ParallelSlice, ParallelSliceMut}, |
| 4 | +}; |
| 5 | + |
1 | 6 | use crate::{ |
2 | 7 | asyncjob::{AsyncJob, RunParams}, |
3 | 8 | error::Result, |
4 | 9 | sync::{self, CommitId, RepoPath, SharedCommitFilterFn}, |
5 | 10 | AsyncGitNotification, ProgressPercent, |
6 | 11 | }; |
7 | 12 | use std::{ |
8 | | - sync::{Arc, Mutex}, |
| 13 | + sync::{atomic::AtomicUsize, Arc, Mutex}, |
9 | 14 | time::{Duration, Instant}, |
10 | 15 | }; |
11 | 16 |
|
@@ -69,41 +74,73 @@ impl AsyncCommitFilterJob { |
69 | 74 | commits: Vec<CommitId>, |
70 | 75 | params: &RunParams<AsyncGitNotification, ProgressPercent>, |
71 | 76 | ) -> JobState { |
72 | | - let response = sync::repo(repo_path) |
73 | | - .map(|repo| self.filter_commits(&repo, commits, params)) |
| 77 | + let result = self |
| 78 | + .filter_commits(repo_path, commits, params) |
74 | 79 | .map(|(start, result)| CommitFilterResult { |
75 | 80 | result, |
76 | 81 | duration: start.elapsed(), |
77 | 82 | }); |
78 | 83 |
|
79 | | - JobState::Response(response) |
| 84 | + JobState::Response(result) |
80 | 85 | } |
81 | 86 |
|
82 | 87 | fn filter_commits( |
83 | 88 | &self, |
84 | | - repo: &git2::Repository, |
| 89 | + repo_path: &RepoPath, |
85 | 90 | commits: Vec<CommitId>, |
86 | 91 | params: &RunParams<AsyncGitNotification, ProgressPercent>, |
87 | | - ) -> (Instant, Vec<CommitId>) { |
| 92 | + ) -> Result<(Instant, Vec<CommitId>)> { |
88 | 93 | let total_amount = commits.len(); |
89 | 94 | let start = Instant::now(); |
90 | 95 |
|
91 | | - let result = commits |
92 | | - .into_iter() |
93 | | - .enumerate() |
94 | | - .filter_map(|(idx, c)| { |
95 | | - Self::update_progress( |
96 | | - params, |
97 | | - ProgressPercent::new(idx, total_amount), |
98 | | - ); |
99 | | - |
100 | | - (*self.filter)(repo, &c) |
101 | | - .ok() |
102 | | - .and_then(|res| res.then_some(c)) |
103 | | - }) |
104 | | - .collect::<Vec<_>>(); |
105 | | - |
106 | | - (start, result) |
| 96 | + //note: for some reason >4 threads degrades search performance |
| 97 | + let pool = |
| 98 | + rayon::ThreadPoolBuilder::new().num_threads(4).build()?; |
| 99 | + |
| 100 | + let idx = AtomicUsize::new(0); |
| 101 | + |
| 102 | + let mut result = pool.install(|| { |
| 103 | + commits |
| 104 | + .into_iter() |
| 105 | + .enumerate() |
| 106 | + .collect::<Vec<(usize, CommitId)>>() |
| 107 | + .par_chunks(1000) |
| 108 | + .filter_map(|c| { |
| 109 | + //TODO: error log repo open errors |
| 110 | + sync::repo(repo_path).ok().map(|repo| { |
| 111 | + c.iter() |
| 112 | + .filter_map(|(e, c)| { |
| 113 | + let idx = idx.fetch_add( |
| 114 | + 1, |
| 115 | + std::sync::atomic::Ordering::Relaxed, |
| 116 | + ); |
| 117 | + |
| 118 | + Self::update_progress( |
| 119 | + params, |
| 120 | + ProgressPercent::new( |
| 121 | + idx, |
| 122 | + total_amount, |
| 123 | + ), |
| 124 | + ); |
| 125 | + |
| 126 | + (*self.filter)(&repo, c) |
| 127 | + .ok() |
| 128 | + .and_then(|res| { |
| 129 | + res.then_some((*e, *c)) |
| 130 | + }) |
| 131 | + }) |
| 132 | + .collect::<Vec<_>>() |
| 133 | + }) |
| 134 | + }) |
| 135 | + .flatten() |
| 136 | + .collect::<Vec<_>>() |
| 137 | + }); |
| 138 | + |
| 139 | + result.par_sort_by(|a, b| a.0.cmp(&b.0)); |
| 140 | + |
| 141 | + let result = result.into_iter().map(|c| c.1).collect(); |
| 142 | + |
| 143 | + Ok((start, result)) |
107 | 144 | } |
108 | 145 |
|
109 | 146 | fn update_progress( |
|
0 commit comments