Skip to content

Commit a2e15dd

Browse files
authored
Implements a first version of sort_values. (#24)
* Implements a first version of sort_values. * fix sorting with nan values
1 parent 0fe6e8a commit a2e15dd

File tree

3 files changed

+180
-11
lines changed

3 files changed

+180
-11
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
@brief test log(time=4s)
4+
"""
5+
import os
6+
import unittest
7+
import pandas
8+
from pyquickhelper.pycode import ExtTestCase, get_temp_folder
9+
from pandas_streaming.df import StreamingDataFrame
10+
11+
12+
class TestDataFrameSort(ExtTestCase):
13+
14+
def test_sort_values(self):
15+
temp = get_temp_folder(__file__, "temp_sort_values")
16+
name = os.path.join(temp, "_data_")
17+
df = pandas.DataFrame([dict(a=1, b="eé", c=5.6, ind="a1", ai=1),
18+
dict(a=5, b="f", c=5.7, ind="a2", ai=2),
19+
dict(a=4, b="g", ind="a3", ai=3),
20+
dict(a=8, b="h", c=5.9, ai=4),
21+
dict(a=16, b="i", c=6.2, ind="a5", ai=5)])
22+
sdf = StreamingDataFrame.read_df(df, chunksize=2)
23+
sorted_df = df.sort_values(by="a")
24+
res = sdf.sort_values(by="a", temp_file=name)
25+
res_df = res.to_df()
26+
self.assertEqualDataFrame(sorted_df, res_df)
27+
28+
def test_sort_values_twice(self):
29+
temp = get_temp_folder(__file__, "temp_sort_values_twice")
30+
name = os.path.join(temp, "_data_")
31+
df = pandas.DataFrame([dict(a=1, b="eé", c=5.6, ind="a1", ai=1),
32+
dict(a=5, b="f", c=5.7, ind="a2", ai=2),
33+
dict(a=4, b="g", ind="a3", ai=3),
34+
dict(a=8, b="h", c=5.9, ai=4),
35+
dict(a=16, b="i", c=6.2, ind="a5", ai=5)])
36+
sdf = StreamingDataFrame.read_df(df, chunksize=2)
37+
sorted_df = df.sort_values(by="a")
38+
res = sdf.sort_values(by="a", temp_file=name)
39+
res_df = res.to_df()
40+
self.assertEqualDataFrame(sorted_df, res_df)
41+
res_df = res.to_df()
42+
self.assertEqualDataFrame(sorted_df, res_df)
43+
44+
def test_sort_values_reverse(self):
45+
temp = get_temp_folder(__file__, "temp_sort_values_reverse")
46+
name = os.path.join(temp, "_data_")
47+
df = pandas.DataFrame([dict(a=1, b="eé", c=5.6, ind="a1", ai=1),
48+
dict(a=5, b="f", c=5.7, ind="a2", ai=2),
49+
dict(a=4, b="g", ind="a3", ai=3),
50+
dict(a=8, b="h", c=5.9, ai=4),
51+
dict(a=16, b="i", c=6.2, ind="a5", ai=5)])
52+
sdf = StreamingDataFrame.read_df(df, chunksize=2)
53+
sorted_df = df.sort_values(by="a", ascending=False)
54+
res = sdf.sort_values(by="a", temp_file=name, ascending=False)
55+
res_df = res.to_df()
56+
self.assertEqualDataFrame(sorted_df, res_df)
57+
58+
def test_sort_values_nan_last(self):
59+
temp = get_temp_folder(__file__, "temp_sort_values_nan_last")
60+
name = os.path.join(temp, "_data_")
61+
df = pandas.DataFrame([dict(a=1, b="eé", c=5.6, ind="a1", ai=1),
62+
dict(b="f", c=5.7, ind="a2", ai=2),
63+
dict(b="f", c=5.8, ind="a2", ai=2),
64+
dict(a=4, b="g", ind="a3", ai=3),
65+
dict(a=8, b="h", c=5.9, ai=4),
66+
dict(a=16, b="i", c=6.2, ind="a5", ai=5)])
67+
sdf = StreamingDataFrame.read_df(df, chunksize=2)
68+
sorted_df = df.sort_values(by="a", na_position='last')
69+
res = sdf.sort_values(by="a", temp_file=name, na_position='last')
70+
res_df = res.to_df()
71+
self.assertEqualDataFrame(sorted_df, res_df)
72+
73+
def test_sort_values_nan_first(self):
74+
temp = get_temp_folder(__file__, "temp_sort_values_nan_first")
75+
name = os.path.join(temp, "_data_")
76+
df = pandas.DataFrame([dict(a=1, b="eé", c=5.6, ind="a1", ai=1),
77+
dict(b="f", c=5.7, ind="a2", ai=2),
78+
dict(b="f", c=5.8, ind="a2", ai=2),
79+
dict(a=4, b="g", ind="a3", ai=3),
80+
dict(a=8, b="h", c=5.9, ai=4),
81+
dict(a=16, b="i", c=6.2, ind="a5", ai=5)])
82+
sdf = StreamingDataFrame.read_df(df, chunksize=2)
83+
sorted_df = df.sort_values(by="a", na_position='first')
84+
res = sdf.sort_values(by="a", temp_file=name, na_position='first')
85+
res_df = res.to_df()
86+
self.assertEqualDataFrame(sorted_df, res_df)
87+
88+
89+
if __name__ == "__main__":
90+
unittest.main()

_unittests/ut_df/test_streaming_dataframe.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import numpy
1010
from pyquickhelper.pycode import ExtTestCase, get_temp_folder
1111
from pandas_streaming.data import dummy_streaming_dataframe
12-
from pandas_streaming.exc import StreamingInefficientException
1312
from pandas_streaming.df import StreamingDataFrame
1413
from pandas_streaming.df.dataframe import StreamingDataFrameSchemaError
1514

@@ -23,8 +22,6 @@ def test_shape(self):
2322
self.assertEqual(len(dfs), 10)
2423
shape = sdf.shape
2524
self.assertEqual(shape, (100, 2))
26-
self.assertRaise(lambda: sdf.sort_values(
27-
"r"), StreamingInefficientException)
2825

2926
def test_init(self):
3027
sdf = dummy_streaming_dataframe(100)
@@ -557,5 +554,4 @@ def test_set_item_function(self):
557554

558555

559556
if __name__ == "__main__":
560-
# TestStreamingDataFrame().test_describe()
561557
unittest.main()

pandas_streaming/df/dataframe.py

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
@file
44
@brief Defines a streaming dataframe.
55
"""
6+
import pickle
7+
import os
68
from io import StringIO, BytesIO
79
from inspect import isfunction
810
import numpy
911
import numpy.random as nrandom
1012
import pandas
1113
from pandas.testing import assert_frame_equal
1214
from pandas.io.json import json_normalize
13-
from ..exc import StreamingInefficientException
1415
from .dataframe_split import sklearn_train_test_split, sklearn_train_test_split_streaming
1516
from .dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream
1617

@@ -67,6 +68,7 @@ class StreamingDataFrame:
6768
"""
6869

6970
def __init__(self, iter_creation, check_schema=True, stable=True):
71+
self._delete_ = []
7072
if isinstance(iter_creation, (pandas.DataFrame, dict,
7173
numpy.ndarray, str)):
7274
raise TypeError(
@@ -407,12 +409,6 @@ def __iter__(self):
407409
rows += it.shape[0]
408410
yield it
409411

410-
def sort_values(self, *args, **kwargs):
411-
"""
412-
Not implemented.
413-
"""
414-
raise StreamingInefficientException(StreamingDataFrame.sort_values)
415-
416412
@property
417413
def shape(self):
418414
"""
@@ -1118,6 +1114,93 @@ def describe(self, percentiles=None, include=None, exclude=None,
11181114
summary = summary.loc[rows, :]
11191115
return pandas.concat([merged, summary])
11201116

1117+
def sort_values(self, by, axis=0, ascending=True, kind='quicksort',
1118+
na_position='last',
1119+
temp_file='_pandas_streaming_sort_values_'):
1120+
"""
1121+
Sorts the streaming dataframe by values.
1122+
1123+
:param by: one column
1124+
:param ascending: order
1125+
:param kind: see :meth:`pandas.DataFrame.sort_values`
1126+
:param na_position: see :meth:`pandas.DataFrame.sort_values`
1127+
:param temp_file: sorting a whole database is impossible
1128+
without storing intermediate results on disk
1129+
unless it can fit into the memory, but in that case,
1130+
it is easier to convert the streaming database into
1131+
a dataframe and sort it
1132+
:return: streaming database
1133+
"""
1134+
if not isinstance(by, str):
1135+
raise NotImplementedError(
1136+
"Only one column can be used to sort not %r." % by)
1137+
keys = {}
1138+
nans = []
1139+
indices = []
1140+
with open(temp_file, 'wb') as f:
1141+
for df in self:
1142+
dfs = df.sort_values(by, ascending=ascending, kind=kind,
1143+
na_position=na_position)
1144+
for tu in dfs[by]:
1145+
if isinstance(tu, float) and numpy.isnan(tu):
1146+
nans.append(len(indices))
1147+
else:
1148+
if tu not in keys:
1149+
keys[tu] = []
1150+
keys[tu].append(len(indices))
1151+
indices.append(f.tell())
1152+
st = BytesIO()
1153+
pickle.dump(dfs, st)
1154+
f.write(st.getvalue())
1155+
1156+
indices.append(f.tell())
1157+
1158+
values = list(keys.items())
1159+
values.sort(reverse=not ascending)
1160+
1161+
def iterate():
1162+
1163+
with open(temp_file, 'rb') as f:
1164+
1165+
if na_position == 'first':
1166+
for p in nans:
1167+
f.seek(indices[p])
1168+
length = indices[p + 1] - indices[p]
1169+
pkl = f.read(length)
1170+
dfs = pickle.load(BytesIO(pkl))
1171+
sub = dfs[numpy.isnan(dfs[by])]
1172+
yield sub
1173+
1174+
for key, positions in values:
1175+
for p in positions:
1176+
f.seek(indices[p])
1177+
length = indices[p + 1] - indices[p]
1178+
pkl = f.read(length)
1179+
dfs = pickle.load(BytesIO(pkl))
1180+
sub = dfs[dfs[by] == key]
1181+
yield sub
1182+
1183+
if na_position == 'last':
1184+
for p in nans:
1185+
f.seek(indices[p])
1186+
length = indices[p + 1] - indices[p]
1187+
pkl = f.read(length)
1188+
dfs = pickle.load(BytesIO(pkl))
1189+
sub = dfs[numpy.isnan(dfs[by])]
1190+
yield sub
1191+
1192+
res = StreamingDataFrame(
1193+
lambda: iterate(), **self.get_kwargs())
1194+
res._delete_.append(lambda: os.remove(temp_file))
1195+
return res
1196+
1197+
def __del__(self):
1198+
"""
1199+
Calls every function in `_delete_`.
1200+
"""
1201+
for f in self._delete_:
1202+
f()
1203+
11211204

11221205
class StreamingSeries(StreamingDataFrame):
11231206
"""

0 commit comments

Comments
 (0)