149 changes: 90 additions & 59 deletions src/forder.c
@@ -50,12 +50,20 @@ static int ustr_alloc = 0;
static int ustr_n = 0;
static int ustr_maxlen = 0;
static int sortType = 0; // 0 just group; -1 descending, +1 ascending
static int nalast = 0; // 1 (true i.e. last), 0 (false i.e. first), -1 (na i.e. remove)

static int nradix = 0;
static uint8_t **key = NULL;
static int *anso = NULL;
static bool notFirst=false;

static int lgl_arg_to_int(int lgl_arg) {
if (lgl_arg == NA_LOGICAL) {
return -1;
}
return lgl_arg;
}


static char msg[1001];
// use STOP in this file (not error()) to ensure cleanup() is called first
// snprintf to msg first in case nrow (just as an example) is provided in the message because cleanup() sets nrow to 0
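Editor's note: a standalone sketch (not part of the PR) of the three-way mapping the new `lgl_arg_to_int()` helper performs. It assumes only that R defines `NA_LOGICAL` as `INT_MIN` (`R_NaInt`); outside of R we define it ourselves so the sketch compiles on its own.

```c
/* Sketch: how an R logical argument maps onto the internal nalast code. */
#include <limits.h>
#include <stdio.h>

#define NA_LOGICAL INT_MIN  /* matches R's R_NaInt */

static int lgl_arg_to_int(int lgl_arg) {
  return (lgl_arg == NA_LOGICAL) ? -1 : lgl_arg;
}

int main(void) {
  printf("na.last=TRUE  -> %d (NAs last)\n",   lgl_arg_to_int(1));
  printf("na.last=FALSE -> %d (NAs first)\n",  lgl_arg_to_int(0));
  printf("na.last=NA    -> %d (remove NAs)\n", lgl_arg_to_int(NA_LOGICAL));
  return 0;
}
```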
@@ -440,7 +448,7 @@ uint64_t dtwiddle(double x) //const void *p, int i)
STOP(_("Unknown non-finite value; not NA, NaN, -Inf or +Inf")); // # nocov
}

void radix_r(const int from, const int to, int radix);
void radix_r(const int from, const int to, int radix, bool na_remove);

/*
OpenMP is used here to parallelize multiple operations that come together to
@@ -527,7 +535,8 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsA
STOP(_("At least one of retGrp= or sort= must be TRUE"));
if (!isLogical(naArg) || LENGTH(naArg) != 1)
STOP(_("na.last must be logical TRUE, FALSE or NA of length 1")); // # nocov # covered in reuseSorting forder
nalast = (LOGICAL(naArg)[0] == NA_LOGICAL) ? -1 : LOGICAL(naArg)[0]; // 1=na last, 0=na first (default), -1=remove na
const int nalast = lgl_arg_to_int(LOGICAL(naArg)[0]); // 1=na last, 0=na first (default), -1=remove na
const bool na_remove = (nalast == -1);

if (nrow==0) {
// empty vector or 0-row DT is always sorted
@@ -803,7 +812,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsA
free(TMP); free(UGRP); // # nocov
STOP(_("Failed to allocate TMP or UGRP or they weren't cache line aligned: nth=%d"), nth); // # nocov
}

if (retgrp) {
gs_thread = calloc(nth, sizeof(*gs_thread)); // thread private group size buffers
gs_thread_alloc = calloc(nth, sizeof(*gs_thread_alloc));
@@ -814,7 +823,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsA
}
}
if (nradix) {
radix_r(0, nrow-1, 0); // top level recursive call: (from, to, radix)
radix_r(0, nrow-1, 0, na_remove); // top level recursive call: (from, to, radix)
} else {
push(&nrow, 1);
}
@@ -906,7 +915,7 @@ static bool sort_ugrp(uint8_t *x, const int n)
return skip;
}

void radix_r(const int from, const int to, int radix) {
void radix_r(const int from, const int to, int radix, bool na_remove) {
for (;;) {
TBEG();
const int my_n = to-from+1;
@@ -1038,7 +1047,7 @@ void radix_r(const int from, const int to, int radix) {
continue;
} else {
for (int i=0, f=from; i<ngrp; i++) {
radix_r(f, f+my_gs[i]-1, radix+1);
radix_r(f, f+my_gs[i]-1, radix+1, na_remove);
f+=my_gs[i];
}
}
@@ -1097,7 +1106,7 @@ void radix_r(const int from, const int to, int radix) {
}

int *restrict my_TMP = TMP + omp_get_thread_num()*UINT16_MAX; // Allocated up front to save malloc calls which i) block internally and ii) could fail
if (radix==0 && nalast!=-1) {
if (radix==0 && !na_remove) {
// anso contains 1:n so skip reading and copying it. Only happens when nrow<65535. Saving worth the branch (untested) when user repeatedly calls a small-n small-cardinality order.
for (int i=0; i<my_n; i++) anso[my_starts[my_key[i]]++] = i+1; // +1 as R is 1-based.
// The loop counter could be uint_fast16_t since max i here will be UINT16_MAX-1 (65534), hence ++ after last iteration won't overflow 16bits. However, have chosen signed
@@ -1147,7 +1156,7 @@ void radix_r(const int from, const int to, int radix) {
} else {
// this single thread will now descend and resolve all groups, now that the groups are close in cache
for (int i=0, my_from=from; i<ngrp; i++) {
radix_r(my_from, my_from+my_gs[i]-1, radix+1);
radix_r(my_from, my_from+my_gs[i]-1, radix+1, na_remove);
my_from+=my_gs[i];
}
}
@@ -1170,59 +1179,70 @@ void radix_r(const int from, const int to, int radix) {
bool skip=true;
const int n_rem = nradix-radix-1; // how many radix are remaining after this one
TEND(16)
#pragma omp parallel num_threads(getDTthreads(nBatch, false))
bool otmp_ktmp_failed_to_allocate = false;
// OpenMP 3.0 (200805) added support for the || reduction
#if defined(_OPENMP) && _OPENMP >= 200805
#pragma omp parallel num_threads(getDTthreads(nBatch, false)) reduction(|| : otmp_ktmp_failed_to_allocate)
#endif
{
int *my_otmp = malloc(sizeof(*my_otmp) * batchSize); // thread-private write
uint8_t *my_ktmp = malloc(sizeof(*my_ktmp) * batchSize * n_rem);
if (!my_otmp || !my_ktmp) {
free(my_otmp); free(my_ktmp);
STOP(_("Failed to allocate 'my_otmp' and/or 'my_ktmp' arrays (%d bytes)."), (int)((sizeof(*my_otmp) + sizeof(*my_ktmp)) * batchSize));
otmp_ktmp_failed_to_allocate = true;
}
// TODO: move these up above and point restrict[me] to them. Easier to Error that way if failed to alloc.
#pragma omp for
for (int batch=0; batch<nBatch; batch++) {
const int my_n = (batch==nBatch-1) ? lastBatchSize : batchSize; // lastBatchSize == batchSize when my_n is a multiple of batchSize
const int my_from = from + batch*batchSize;
uint16_t *restrict my_counts = counts + batch*256;
uint8_t *restrict my_ugrp = ugrps + batch*256;
int my_ngrp = 0;
bool my_skip = true;
const uint8_t *restrict my_key = key[radix] + my_from;
const uint8_t *restrict byte = my_key;
for (int i=0; i<my_n; i++, byte++) {
if (++my_counts[*byte]==1) { // always true first time when i==0
my_ugrp[my_ngrp++] = *byte;
} else if (my_skip && byte[0]!=byte[-1]) { // include 'my_skip &&' to save != comparison after it's realized this batch is not grouped
my_skip=false;
}
}
ngrps[batch] = my_ngrp; // write once to this shared cache line
if (!my_skip) {
skip = false; // naked write to this shared byte is ok because false is only value written
// gather this batch's anso and remaining keys. If we're sorting too, ugrp is sorted later for that. Here we want to benefit from skip within batch
// as much as possible which is a good chance since batchSize is relatively small (65535)
for (int i=0, sum=0; i<my_ngrp; i++) { int tmp = my_counts[my_ugrp[i]]; my_counts[my_ugrp[i]]=sum; sum+=tmp; } // cumulate counts of this batch
const int *restrict osub = anso+my_from;
byte = my_key;
if (!otmp_ktmp_failed_to_allocate) {
// TODO: move these up above and point restrict[me] to them. Easier to Error that way if failed to alloc.
#if defined(_OPENMP) && _OPENMP >= 200805
#pragma omp for
#endif
for (int batch=0; batch<nBatch; batch++) {
const int my_n = (batch==nBatch-1) ? lastBatchSize : batchSize; // lastBatchSize == batchSize when my_n is a multiple of batchSize
const int my_from = from + batch*batchSize;
uint16_t *restrict my_counts = counts + batch*256;
uint8_t *restrict my_ugrp = ugrps + batch*256;
int my_ngrp = 0;
bool my_skip = true;
const uint8_t *restrict my_key = key[radix] + my_from;
const uint8_t *restrict byte = my_key;
for (int i=0; i<my_n; i++, byte++) {
int dest = my_counts[*byte]++;
my_otmp[dest] = *osub++; // wastefully copies out 1:n when radix==0, but do not optimize as it's unlikely worth the code complexity. my_otmp is not large, for example. Use first TEND() to decide.
for (int r=0; r<n_rem; r++) my_ktmp[r*my_n + dest] = key[radix+1+r][my_from+i]; // reorder remaining keys
if (++my_counts[*byte]==1) { // always true first time when i==0
my_ugrp[my_ngrp++] = *byte;
} else if (my_skip && byte[0]!=byte[-1]) { // include 'my_skip &&' to save != comparison after it's realized this batch is not grouped
my_skip=false;
}
}
// or could do multiple passes through my_key like in the my_n<=65535 approach above. Test which is better depending on if TEND() points here.
ngrps[batch] = my_ngrp; // write once to this shared cache line
if (!my_skip) {
skip = false; // naked write to this shared byte is ok because false is only value written
// gather this batch's anso and remaining keys. If we're sorting too, ugrp is sorted later for that. Here we want to benefit from skip within batch
// as much as possible which is a good chance since batchSize is relatively small (65535)
for (int i=0, sum=0; i<my_ngrp; i++) { int tmp = my_counts[my_ugrp[i]]; my_counts[my_ugrp[i]]=sum; sum+=tmp; } // cumulate counts of this batch
const int *restrict osub = anso+my_from;
byte = my_key;
for (int i=0; i<my_n; i++, byte++) {
int dest = my_counts[*byte]++;
my_otmp[dest] = *osub++; // wastefully copies out 1:n when radix==0, but do not optimize as it's unlikely worth the code complexity. my_otmp is not large, for example. Use first TEND() to decide.
for (int r=0; r<n_rem; r++) my_ktmp[r*my_n + dest] = key[radix+1+r][my_from+i]; // reorder remaining keys
}
// or could do multiple passes through my_key like in the my_n<=65535 approach above. Test which is better depending on if TEND() points here.

// we haven't completed all batches, so we don't know where these groups should be placed yet
// So for now we write the thread-private small now-grouped buffers back in-place. The counts and groups across all batches will be used below to move these blocks.
memcpy(anso+my_from, my_otmp, my_n*sizeof(*anso));
for (int r=0; r<n_rem; r++) memcpy(key[radix+1+r]+my_from, my_ktmp+r*my_n, my_n*sizeof(uint8_t));
// we haven't completed all batches, so we don't know where these groups should be placed yet
// So for now we write the thread-private small now-grouped buffers back in-place. The counts and groups across all batches will be used below to move these blocks.
memcpy(anso+my_from, my_otmp, my_n*sizeof(*anso));
for (int r=0; r<n_rem; r++) memcpy(key[radix+1+r]+my_from, my_ktmp+r*my_n, my_n*sizeof(uint8_t));

// revert cumulate back to counts ready for vertical cumulate
for (int i=0, last=0; i<my_ngrp; i++) { int tmp = my_counts[my_ugrp[i]]; my_counts[my_ugrp[i]]-=last; last=tmp; }
// revert cumulate back to counts ready for vertical cumulate
for (int i=0, last=0; i<my_ngrp; i++) { int tmp = my_counts[my_ugrp[i]]; my_counts[my_ugrp[i]]-=last; last=tmp; }
}
}
}
free(my_otmp);
free(my_ktmp);
}
if (otmp_ktmp_failed_to_allocate) {
STOP(_("Failed to allocate 'my_otmp' and/or 'my_ktmp' arrays (%d bytes)."), (int)((sizeof(int) + sizeof(uint8_t)) * batchSize));
}
TEND(17 + notFirst*3) // 3 timings in this section: 17,18,19 first main split; 20,21,22 thereon

// If my_n input is grouped and ugrp is sorted too (to illustrate), status now would be :
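Editor's note: the hunk above replaces a `STOP()` issued from inside the OpenMP parallel region (a longjmp out of an active team) with a `reduction(|| : ...)` failure flag that is checked once after the region joins. A minimal illustrative sketch of that deferred-error pattern, not data.table's code; the names and sizes here are made up:

```c
/* Record per-thread allocation failure in a reduction flag; raise the error
   only after the parallel region has joined. Compiles and runs (serially)
   without OpenMP too, since the pragmas are then ignored. */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

int main(void) {
  const int nBatch = 16;
  const size_t batchSize = 1u << 16;
  bool failed = false;
  // the PR guards this with: #if defined(_OPENMP) && _OPENMP >= 200805
  #pragma omp parallel reduction(|| : failed)
  {
    char *buf = malloc(batchSize);   // thread-private scratch buffer
    if (!buf) failed = true;         // record; never error out mid-region
    #pragma omp for
    for (int b = 0; b < nBatch; b++) {
      if (failed) continue;          // skip work but still reach the barrier
      buf[0] = (char)b;              // ... per-batch work using buf ...
    }
    free(buf);                       // free(NULL) is a no-op
  }
  if (failed) {                      // single, safe error after the join
    fprintf(stderr, "failed to allocate thread buffers\n");
    return 1;
  }
  return 0;
}
```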
Expand Down Expand Up @@ -1357,7 +1377,7 @@ void radix_r(const int from, const int to, int radix) {
// each in parallel here and they're all dealt with in parallel. There is no nestedness here.
for (int i=0; i<ngrp; i++) {
int start = from + starts[ugrp[i]];
radix_r(start, start+my_gs[i]-1, radix+1);
radix_r(start, start+my_gs[i]-1, radix+1, na_remove);
flush();
}
TEND(24)
@@ -1369,7 +1389,7 @@ void radix_r(const int from, const int to, int radix) {
#pragma omp parallel for ordered schedule(dynamic) num_threads(MIN(nth, ngrp)) // #5077
for (int i=0; i<ngrp; i++) {
int start = from + starts[ugrp[i]];
radix_r(start, start+my_gs[i]-1, radix+1);
radix_r(start, start+my_gs[i]-1, radix+1, na_remove);
#pragma omp ordered
flush();
}
@@ -1378,7 +1398,7 @@ void radix_r(const int from, const int to, int radix) {
#pragma omp parallel for schedule(dynamic) num_threads(MIN(nth, ngrp)) // #5077
for (int i=0; i<ngrp; i++) {
int start = from + starts[ugrp[i]];
radix_r(start, start+my_gs[i]-1, radix+1);
radix_r(start, start+my_gs[i]-1, radix+1, na_remove);
}
}
TEND(25)
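Editor's note: the two branches above recurse into the remaining radix groups in parallel; when group sizes must be recorded, the first branch adds `ordered` so each group's `flush()` commits in group order while the sorting work itself stays concurrent. A minimal illustrative sketch of that OpenMP pattern (not data.table's code):

```c
/* Iterations run concurrently, but the block under 'ordered' executes in
   loop order, so per-group results are committed sequentially. */
#include <stdio.h>

static void sort_group(int i)     { (void)i; /* parallel per-group work */ }
static void flush_in_order(int i) { printf("group %d flushed\n", i); }

int main(void) {
  const int ngrp = 8;
  #pragma omp parallel for ordered schedule(dynamic)
  for (int i = 0; i < ngrp; i++) {
    sort_group(i);                  // unordered, concurrent
    #pragma omp ordered
    flush_in_order(i);              // serialized in iteration order
  }
  return 0;
}
```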
@@ -1671,8 +1691,15 @@ bool GetAutoIndex(void) {
}

// attr(idx, "anyna")>0 || attr(idx, "anyinfnan")>0
bool idxAnyNF(SEXP idx) {
return INTEGER(getAttrib(idx, sym_anyna))[0]>0 || INTEGER(getAttrib(idx, sym_anyinfnan))[0]>0;
static inline bool idxAnyNF(SEXP idx) {
SEXP any_na = getAttrib(idx, sym_anyna);
// Defensive: #7519 to avoid segfaults on CRAN
if (!isInteger(any_na) || xlength(any_na) != 1) return true;

SEXP any_infnan = getAttrib(idx, sym_anyinfnan);
if (!isInteger(any_infnan) || xlength(any_infnan) != 1) return true;

return INTEGER(any_na)[0]>0 || INTEGER(any_infnan)[0]>0;
}

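Editor's note: the rewritten `idxAnyNF()` no longer dereferences `INTEGER(...)[0]` until the attribute is proven to be a length-1 integer, and treats anything else as "assume NAs are present", so a malformed index can only cost a re-sort rather than a segfault (#7519). The same defensive shape as a reusable helper, a sketch assuming only R's C API from `Rinternals.h` (`scalar_int_attr` is a hypothetical name, not in the PR):

```c
/* getAttrib() returns R_NilValue when the attribute is absent, and
   isInteger(R_NilValue) is false, so one check covers both a missing
   and a wrongly-typed attribute. */
#include <Rinternals.h>

static int scalar_int_attr(SEXP x, SEXP sym, int fallback) {
  SEXP a = getAttrib(x, sym);
  if (!isInteger(a) || xlength(a) != 1) return fallback;  // conservative
  return INTEGER(a)[0];
}

/* idxAnyNF() above is then equivalent to:
     scalar_int_attr(idx, sym_anyna, 1) > 0 ||
     scalar_int_attr(idx, sym_anyinfnan, 1) > 0
   with fallback 1 meaning "assume the worst" when the attribute is bad. */
```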
// forder, re-use existing key or index if possible, otherwise call forder
@@ -1697,7 +1724,11 @@ SEXP forderReuseSorting(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP
bool sortGroups = (bool)LOGICAL(sortGroupsArg)[0];
if (!isLogical(naArg) || LENGTH(naArg) != 1)
error(_("na.last must be logical TRUE, FALSE or NA of length 1"));
bool na = (bool)LOGICAL(naArg)[0];
int na_arg = lgl_arg_to_int(LOGICAL(naArg)[0]);

const bool na_first = (na_arg == 0); // na.last=FALSE
const bool na_last = (na_arg == 1); // na.last=TRUE
const bool na_remove = (na_arg == -1); // na.last=NA
if (!isInteger(ascArg))
error(_("order must be integer")); // # nocov # coerced to int in R
if (!isLogical(reuseSortingArg) || LENGTH(reuseSortingArg) != 1)
@@ -1728,18 +1759,18 @@ SEXP forderReuseSorting(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP
opt = 0;
}
SEXP ans = R_NilValue;
if (opt == -1 && !na && !retGrp && colsKeyHead(DT, by)) {
if (opt == -1 && na_first && !retGrp && colsKeyHead(DT, by)) {
opt = 1; // keyOpt
ans = PROTECT(allocVector(INTSXP, 0)); protecti++;
if (verbose)
Rprintf(_("forderReuseSorting: using key: %s\n"), CHAR(STRING_ELT(idxName(DT, by), 0)));
}
if (opt == -1 && GetUseIndex()) {
SEXP idx = getIndex(DT, by);
if (!isNull(idx)) {
bool hasStats = !isNull(getAttrib(idx, sym_anyna));
if (!na || // na.last=FALSE
(hasStats && !idxAnyNF(idx))) { // na.last=TRUE && !anyNA
if (!isNull(idx) && !na_remove) {
bool hasStats = !isNull(getAttrib(idx, sym_anyna)) && !isNull(getAttrib(idx, sym_anyinfnan));
if (na_first || // na.last=FALSE
(na_last && hasStats && !idxAnyNF(idx))) { // na.last=TRUE && !anyNA
bool hasGrp = !isNull(getAttrib(idx, sym_starts));
if (hasGrp && !hasStats)
internal_error_with_cleanup(__func__, "index has 'starts' attribute but not 'anyna', please report to issue tracker"); // # nocov
@@ -1798,7 +1829,7 @@ SEXP forderReuseSorting(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP
if (opt < 1) {
ans = PROTECT(forder(DT, by, retGrpArg, retStatsArg, sortGroupsArg, ascArg, naArg)); protecti++;
if (opt == -1 && // opt==0 means that arguments (sort, asc) were not of type index, or reuseSorting=FALSE
(!na || (retStats && !idxAnyNF(ans))) && // let's create index even if na.last=T used but no NAs detected!
(na_first || (retStats && !idxAnyNF(ans))) && // let's create index even if na.last=T used but no NAs detected!
GetUseIndex() &&
GetAutoIndex()) { // disabled by default, use datatable.forder.auto.index=T to enable, do not export/document, use for debugging only
putIndex(DT, by, ans);