Skip to content

Commit

Permalink
Merge branch 'master' into fix_fwrite_length
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelChirico authored Sep 3, 2024
2 parents 4e91a21 + d6a9fe7 commit 45bf1d4
Show file tree
Hide file tree
Showing 20 changed files with 116 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .ci/atime/tests.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# A list of performance tests.
#
# See documentation in https://github.com/Rdatatable/data.table/wiki/Performance-testing for best practices.
#
# Each entry in this list corresponds to a performance test and contains a sublist with three mandatory arguments:
# - N: A numeric sequence of data sizes to vary.
# - setup: An expression evaluated for every data size before measuring time/memory.
Expand Down
3 changes: 3 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,6 @@
# performance testing
/.ci/atime/tests.R @tdhock @Anirban166
/.github/workflows/performance-tests.yaml @Anirban166

# docs
/man/openmp-utils.Rd @Anirban166
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

1. Using `print.data.table()` with character truncation using `datatable.prettyprint.char` no longer errors with `NA` entries, [#6441](https://github.com/Rdatatable/data.table/issues/6441). Thanks to @r2evans for the bug report, and @joshhwuu for the fix.

2. `fwrite()` respects `dec=','` for timestamp columns (`POSIXct` or `nanotime`) with sub-second accuracy, [#6446](https://github.com/Rdatatable/data.table/issues/6446). Thanks @kav2k for pointing out the inconsistency and @MichaelChirico for the PR.

## NOTES

1. Tests run again when some Suggests packages are missing, [#6411](https://github.com/Rdatatable/data.table/issues/6411). Thanks @aadler for the note and @MichaelChirico for the fix.
Expand Down
5 changes: 5 additions & 0 deletions inst/tests/other.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -761,3 +761,8 @@ if (loaded[["dplyr"]]) {
DT = data.table(a = 1, b = 2, c = '1,2,3,4', d = 4)
test(30, DT[, c := strsplit(c, ',', fixed = TRUE) %>% lapply(as.integer) %>% as.list]$c, list(1:4)) # nolint: pipe_call_linter. Mimicking MRE as filed.
}

if (loaded[["nanotime"]]) {
# respect dec=',' for nanotime, related to #6446, corresponding to tests 2281.*
test(31, fwrite(data.table(as.nanotime(.POSIXct(0))), dec=',', sep=';'), output="1970-01-01T00:00:00,000000000Z")
}
4 changes: 4 additions & 0 deletions inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -19064,3 +19064,7 @@ test(2280.1, internal_error("broken"), error="Internal error.*broken")
test(2280.2, internal_error("broken %d%s", 1, "2"), error="Internal error.*broken 12")
foo = function(...) internal_error("broken")
test(2280.3, foo(), error="Internal error in foo: broken")

# fwrite respects dec=',' for sub-second timestamps, #6446
test(2281.1, fwrite(data.table(a=.POSIXct(0.001)), dec=',', sep=';'), output="1970-01-01T00:00:00,001Z")
test(2281.2, fwrite(data.table(a=.POSIXct(0.0001)), dec=',', sep=';'), output="1970-01-01T00:00:00,000100Z")
9 changes: 5 additions & 4 deletions man/openmp-utils.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,18 @@
\item\file{cj.c} - \code{\link{CJ}()}
\item\file{coalesce.c} - \code{\link{fcoalesce}()}
\item\file{fifelse.c} - \code{\link{fifelse}()}
\item\file{fread.c} - \code{\link{fread}()}
\item\file{fread.c}, \file{freadR.c} - \code{\link{fread}(). Parallelized across row-based chunks of the file.}
\item\file{forder.c}, \file{fsort.c}, and \file{reorder.c} - \code{\link{forder}()} and related
\item\file{froll.c}, \file{frolladaptive.c}, and \file{frollR.c} - \code{\link{froll}()} and family
\item\file{fwrite.c} - \code{\link{fwrite}()}
\item\file{gsumm.c} - GForce in various places, see \link{GForce}
\item\file{fwrite.c} - \code{\link{fwrite}(). Parallelized across rows.}
\item\file{gsumm.c} - GForce in various places, see \link{GForce}. Parallelized across groups.
\item\file{nafill.c} - \code{\link{nafill}()}
\item\file{subset.c} - Used in \code{\link[=data.table]{[.data.table}} subsetting
\item\file{types.c} - Internal testing usage
}
We endeavor to keep this list up to date, but note that the canonical reference here is the source code itself.
}
\examples{
getDTthreads(verbose=TRUE)
}
\keyword{ data }
6 changes: 6 additions & 0 deletions src/between.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#include "data.table.h"

/*
OpenMP is used here to parallelize:
- The loops that check if each element of the vector provided is between
the specified lower and upper bounds, for INTSXP and REALSXP types
- The checking and handling of undefined values (such as NaNs)
*/
SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, SEXP checkArg) {
int nprotect = 0;
R_len_t nx = length(x), nl = length(lower), nu = length(upper);
Expand Down
6 changes: 6 additions & 0 deletions src/cj.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#include "data.table.h"

/*
OpenMP is used here to parallelize:
- The element assignment in vectors
- The memory copying operations (blockwise replication of data using memcpy)
- The creation of all combinations of the input vectors over the cross-product space
*/
SEXP cj(SEXP base_list) {
int ncol = LENGTH(base_list);
SEXP out = PROTECT(allocVector(VECSXP, ncol));
Expand Down
6 changes: 6 additions & 0 deletions src/coalesce.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#include "data.table.h"

/*
OpenMP is used here to parallelize:
- The operation that iterates over the rows to coalesce the data
- The replacement of NAs with non-NA values from subsequent vectors
- The conditional checks within parallelized loops
*/
SEXP coalesce(SEXP x, SEXP inplaceArg) {
if (TYPEOF(x)!=VECSXP) internal_error(__func__, "input is list(...) at R level"); // # nocov
if (!IS_TRUE_OR_FALSE(inplaceArg)) internal_error(__func__, "argument 'inplaceArg' must be TRUE or FALSE"); // # nocov
Expand Down
6 changes: 6 additions & 0 deletions src/fifelse.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#include "data.table.h"

/*
OpenMP is being used here to parallelize loops that perform conditional
checks along with assignment operations over the elements of the
supplied logical vector based on the condition (test) and values
provided for the remaining arguments (yes, no, and na).
*/
SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
if (!isLogical(l)) {
error(_("Argument 'test' must be logical."));
Expand Down
13 changes: 13 additions & 0 deletions src/forder.c
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ uint64_t dtwiddle(double x) //const void *p, int i)

void radix_r(const int from, const int to, const int radix);

/*
OpenMP is used here to parallelize multiple operations that come together to
sort a data.table using the Radix algorithm. These include:
- The counting of unique values and recursively sorting subsets of data
across different threads
- The process of finding the range and distribution of data for efficient
grouping and sorting
- Creation of histograms which are used to sort data based on significant
bits (each thread processes a separate batch of the data, computes the
MSB of each element, and then increments the corresponding bins), with
the distribution and merging of buckets
*/
SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsArg, SEXP ascArg, SEXP naArg)
// sortGroups TRUE from setkey and regular forder, FALSE from by= for efficiency so strings don't have to be sorted and can be left in appearance order
// when sortGroups is TRUE, ascArg contains +1/-1 for ascending/descending of each by column; when FALSE ascArg is ignored
Expand Down
8 changes: 8 additions & 0 deletions src/fread.c
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,14 @@ static int detect_types( const char **pch, int8_t type[], int ncol, bool *bumped
//
// Returns 1 if it finishes successfully, and 0 otherwise.
//
// OpenMP is used here to:
// - Parallelize the reading of data in chunks
// - Avoid race conditions or concurrent writes to the output data.table by having atomic
// operations on the string data
// - Manage synchronized updates to the progress bar and serialize the output to the console
// This function is highly optimized in reading and processing data with both large numbers of
// rows and columns, but the efficiency is more pronounced across rows.
//
//=================================================================================================
int freadMain(freadMainArgs _args) {
args = _args; // assign to global for use by DTPRINT() in other functions
Expand Down
9 changes: 9 additions & 0 deletions src/froll.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
#include "data.table.h"

/*
OpenMP is used here to parallelize the loops in frollmean and frollsum.
These functions benefit more in terms of speedup when the data has a large
number of columns, primarily due to the efficient memory access patterns
(cache-friendly) used when processing the data for each column
sequentially in memory to compute the rolling statistic.
*/

/* fast rolling mean - router
* early stopping for window bigger than input
* also handles 'align' in single place
Expand Down
4 changes: 4 additions & 0 deletions src/fsort.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ int qsort_cmp(const void *a, const void *b) {
return (x<y)-(x>y); // largest first in a safe branchless way casting long to int
}

/*
OpenMP is used here to find the range and distribution of data for efficient
grouping and sorting.
*/
SEXP fsort(SEXP x, SEXP verboseArg) {
double t[10];
t[0] = wallclock();
Expand Down
17 changes: 13 additions & 4 deletions src/fwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,15 +442,15 @@ void writePOSIXct(const void *col, int64_t row, char **pch)
// don't use writeInteger() because it doesn't 0 pad which we need here
// integer64 is big enough for squash with milli but not micro; trunc (not round) micro when squash
m /= 1000;
*ch++ = '.';
*ch++ = dec;
ch -= squashDateTime;
*(ch+2) = '0'+m%10; m/=10;
*(ch+1) = '0'+m%10; m/=10;
*ch = '0'+m;
ch += 3;
} else if (m) {
// microseconds are present and !squashDateTime
*ch++ = '.';
*ch++ = dec;
*(ch+5) = '0'+m%10; m/=10;
*(ch+4) = '0'+m%10; m/=10;
*(ch+3) = '0'+m%10; m/=10;
Expand Down Expand Up @@ -488,7 +488,7 @@ void writeNanotime(const void *col, int64_t row, char **pch)
*ch++ = 'T';
ch -= squashDateTime;
write_time(s, &ch);
*ch++ = '.';
*ch++ = dec;
ch -= squashDateTime;
for (int i=8; i>=0; i--) { *(ch+i) = '0'+n%10; n/=10; } // always 9 digits for nanoseconds
ch += 9;
Expand Down Expand Up @@ -588,7 +588,16 @@ int compressbuff(z_stream *stream, void* dest, size_t *destLen, const void* sour
}
#endif

// main fwrite function ----
/*
main fwrite function ----
OpenMP is used here primarily to parallelize the process of writing rows
to the output file, but error handling and compression (if enabled) are
also managed within the parallel region. Special attention is paid to
thread safety and synchronization, especially in the ordered sections
where output to the file and handling of errors is serialized to maintain
the correct sequence of rows.
*/
void fwriteMain(fwriteMainArgs args)
{
double startTime = wallclock();
Expand Down
5 changes: 5 additions & 0 deletions src/gsumm.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ static int nbit(int n)
return nb;
}

/*
Functions with GForce optimization are internally parallelized to speed up
grouped summaries over a large data.table. OpenMP is used here to
parallelize operations involved in calculating common group-wise statistics.
*/
SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) {
double started = wallclock();
const bool verbose = GetVerbose();
Expand Down
5 changes: 5 additions & 0 deletions src/nafill.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ void nafillInteger64(int64_t *x, uint_fast64_t nx, unsigned int type, int64_t fi
snprintf(ans->message[0], 500, _("%s: took %.3fs\n"), __func__, omp_get_wtime()-tic);
}

/*
OpenMP is being used here to parallelize the loop that fills missing values
over columns of the input data. This includes handling different data types
and applying the designated filling method to each column in parallel.
*/
SEXP nafillR(SEXP obj, SEXP type, SEXP fill, SEXP nan_is_na_arg, SEXP inplace, SEXP cols) {
int protecti=0;
const bool verbose = GetVerbose();
Expand Down
5 changes: 5 additions & 0 deletions src/reorder.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#include "data.table.h"

/*
OpenMP is used here to reorder a vector or each column in a list of vectors
(such as in a data.table) based on a given vector that dictates the new
ordering of elements
*/
SEXP reorder(SEXP x, SEXP order)
{
// For internal use only by setkey().
Expand Down
3 changes: 2 additions & 1 deletion src/subset.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,9 @@ static void checkCol(SEXP col, int colNum, int nrow, SEXP x)
* 2) Originally for subsetting vectors in fcast and now the beginnings of [.data.table ported to C
* 3) Immediate need is for R 3.1 as lglVec[1] now returns R's global TRUE and we don't want := to change that global [think 1 row data.tables]
* 4) Could do it other ways but may as well go to C now as we were going to do that anyway
*
* OpenMP is used here to parallelize the loops that perform the subsetting of vectors, with conditional checks and filtering of data.
*/

SEXP subsetDT(SEXP x, SEXP rows, SEXP cols) { // API change needs update NEWS.md and man/cdt.Rd
int nprotect=0;
if (!isNewList(x)) internal_error(__func__, "Argument '%s' to %s is type '%s' not '%s'", "x", "CsubsetDT", type2char(TYPEOF(rows)), "list"); // # nocov
Expand Down
7 changes: 7 additions & 0 deletions src/types.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ void testRaiseMsg(ans_t *ans, int istatus, bool verbose) {
}
ans->int_v[0] = ans->status;
}
/*
This caters to internal tests (not user-facing), and OpenMP is being used
here to test a message printing function inside a nested loop which has
been collapsed into a single loop of the combined iteration space using
collapse(2), along with specification of dynamic scheduling for distributing
the iterations in a way that can balance the workload among the threads.
*/
SEXP testMsgR(SEXP status, SEXP x, SEXP k) {
if (!isInteger(status) || !isInteger(x) || !isInteger(k))
internal_error(__func__, "status, nx, nk must be integer"); // # nocov
Expand Down

0 comments on commit 45bf1d4

Please sign in to comment.