@@ -12,22 +12,31 @@ let apply_delta tbl = function
1212
1313let apply_deltas tbl deltas = List. iter (apply_delta tbl) deltas
1414
15+ (* * {1 Statistics} *)
16+
17+ type stats = {mutable updates_received : int ; mutable updates_emitted : int }
18+
19+ let create_stats () = {updates_received = 0 ; updates_emitted = 0 }
20+
1521(* * {1 Reactive Collection} *)
1622
1723type ('k, 'v) t = {
1824 subscribe : (('k , 'v ) delta -> unit ) -> unit ;
1925 iter : ('k -> 'v -> unit ) -> unit ;
2026 get : 'k -> 'v option ;
2127 length : unit -> int ;
28+ stats : stats ;
2229}
2330(* * A reactive collection that can emit deltas and be read.
24- All collections share this interface, enabling composition. *)
31+ All collections share this interface, enabling composition.
32+ [stats] tracks updates received/emitted for diagnostics. *)
2533
2634(* * {1 Collection operations} *)
2735
2836let iter f t = t.iter f
2937let get t k = t.get k
3038let length t = t.length ()
39+ let stats t = t.stats
3140
3241(* * {1 FlatMap} *)
3342
@@ -47,8 +56,12 @@ let flatMap (source : ('k1, 'v1) t) ~f ?merge () : ('k2, 'v2) t =
4756 in
4857 let target : ('k2, 'v2) Hashtbl.t = Hashtbl. create 256 in
4958 let subscribers : (('k2, 'v2) delta -> unit) list ref = ref [] in
59+ let my_stats = create_stats () in
5060
51- let emit delta = List. iter (fun h -> h delta) ! subscribers in
61+ let emit delta =
62+ my_stats.updates_emitted < - my_stats.updates_emitted + 1 ;
63+ List. iter (fun h -> h delta) ! subscribers
64+ in
5265
5366 let recompute_target k2 =
5467 match Hashtbl. find_opt contributions k2 with
@@ -102,6 +115,7 @@ let flatMap (source : ('k1, 'v1) t) ~f ?merge () : ('k2, 'v2) t =
102115 in
103116
104117 let handle_delta delta =
118+ my_stats.updates_received < - my_stats.updates_received + 1 ;
105119 let downstream =
106120 match delta with
107121 | Remove k1 ->
@@ -135,6 +149,7 @@ let flatMap (source : ('k1, 'v1) t) ~f ?merge () : ('k2, 'v2) t =
135149 iter = (fun f -> Hashtbl. iter f target);
136150 get = (fun k -> Hashtbl. find_opt target k);
137151 length = (fun () -> Hashtbl. length target);
152+ stats = my_stats;
138153 }
139154
140155(* * {1 Lookup} *)
@@ -147,10 +162,15 @@ let flatMap (source : ('k1, 'v1) t) ~f ?merge () : ('k2, 'v2) t =
147162let lookup (source : ('k, 'v) t ) ~key : ('k , 'v ) t =
148163 let current : ('k, 'v option) Hashtbl.t = Hashtbl. create 1 in
149164 let subscribers : (('k, 'v) delta -> unit) list ref = ref [] in
165+ let my_stats = create_stats () in
150166
151- let emit delta = List. iter (fun h -> h delta) ! subscribers in
167+ let emit delta =
168+ my_stats.updates_emitted < - my_stats.updates_emitted + 1 ;
169+ List. iter (fun h -> h delta) ! subscribers
170+ in
152171
153172 let handle_delta delta =
173+ my_stats.updates_received < - my_stats.updates_received + 1 ;
154174 match delta with
155175 | Set (k , v ) when k = key ->
156176 Hashtbl. replace current key (Some v);
@@ -188,6 +208,7 @@ let lookup (source : ('k, 'v) t) ~key : ('k, 'v) t =
188208 match Hashtbl. find_opt current key with
189209 | Some (Some _ ) -> 1
190210 | _ -> 0 );
211+ stats = my_stats;
191212 }
192213
193214(* * {1 Join} *)
@@ -221,8 +242,12 @@ let join (left : ('k1, 'v1) t) (right : ('k2, 'v2) t)
221242 in
222243 let target : ('k3, 'v3) Hashtbl.t = Hashtbl. create 256 in
223244 let subscribers : (('k3, 'v3) delta -> unit) list ref = ref [] in
245+ let my_stats = create_stats () in
224246
225- let emit delta = List. iter (fun h -> h delta) ! subscribers in
247+ let emit delta =
248+ my_stats.updates_emitted < - my_stats.updates_emitted + 1 ;
249+ List. iter (fun h -> h delta) ! subscribers
250+ in
226251
227252 let recompute_target k3 =
228253 match Hashtbl. find_opt contributions k3 with
@@ -326,6 +351,7 @@ let join (left : ('k1, 'v1) t) (right : ('k2, 'v2) t)
326351 in
327352
328353 let handle_left_delta delta =
354+ my_stats.updates_received < - my_stats.updates_received + 1 ;
329355 let downstream =
330356 match delta with
331357 | Set (k1 , v1 ) ->
@@ -337,6 +363,7 @@ let join (left : ('k1, 'v1) t) (right : ('k2, 'v2) t)
337363 in
338364
339365 let handle_right_delta delta =
366+ my_stats.updates_received < - my_stats.updates_received + 1 ;
340367 (* When right changes, reprocess all left entries that depend on it *)
341368 let downstream =
342369 match delta with
@@ -368,6 +395,7 @@ let join (left : ('k1, 'v1) t) (right : ('k2, 'v2) t)
368395 iter = (fun f -> Hashtbl. iter f target);
369396 get = (fun k -> Hashtbl. find_opt target k);
370397 length = (fun () -> Hashtbl. length target);
398+ stats = my_stats;
371399 }
372400
373401(* * {1 Union} *)
@@ -388,8 +416,12 @@ let union (left : ('k, 'v) t) (right : ('k, 'v) t) ?merge () : ('k, 'v) t =
388416 let right_values : ('k, 'v) Hashtbl.t = Hashtbl. create 64 in
389417 let target : ('k, 'v) Hashtbl.t = Hashtbl. create 128 in
390418 let subscribers : (('k, 'v) delta -> unit) list ref = ref [] in
419+ let my_stats = create_stats () in
391420
392- let emit delta = List. iter (fun h -> h delta) ! subscribers in
421+ let emit delta =
422+ my_stats.updates_emitted < - my_stats.updates_emitted + 1 ;
423+ List. iter (fun h -> h delta) ! subscribers
424+ in
393425
394426 let recompute_key k =
395427 match (Hashtbl. find_opt left_values k, Hashtbl. find_opt right_values k) with
@@ -406,6 +438,7 @@ let union (left : ('k, 'v) t) (right : ('k, 'v) t) ?merge () : ('k, 'v) t =
406438 in
407439
408440 let handle_left_delta delta =
441+ my_stats.updates_received < - my_stats.updates_received + 1 ;
409442 let downstream =
410443 match delta with
411444 | Set (k , v ) ->
@@ -419,6 +452,7 @@ let union (left : ('k, 'v) t) (right : ('k, 'v) t) ?merge () : ('k, 'v) t =
419452 in
420453
421454 let handle_right_delta delta =
455+ my_stats.updates_received < - my_stats.updates_received + 1 ;
422456 let downstream =
423457 match delta with
424458 | Set (k , v ) ->
@@ -448,6 +482,7 @@ let union (left : ('k, 'v) t) (right : ('k, 'v) t) ?merge () : ('k, 'v) t =
448482 iter = (fun f -> Hashtbl. iter f target);
449483 get = (fun k -> Hashtbl. find_opt target k);
450484 length = (fun () -> Hashtbl. length target);
485+ stats = my_stats;
451486 }
452487
453488(* * {1 Fixpoint} *)
@@ -840,8 +875,12 @@ let fixpoint ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () : ('k, unit) t
840875 =
841876 let state = Fixpoint. create () in
842877 let subscribers : (('k, unit) delta -> unit) list ref = ref [] in
878+ let my_stats = create_stats () in
843879
844- let emit delta = List. iter (fun h -> h delta) ! subscribers in
880+ let emit delta =
881+ my_stats.updates_emitted < - my_stats.updates_emitted + 1 ;
882+ List. iter (fun h -> h delta) ! subscribers
883+ in
845884
846885 let emit_changes (added , removed ) =
847886 List. iter (fun k -> emit (Set (k, () ))) added;
@@ -850,12 +889,14 @@ let fixpoint ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () : ('k, unit) t
850889
851890 (* Handle init deltas *)
852891 let handle_init_delta delta =
892+ my_stats.updates_received < - my_stats.updates_received + 1 ;
853893 let changes = Fixpoint. apply_init_delta state delta in
854894 emit_changes changes
855895 in
856896
857897 (* Handle edges deltas *)
858898 let handle_edges_delta delta =
899+ my_stats.updates_received < - my_stats.updates_received + 1 ;
859900 let changes = Fixpoint. apply_edges_delta state delta in
860901 emit_changes changes
861902 in
@@ -888,4 +929,5 @@ let fixpoint ~(init : ('k, unit) t) ~(edges : ('k, 'k list) t) () : ('k, unit) t
888929 iter = (fun f -> Hashtbl. iter f state.current);
889930 get = (fun k -> Hashtbl. find_opt state.current k);
890931 length = (fun () -> Hashtbl. length state.current);
932+ stats = my_stats;
891933 }
0 commit comments