Skip to content

Commit ed29f4d

Browse files
committed
Use batch processing for ReactiveFileCollection
Process all files as a single Batch delta instead of emitting one delta per file. This dramatically reduces the number of updates flowing through the reactive combinators during bulk loading.

Key changes:
- Add process_files_batch to ReactiveFileCollection (emits a single Batch)
- Add process_file_silent helper (processes a file without emitting)
- Update ReactiveAnalysis.process_files to use batch processing

Performance improvement on deadcode-benchmark:
- Before: fixpoint recv=601,563 updates
- After: fixpoint recv=13 updates (batches)

This means downstream combinators process all file data together instead of handling 600K+ individual updates one at a time.
1 parent 2f6ef60 commit ed29f4d

File tree

5 files changed

+50
-33
lines changed

5 files changed

+50
-33
lines changed

analysis/reactive/src/Reactive.ml

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -487,9 +487,7 @@ let join (left : ('k1, 'v1) t) (right : ('k2, 'v2) t)
487487
| Batch entries ->
488488
(* Collect all affected right keys, then process *)
489489
let right_keys =
490-
entries
491-
|> List.map (fun (k, _) -> k)
492-
|> List.sort_uniq compare
490+
entries |> List.map (fun (k, _) -> k) |> List.sort_uniq compare
493491
in
494492
let all_downstream = right_keys |> List.concat_map process_right_key in
495493
(* Deduplicate *)
@@ -1104,8 +1102,7 @@ let fixpoint ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () : ('k, unit) t
11041102
(* Net changes: added if in added_set but not removed_set, etc. *)
11051103
let net_added =
11061104
Hashtbl.fold
1107-
(fun k () acc ->
1108-
if Hashtbl.mem removed_set k then acc else k :: acc)
1105+
(fun k () acc -> if Hashtbl.mem removed_set k then acc else k :: acc)
11091106
added_set []
11101107
in
11111108
let net_removed =
@@ -1144,8 +1141,7 @@ let fixpoint ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () : ('k, unit) t
11441141
List.iter (fun k -> Hashtbl.replace removed_set k ()) !all_removed;
11451142
let net_added =
11461143
Hashtbl.fold
1147-
(fun k () acc ->
1148-
if Hashtbl.mem removed_set k then acc else k :: acc)
1144+
(fun k () acc -> if Hashtbl.mem removed_set k then acc else k :: acc)
11491145
added_set []
11501146
in
11511147
let net_removed =

analysis/reactive/src/ReactiveFileCollection.ml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,31 @@ let process_if_changed t path =
7070
emit t (Reactive.Set (path, value));
7171
true (* changed *)
7272

73-
(** Process multiple files *)
73+
(** Process multiple files (emits individual deltas) *)
7474
let process_files t paths =
7575
List.iter (fun path -> ignore (process_if_changed t path)) paths
7676

77+
(** Process a file without emitting. Returns batch entry if changed. *)
78+
let process_file_silent t path =
79+
let new_id = get_file_id path in
80+
match Hashtbl.find_opt t.internal.cache path with
81+
| Some (old_id, _) when not (file_changed ~old_id ~new_id) ->
82+
None (* unchanged *)
83+
| _ ->
84+
let raw = t.internal.read_file path in
85+
let value = t.internal.process path raw in
86+
Hashtbl.replace t.internal.cache path (new_id, value);
87+
Some (Reactive.set path value)
88+
89+
(** Process multiple files and emit as a single batch.
90+
More efficient than process_files when processing many files at once. *)
91+
let process_files_batch t paths =
92+
let entries =
93+
paths |> List.filter_map (fun path -> process_file_silent t path)
94+
in
95+
if entries <> [] then emit t (Reactive.Batch entries);
96+
List.length entries
97+
7798
(** Remove a file *)
7899
let remove t path =
79100
Hashtbl.remove t.internal.cache path;

analysis/reactive/src/ReactiveFileCollection.mli

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,13 @@ val to_collection : ('raw, 'v) t -> (string, 'v) Reactive.t
4040
(** {1 Processing} *)
4141

4242
val process_files : ('raw, 'v) t -> string list -> unit
43-
(** Process files, emitting deltas for changed files. *)
43+
(** Process files, emitting individual deltas for each changed file. *)
44+
45+
val process_files_batch : ('raw, 'v) t -> string list -> int
46+
(** Process files, emitting a single [Batch] delta with all changes.
47+
Returns the number of files that changed.
48+
More efficient than [process_files] when processing many files at once,
49+
as downstream combinators can process all changes together. *)
4450

4551
val process_if_changed : ('raw, 'v) t -> string -> bool
4652
(** Process a file if changed. Returns true if file was processed. *)

analysis/reactive/test/ReactiveTest.ml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1759,13 +1759,7 @@ let test_batch_flatmap () =
17591759
| Remove k -> received_entries := [(k, None)] @ !received_entries);
17601760

17611761
(* Send a batch *)
1762-
emit
1763-
(Batch
1764-
[
1765-
Reactive.set "a" 1;
1766-
Reactive.set "b" 2;
1767-
Reactive.set "c" 3;
1768-
]);
1762+
emit (Batch [Reactive.set "a" 1; Reactive.set "b" 2; Reactive.set "c" 3]);
17691763

17701764
Printf.printf "Received batches: %d, entries: %d\n" !received_batches
17711765
(List.length !received_entries);

analysis/reanalyze/src/ReactiveAnalysis.ml

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -74,27 +74,27 @@ let create ~config : t =
7474
process_cmt_infos ~config ~cmtFilePath:path cmt_infos)
7575

7676
(** Process all files incrementally using ReactiveFileCollection.
77-
First run processes all files. Subsequent runs only process changed files. *)
77+
First run processes all files. Subsequent runs only process changed files.
78+
Uses batch processing to emit all changes as a single Batch delta. *)
7879
let process_files ~(collection : t) ~config:_ cmtFilePaths : all_files_result =
7980
Timing.time_phase `FileLoading (fun () ->
80-
let processed = ref 0 in
81-
let from_cache = ref 0 in
82-
83-
(* Add/update all files in the collection *)
84-
cmtFilePaths
85-
|> List.iter (fun cmtFilePath ->
86-
let was_in_collection =
87-
ReactiveFileCollection.mem collection cmtFilePath
88-
in
89-
let changed =
90-
ReactiveFileCollection.process_if_changed collection cmtFilePath
91-
in
92-
if changed then incr processed
93-
else if was_in_collection then incr from_cache);
81+
let total_files = List.length cmtFilePaths in
82+
let cached_before =
83+
cmtFilePaths
84+
|> List.filter (fun p -> ReactiveFileCollection.mem collection p)
85+
|> List.length
86+
in
87+
88+
(* Process all files as a batch - emits single Batch delta *)
89+
let processed =
90+
ReactiveFileCollection.process_files_batch collection cmtFilePaths
91+
in
92+
let from_cache = total_files - processed in
9493

9594
if !Cli.timing then
96-
Printf.eprintf "Reactive: %d files processed, %d from cache\n%!"
97-
!processed !from_cache;
95+
Printf.eprintf
96+
"Reactive: %d files processed, %d from cache (was cached: %d)\n%!"
97+
processed from_cache cached_before;
9898

9999
(* Collect results from the collection *)
100100
let dce_data_list = ref [] in

0 commit comments

Comments
 (0)