Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 95 additions & 99 deletions imap_processing/lo/l1b/lo_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -1450,112 +1450,26 @@ def resweep_histogram_data(
3D array of exposure factors (epoch, azimuth, esa_step) indicating how many
ESA steps were reswept during resweeping.
"""
# The sweep table contains the mapping of dates to the LUT table which shows how
# the ESA steps should be reswept.
sweep_df = lo_ancillary.read_ancillary_file(
next(str(s) for s in anc_dependencies if "sweep-table" in str(s))
)
lut_df = lo_ancillary.read_ancillary_file(
next(str(s) for s in anc_dependencies if "esa-mode-lut" in str(s))
)

# Get the time information to compare the epochs to the sweep table dates
sweep_dates = sweep_df["Date"].astype(str)
epochs = l1b_histrates["epoch"].values
epoch_utc = et_to_utc(ttj2000ns_to_et(epochs))
energy_mapping = _get_esa_level_indices(epochs, anc_dependencies=anc_dependencies)

# initialize the reswept counts arrays
h_counts_reswept = np.zeros_like(l1b_histrates["h_counts"].values)
o_counts_reswept = np.zeros_like(l1b_histrates["o_counts"].values)
exposure_factor = np.zeros_like(h_counts_reswept, dtype=int)

# Get the number of azimuth bins from the l1b_histrates dataset
num_azimuth = l1b_histrates.sizes["spin_bin_6"]
# initialize exposure factor to 1 as this will be used to scale (multiply)
# the exposure time later
exposure_factor = np.full(
(len(epochs), l1b_histrates.sizes["esa_step"], num_azimuth), 1, dtype=int
# Place potentially multiple esa_steps into the same energy level bin
np.add.at(
h_counts_reswept,
(slice(None), energy_mapping, slice(None)),
l1b_histrates["h_counts"].values,
)

for epoch_idx, epoch in enumerate(epoch_utc):
# Get only the date portion of the epoch string for comparison with the
# sweep table
epoch_date_only = epoch.split("T")[0]

# if the epoch dat is not in the sweep table, raise an error
if epoch_date_only not in sweep_dates.values:
raise ValueError(
f"No sweep table entry found for date {epoch} at epoch idx {epoch_idx}"
)

# Get the matching sweep table entry for the epoch date and its LUT table index
matching_sweep = sweep_df[sweep_dates == epoch_date_only]
unique_lut_tables = matching_sweep["LUT_table"].unique()

# There should only be one unique LUT table for each date
if len(unique_lut_tables) != 1:
raise ValueError(
f"Expected exactly 1 unique LUT_table value for date {epoch_date_only},"
f" but found {len(unique_lut_tables)}: {unique_lut_tables}"
)

# Get the LUT entries for the identified LUT table index
lut_table_idx = unique_lut_tables[0]
lut_entries = lut_df[lut_df["Tbl_Idx"] == lut_table_idx].copy()

# If there are no LUT entries for the identified LUT table, log a warning
# and skip resweeping for this epoch
if len(lut_entries) == 0:
logger.warning(f"No LUT entries found for table index {lut_table_idx}")
h_counts_reswept[epoch_idx] = l1b_histrates["h_counts"].values[epoch_idx]
o_counts_reswept[epoch_idx] = l1b_histrates["o_counts"].values[epoch_idx]
continue

# Sort the LUT entries by E-Step_Idx to ensure correct mapping order
lut_entries = lut_entries.sort_values("E-Step_Idx")

# Create a mapping of original ESA step index to true ESA step
energy_step_mapping = {}
# Loop through the LUT entries and populate the mapping
for _, row in lut_entries.iterrows():
# Original ESA step index is 1-based, convert to 0-based
esa_idx = int(row["E-Step_Idx"]) - 1
# True ESA step is 1-based
true_esa_step = int(row["E-Step_lvl"])
# Populate the mapping
energy_step_mapping[esa_idx] = true_esa_step

# TODO: Change all instances of azimuth to spin bin
# Resweep the counts for each spin bin using the energy step mapping
for az_idx in range(num_azimuth):
h_original = l1b_histrates["h_counts"].values[epoch_idx, :, az_idx]
o_original = l1b_histrates["o_counts"].values[epoch_idx, :, az_idx]

# Loop through the original ESA step indices and map to the true ESA steps
for orig_idx, true_esa_step in energy_step_mapping.items():
# Check that the original index and true ESA step are within bounds
if orig_idx < len(h_original) and 1 <= true_esa_step <= 7:
# Resweep the counts into the true ESA step
# (convert to 0-based index)
reswept_idx = true_esa_step - 1
h_counts_reswept[epoch_idx, reswept_idx, az_idx] += h_original[
orig_idx
]
o_counts_reswept[epoch_idx, reswept_idx, az_idx] += o_original[
orig_idx
]
# If a reswept was needed for this index, increment the exposure
# factor to so the exposure time can be scaled accordingly
if orig_idx != reswept_idx:
exposure_factor[epoch_idx, reswept_idx, az_idx] += 1

else:
logger.warning(
f"Original ESA index {orig_idx} or "
f"true ESA step {true_esa_step}"
f" out of bounds at epoch idx {epoch_idx}, "
f"spin bin idx {az_idx}"
)

np.add.at(
o_counts_reswept,
(slice(None), energy_mapping, slice(None)),
l1b_histrates["o_counts"].values,
)
np.add.at(exposure_factor, (slice(None), energy_mapping, slice(None)), 1)
l1b_histrates["h_counts"].values = h_counts_reswept
l1b_histrates["o_counts"].values = o_counts_reswept
l1b_histrates.attrs["energy_step_correction"] = (
Expand Down Expand Up @@ -1650,3 +1564,85 @@ def calculate_histogram_rates(
)

return l1b_histrates


def _get_esa_level_indices(epochs: np.ndarray, anc_dependencies: list) -> np.ndarray:
"""
Get the ESA level indices (reswept indices) for the given epochs.

This will always return a 7-element array mapping the original ESA step
indices (0-6) to the true ESA levels after resweeping. i.e. we could have
taken two measurements in a row at the same energy level, so the mapping
would be [0, 0, 1, 1, 2, 2, 3] potentially. The nominal stepping is
[0, 1, 2, 3, 4, 5, 6].

Parameters
----------
epochs : np.ndarray
Array of epochs in TTJ2000ns format.
anc_dependencies : list
List of ancillary file paths.

Returns
-------
esa_level_indices : np.ndarray
Array of ESA level indices for each epoch.
"""
# The sweep table contains the mapping of dates to the LUT table which shows how
# the ESA steps should be reswept.
sweep_df = lo_ancillary.read_ancillary_file(
next(str(s) for s in anc_dependencies if "sweep-table" in str(s))
)
lut_df = lo_ancillary.read_ancillary_file(
next(str(s) for s in anc_dependencies if "esa-mode-lut" in str(s))
)

# Get the time information to compare the epochs to the sweep table dates
sweep_dates = sweep_df["Date"].astype(str)
# Get only the date portion of the epoch string for comparison with the sweep table
# NOTE: We only use the first epoch here since the LUT mapping should be
# constant through the entire dataset
epoch_date_only = et_to_utc(ttj2000ns_to_et(epochs[0])).split("T")[0]

# Get the matching sweep table entry for the epoch date and its LUT table index
matching_sweep = sweep_df[sweep_dates == epoch_date_only]
# if the epoch date is not in the sweep table, raise an error
if len(matching_sweep) == 0:
raise ValueError(f"No sweep table entry found for date {epoch_date_only}")

unique_lut_tables = matching_sweep["LUT_table"].unique()

# There should only be one unique LUT table for each date
if len(unique_lut_tables) != 1:
raise ValueError(
f"Expected exactly 1 unique LUT_table value for date {epoch_date_only},"
f" but found {len(unique_lut_tables)}: {unique_lut_tables}"
)

# Get the LUT entries for the identified LUT index
lut_table_idx = unique_lut_tables[0]
lut_entries = lut_df[lut_df["Tbl_Idx"] == lut_table_idx].copy()

# If there are no LUT entries for the identified LUT table, log a warning
# and return the default mapping
if len(lut_entries) == 0:
logger.warning(f"No LUT entries found for table index {lut_table_idx}")
return np.arange(7)

# Sort the LUT entries by E-Step_Idx to ensure correct mapping order
lut_entries = lut_entries.sort_values("E-Step_Idx")

# TODO: It seems like this is also given to us in the main sweep table
# Can we just take the last 7 entries of the sweep table for that
# date and use those values instead of this extra work with the
# separate LUT ancillary file?
energy_step_mapping = np.zeros(7, dtype=int)
# Loop through the LUT entries and populate the mapping
for _, row in lut_entries.iterrows():
# Original ESA step index is 1-based, convert to 0-based
esa_idx = int(row["E-Step_Idx"]) - 1
true_esa_step = int(row["E-Step_lvl"]) - 1
# Populate the mapping
energy_step_mapping[esa_idx] = true_esa_step

return energy_step_mapping
10 changes: 8 additions & 2 deletions imap_processing/tests/lo/test_lo_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,8 +811,12 @@ def test_resweep_histogram_success(anc_dependencies):
"spin_bin_6": np.arange(60),
},
)
# Exposure factor is how many times we saw each ESA step during the sweep
# The first ESA level was repeated twice during the sweep and the second
# ESA level was skipped entirely. [1, 1, 3, 4, 5, 6, 7]
exposure_factor_expected = np.full((2, 7, 60), 1)
exposure_factor_expected[:, 0, :] = 2
exposure_factor_expected[:, 1, :] = 0

l1b_histrate.h_counts[0, 0, 0] = 5
l1b_histrate.h_counts[0, 1, 0] = 10
Expand All @@ -835,6 +839,9 @@ def test_resweep_histogram_success(anc_dependencies):
assert l1b_histrates.o_counts[1, 2, 0] == 4

assert np.array_equal(exposure_factor, exposure_factor_expected)
# Sanity check on making sure we get 7 ESA levels for each sweep
# regardless of which bin they are in
np.testing.assert_equal(np.sum(exposure_factor, axis=1), 7)


def test_resweep_histogram_no_date(anc_dependencies):
Expand All @@ -860,8 +867,7 @@ def test_resweep_histogram_no_date(anc_dependencies):

with pytest.raises(
ValueError,
match="No sweep table entry found for date "
"2025-04-25T02:00:00.000 at epoch idx 0",
match="No sweep table entry found for date 2025-04-25",
):
resweep_histogram_data(l1b_histrate, anc_dependencies)

Expand Down