Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 22 additions & 88 deletions StatTools/analysis/dpcca.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def dpcca_worker(
arr: Union[np.ndarray, None],
step: float,
pd: int,
buffer_in_use: bool,
gc_params: tuple = None,
short_vectors=False,
n_integral=1,
Expand All @@ -76,36 +75,33 @@ def dpcca_worker(
gc.set_threshold(10, 2, 2)
s_current = [s] if not isinstance(s, Iterable) else s

if buffer_in_use:
cumsum_arr = SharedBuffer.get("ARR")
else:
cumsum_arr = arr
for _ in range(n_integral):
cumsum_arr = np.cumsum(cumsum_arr, axis=1)
cumsum_arr = arr
for _ in range(n_integral):
Comment on lines +78 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Эту логику имеет смысл вынести во внешнюю функцию: смысла повторно интегрировать для каждого окна нет

cumsum_arr = np.cumsum(cumsum_arr, axis=1)

shape = cumsum_arr.shape if buffer_in_use else arr.shape
shape = arr.shape

F = np.zeros((len(s_current), shape[0], shape[0]), dtype=float)
R = np.zeros((len(s_current), shape[0], shape[0]), dtype=float)
P = np.zeros((len(s_current), shape[0], shape[0]), dtype=float)

for s_i, s_val in enumerate(s_current):

V = np.arange(0, shape[1] - s_val, int(step * s_val))
V = np.arange(0, shape[1] - s_val + 1, int(step * s_val))
Xw = np.arange(s_val, dtype=int)
Y = np.zeros((shape[0], len(V)), dtype=object)

signal_view = np.lib.stride_tricks.sliding_window_view(
cumsum_arr, s_val, axis=1
)
signal_view = signal_view[:, :: int(step * s_val)]
for n in range(cumsum_arr.shape[0]):
for v_i, v in enumerate(V):
W = cumsum_arr[n][v : v + s_val]
for m_i, W in enumerate(signal_view[n]):
if len(W) == 0:
print(f"\tFor s = {s_val} W is an empty slice!")
return P, R, F

p = np.polyfit(Xw, W, deg=pd)
Z = np.polyval(p, Xw)
Y[n][v_i] = Z - W

Y[n][m_i] = Z - W
if gc_params is not None:
if n % gc_params[0] == 0:
gc.collect(gc_params[1])
Expand All @@ -119,41 +115,6 @@ def dpcca_worker(
return P, R, F


def start_pool_with_buffer(
    buffer: SharedBuffer,
    processes: int,
    s_by_workers: np.ndarray,
    pd: int,
    step: float,
    gc_params: tuple = None,
    n_integral=1,
):
    """Run ``dpcca_worker`` in a process pool whose workers read a shared buffer.

    ``np.cumsum`` is applied to ``buffer`` in place ``n_integral`` times before
    the pool is started, so the workers (launched with ``buffer_in_use=True``
    and ``arr=None``) do not receive the array as an argument.

    Args:
        buffer (SharedBuffer): shared storage holding the input data; it is
            modified in place by the cumulative sums.
        processes (int): number of worker processes in the pool.
        s_by_workers (np.ndarray): chunks of S values; each element is handed
            to one ``dpcca_worker`` call by ``Pool.map``.
        pd (int): forwarded to ``dpcca_worker`` as ``pd``.
        step (float): forwarded to ``dpcca_worker`` as ``step``.
        gc_params (tuple, optional): forwarded to ``dpcca_worker``.
            Defaults to None.
        n_integral (int, optional): how many times ``np.cumsum`` is applied to
            the buffer before the workers start. Defaults to 1.

    Returns:
        list: results of ``dpcca_worker`` for each element of
        ``s_by_workers``, in the order produced by ``Pool.map``.
    """
    # Integrate once, up front, directly in the shared storage.
    # NOTE(review): by_1st_dim=True presumably applies cumsum row by row --
    # confirm against SharedBuffer.apply_in_place.
    for _ in range(n_integral):
        buffer.apply_in_place(np.cumsum, by_1st_dim=True)

    # Every worker process is initialised with the buffer under the "ARR" key.
    shared_objects = {"ARR": buffer}
    task = partial(
        dpcca_worker,
        arr=None,
        pd=pd,
        step=step,
        buffer_in_use=True,
        gc_params=gc_params,
    )

    worker_pool = Pool(
        processes=processes,
        initializer=buffer.buffer_init,
        initargs=(shared_objects,),
    )
    # closing() calls worker_pool.close() on exit, including on error.
    with closing(worker_pool) as workers:
        return workers.map(task, s_by_workers)


def concatenate_3d_matrices(p: np.ndarray, r: np.ndarray, f: np.ndarray):
P = np.concatenate(p, axis=1)[0]
R = np.concatenate(r, axis=1)[0]
Expand Down Expand Up @@ -191,15 +152,14 @@ def dpcca(
step (float): share of S - value. It's set usually as 0.5. The integer part of the number will be taken
s (Union[int, Iterable]): points where fluctuation function F(s) is calculated. More on that in the article.
processes (int, optional): num of workers to spawn. Defaults to 1.
buffer (Union[bool, SharedBuffer], optional): allows to share input array between processes. Defaults to False.
buffer (Union[bool, SharedBuffer], optional): Deprecated; the value is ignored. Defaults to False.
gc_params (tuple, optional): _description_. Defaults to None.
short_vectors (bool, optional): _description_. Defaults to False.
n_integral (int, optional): Number of cumsum operation before computation. Defaults to 1.

Raises:
ValueError: All input S values are larger than vector shape / 4.
ValueError: Cannot use S > L / 4.
ValueError: Wrong type of input buffer, if buffer is not SharedBuffer

Returns:
tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: [P, R, F^2, S], where
Expand All @@ -217,7 +177,6 @@ def dpcca(
arr,
step,
pd,
buffer_in_use=False,
gc_params=gc_params,
short_vectors=True,
n_integral=n_integral,
Expand Down Expand Up @@ -249,7 +208,6 @@ def dpcca(
arr,
step,
pd,
buffer_in_use=False,
gc_params=gc_params,
n_integral=n_integral,
)
Expand All @@ -263,42 +221,18 @@ def dpcca(
S = np.array(s, dtype=int) if not isinstance(s, np.ndarray) else s
S_by_workers = np.array_split(S, processes)

if isinstance(buffer, bool):
if buffer:
shared_input = SharedBuffer(arr.shape, c_double)
shared_input.write(arr)

pool_result = start_pool_with_buffer(
shared_input,
processes,
S_by_workers,
pd,
step,
gc_params,
with closing(Pool(processes=processes)) as pool:
pool_result = pool.map(
partial(
dpcca_worker,
arr=arr,
pd=pd,
step=step,
gc_params=gc_params,
n_integral=n_integral,
)

else:
with closing(Pool(processes=processes)) as pool:
pool_result = pool.map(
partial(
dpcca_worker,
arr=arr,
pd=pd,
step=step,
buffer_in_use=False,
gc_params=gc_params,
n_integral=n_integral,
),
S_by_workers,
)

elif isinstance(buffer, SharedBuffer):
pool_result = start_pool_with_buffer(
buffer, processes, S_by_workers, pd, step, gc_params, n_integral=n_integral
),
S_by_workers,
)
else:
raise ValueError("Wrong type of input buffer!")

P, R, F = np.array([]), np.array([]), np.array([])

Expand Down