@@ -65,6 +65,20 @@ def global_position(event_id)
6565
6666 protected
6767
68+ def find_event_id_in_stream ( specification_event_id , specification_stream_name )
69+ stream_entries
70+ . by_stream_and_event_id ( specification_stream_name , specification_event_id )
71+ . fetch ( :id )
72+ rescue ::ROM ::TupleCountMismatchError
73+ raise EventNotFound . new ( specification_event_id )
74+ end
75+
76+ def find_event_id_globally ( specification_event_id )
77+ events . by_event_id ( specification_event_id ) . one! . fetch ( :id )
78+ rescue ::ROM ::TupleCountMismatchError
79+ raise EventNotFound . new ( specification_event_id )
80+ end
81+
6882 def read_scope ( specification )
6983 direction = specification . forward? ? :forward : :backward
7084
@@ -73,20 +87,14 @@ def read_scope(specification)
7387 end
7488
7589 if specification . stream . global?
76- offset_entry_id = events . by_event_id ( specification . start ) . one! . fetch ( :id ) if specification . start
77- stop_entry_id = events . by_event_id ( specification . stop ) . one! . fetch ( :id ) if specification . stop
90+ offset_entry_id = find_event_id_globally ( specification . start ) if specification . start
91+ stop_entry_id = find_event_id_globally ( specification . stop ) if specification . stop
7892
7993 query = events . ordered ( direction , offset_entry_id , stop_entry_id , specification . time_sort_by )
8094 query = query . map_with ( :event_to_serialized_record , auto_struct : false )
8195 else
82- offset_entry_id =
83- stream_entries
84- . by_stream_and_event_id ( specification . stream , specification . start )
85- . fetch ( :id ) if specification . start
86- stop_entry_id =
87- stream_entries
88- . by_stream_and_event_id ( specification . stream , specification . stop )
89- . fetch ( :id ) if specification . stop
96+ offset_entry_id = find_event_id_in_stream ( specification . start , specification . stream ) if specification . start
97+ stop_entry_id = find_event_id_in_stream ( specification . stop , specification . stream ) if specification . stop
9098
9199 query =
92100 stream_entries . ordered (
0 commit comments