From 239d6827a25a60bee9a22d68a7010cc7b8043ca4 Mon Sep 17 00:00:00 2001 From: Michael Chirico Date: Thu, 29 Aug 2024 17:24:46 +0000 Subject: [PATCH] cbindlist add cbind by reference, timing R prototype of mergelist wording use lower overhead funs stick to int32 for now, correct R_alloc bmerge C refactor for codecov and one loop for speed address revealed codecov gaps refactor vecseq for codecov seqexp helper, some alloccol export on C bmerge codecov, types handled in R bmerge already better comment seqexp bmerge mult=error #655 multiple new C utils swap if branches explain new C utils comments mostly reduce conflicts to PR #4386 comment C code address multiple matches during update-on-join #3747 Revert "address multiple matches during update-on-join #3747" This reverts commit b64c0c3480fe9415bbda6729c361621e60da6e01. merge.dt has temporarily mult arg, for testing minor changes to cbindlist c dev mergelist, for single pair now add quiet option to cc() mergelist tests add check for names to perhaps.dt rm mult from merge.dt method rework, clean, polish multer, fix righ and full joins make full join symmetric mergepair inner function to loop on extra check for symmetric mergelist manual ensure no df-dt passed where list expected comments and manual handle 0 cols tables more tests more tests and debugging move more logic closer to bmerge, simplify mergepair more tests revert not used changes reduce not needed checks, cleanup copy arg behavior, manual, no tests yet cbindlist manual, export both cleanup processing bmerge to dtmatch test function match order for easier preview vecseq gets short-circuit batch test allow browser big cleanup remmove unneeded stuff, reduce diff more cleanup, minor manual fixes add proper test scripts Merge branch 'master' into cbind-merge-list comment out not used code for coverage more tests, some nocopy opts rename sql test script, should fix codecov simplify dtmatch inner branch more precise copy, now copy only T or F unused arg not yet in api, wording comments and refer issues codecov hasindex coverage codecov gap tests for join using key, cols argument fix missing import forderv more tests, improve missing on handling more tests for order of inner and full join for long keys new allow.cartesian option, #4383, #914 reduce diff, improve codecov reduce diff, comments need more DT, not lists, mergelist 3+ tbls proper escape heavy check unit tests more tests, address overalloc failure mergelist and cbindlist retain index manual, examples fix manual minor clarify in manual retain keys, right outer join for snowflake schema joins duplicates in cbindlist recycling in cbindlist escape 0 input in copyCols empty input handling closing cbindlist vectorized _on_ and _join.many_ arg rename dtmatch to dtmerge vectorized args: how, mult push down input validation add support for cross join, semi join, anti join full join, reduce overhead for mult=error mult default value dynamic fix manual add "see details" to Rd mention shared on in arg description amend feedback from Michael semi and anti joins will not reorder x columns Merge branch 'master' into cbind-merge-list spelling, thx to @jan-glx check all new funs used and add comments bugfix, sort=T needed for now Merge branch 'master' into cbind-merge-list Update NEWS.md Merge branch 'master' into cbind-merge-list Merge branch 'master' into cbind-merge-list NEWS placement numbering ascArg->order Merge remote-tracking branch 'origin/cbind-merge-list' into cbind-merge-list attempt to restore from master Update to stopf() error style 
Need isFrame for now More quality checks: any(!x)->!all(x); use vapply_1{b,c,i} really restore from master try to PROTECT() before duplicate() update error message in test appease the rchk gods extraneous space missing ';' use catf simplify perhapsDataTableR move sqlite.Rraw.manual into other.Rraw simplify for loop Merge remote-tracking branch 'origin/cbind-merge-list' into cbind-merge-list --- NAMESPACE | 1 + NEWS.md | 4 + R/mergelist.R | 96 ++++++++++++++ inst/tests/other.Rraw | 296 +++++++++++++++++++++++++++++++++++++++++- man/mergelist.Rd | 189 +++++++++++++++++++++++++++ 5 files changed, 585 insertions(+), 1 deletion(-) create mode 100644 man/mergelist.Rd diff --git a/NAMESPACE b/NAMESPACE index be238d75b..d20f87450 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -59,6 +59,7 @@ export(setnafill) export(.Last.updated) export(fcoalesce) export(cbindlist) +export(mergelist) export(substitute2) #export(DT) # mtcars |> DT(i,j,by) #4872 #5472 diff --git a/NEWS.md b/NEWS.md index 8b15449e8..d5d433226 100644 --- a/NEWS.md +++ b/NEWS.md @@ -6,6 +6,10 @@ 1. In `DT[, variable := value]`, when value is class `POSIXlt`, we automatically coerce it to class `POSIXct` instead, [#1724](https://github.com/Rdatatable/data.table/issues/1724). Thanks to @linzhp for the report, and Benjamin Schwendinger for the fix. +## NEW FEATURES + +1. New functions `cbindlist` and `mergelist` have been implemented and exported. They work like `cbind`/`merge` but take a `list` of data.tables as input; `mergelist` merges in `Reduce` fashion and supports `how` (_left_, _inner_, _full_, _right_, _semi_, _anti_, _cross_) joins and a `mult` argument, closes [#599](https://github.com/Rdatatable/data.table/issues/599) and [#2576](https://github.com/Rdatatable/data.table/issues/2576). + ## NOTES 1. Tests run again when some Suggests packages are missing, [#6411](https://github.com/Rdatatable/data.table/issues/6411). Thanks @aadler for the note and @MichaelChirico for the fix.
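A short usage sketch for the NEW FEATURES entry above, adapted from the examples added in man/mergelist.Rd later in this patch; the three-table left join is taken from those examples, the comments are illustrative only:

    library(data.table)
    l = list(
      data.table(id1 = c(1:4, 2:5), v1 = 1:8),
      data.table(id1 = 2:3, v2 = 1:2),
      data.table(id1 = 3:5, v3 = 1:3)
    )
    # merges l[[1]] with l[[2]], then that result with l[[3]], i.e. in Reduce fashion;
    # how="left" is the default, together with the mult default corresponding to it
    mergelist(l, on = "id1")
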
diff --git a/R/mergelist.R b/R/mergelist.R index 47725d64a..2a5c00e54 100644 --- a/R/mergelist.R +++ b/R/mergelist.R @@ -235,5 +235,101 @@ mergepair = function(lhs, rhs, on, how, mult, lhs.cols=names(lhs), rhs.cols=name setDT(out) } +mergelist = function(l, on, cols, how=c("left","inner","full","right","semi","anti","cross"), mult, copy=TRUE, join.many=getOption("datatable.join.many")) { + verbose = getOption("datatable.verbose") + if (verbose) + p = proc.time()[[3L]] + { + if (!is.list(l) || is.data.frame(l)) + stopf("'l' must be a list") + if (!all(vapply_1b(l, is.data.table))) + stopf("Every element of 'l' list must be data.table objects") + if (!all(lengths(l))) + stopf("Tables in 'l' argument must be non-zero columns tables") + if (any(vapply_1i(l, function(x) anyDuplicated(names(x))))) + stopf("Some of the tables in 'l' have duplicated column names") + } ## l + if (!isTRUEorFALSE(copy)) + stopf("'%s' must be TRUE or FALSE", "copy") + n = length(l) + if (n<2L) { + out = if (!n) as.data.table(l) else l[[1L]] + if (copy) out = copy(out) + if (verbose) + catf("mergelist: merging %d table(s), took %.3fs\n", n, proc.time()[[3L]]-p) + return(out) + } + { + if (!is.list(join.many)) + join.many = rep(list(join.many), n-1L) + if (length(join.many)!=n-1L || !all(vapply_1b(join.many, isTRUEorFALSE))) + stopf("'join.many' must be TRUE or FALSE, or a list of such which length must be length(l)-1L") + } ## join.many + { + if (missing(mult)) + mult = NULL + if (!is.list(mult)) + mult = rep(list(mult), n-1L) + if (length(mult)!=n-1L || !all(vapply_1b(mult, function(x) is.null(x) || (is.character(x) && length(x)==1L && !anyNA(x) && x %chin% c("error","all","first","last"))))) + stopf("'mult' must be one of [error, all, first, last] or NULL, or a list of such which length must be length(l)-1L") + } ## mult + { + if (missing(how) || is.null(how)) + how = match.arg(how) + if (!is.list(how)) + how = rep(list(how), n-1L) + if (length(how)!=n-1L || !all(vapply_1b(how, function(x) is.character(x) && length(x)==1L && !anyNA(x) && x %chin% c("left","inner","full","right","semi","anti","cross")))) + stopf("'how' must be one of [left, inner, full, right, semi, anti, cross], or a list of such which length must be length(l)-1L") + } ## how + { + if (missing(cols) || is.null(cols)) { + cols = vector("list", n) + } else { + if (!is.list(cols)) + stopf("'%s' must be a list", "cols") + if (length(cols) != n) + stopf("'cols' must be same length as 'l'") + skip = vapply_1b(cols, is.null) + if (!all(vapply_1b(cols[!skip], function(x) is.character(x) && !anyNA(x) && !anyDuplicated(x)))) + stopf("'cols' must be a list of non-zero length, non-NA, non-duplicated, character vectors, or eventually NULLs (all columns)") + if (any(mapply(function(x, icols) !all(icols %chin% names(x)), l[!skip], cols[!skip]))) + stopf("'cols' specify columns not present in corresponding table") + } + } ## cols + { + if (missing(on) || is.null(on)) { + on = vector("list", n-1L) + } else { + if (!is.list(on)) + on = rep(list(on), n-1L) + if (length(on)!=n-1L || !all(vapply_1b(on, function(x) is.character(x) && !anyNA(x) && !anyDuplicated(x)))) ## length checked in dtmerge + stopf("'on' must be non-NA, non-duplicated, character vector, or a list of such which length must be length(l)-1L") + } + } ## on + + l.mem = lapply(l, vapply, address, "") + out = l[[1L]] + out.cols = cols[[1L]] + for (join.i in seq_len(n-1L)) { + rhs.i = join.i + 1L + out = mergepair( + lhs = out, rhs = l[[rhs.i]], + on = on[[join.i]], + how = how[[join.i]], mult = 
mult[[join.i]], + lhs.cols = out.cols, rhs.cols = cols[[rhs.i]], + copy = FALSE, ## avoid any copies inside, will copy once below + join.many = join.many[[join.i]], + verbose = verbose + ) + out.cols = copy(names(out)) + } + out.mem = vapply_1c(out, address) + if (copy) + .Call(CcopyCols, out, colnamesInt(out, names(out.mem)[out.mem %chin% unique(unlist(l.mem, recursive=FALSE))])) + if (verbose) + catf("mergelist: merging %d tables, took %.3fs\n", n, proc.time()[[3L]]-p) + out +} + seqexp = function(x) .Call(Cseqexp, x) perhaps.data.table = function(x) .Call(CperhapsDataTableR, x) diff --git a/inst/tests/other.Rraw b/inst/tests/other.Rraw index eb3e461f7..33f1dc2eb 100644 --- a/inst/tests/other.Rraw +++ b/inst/tests/other.Rraw @@ -1,4 +1,4 @@ -pkgs = c("ggplot2", "hexbin", "plyr", "dplyr", "caret", "zoo", "xts", "gdata", "nlme", "bit64", "knitr", "parallel", "sf", "nanotime", "R.utils", "yaml") +pkgs = c("ggplot2", "hexbin", "plyr", "dplyr", "caret", "zoo", "xts", "gdata", "nlme", "bit64", "knitr", "parallel", "sf", "nanotime", "R.utils", "yaml", "DBI", "RSQLite") # First expression of this file must be as above: .gitlab-ci.yml uses parse(,n=1L) to read one expression from this file and installs pkgs. # So that these dependencies of other.Rraw are maintained in a single place. # TEST_DATA_TABLE_WITH_OTHER_PACKAGES is off by default so this other.Rraw doesn't run on CRAN. It is run by GLCI, locally in dev, and by @@ -761,3 +761,297 @@ if (loaded[["dplyr"]]) { DT = data.table(a = 1, b = 2, c = '1,2,3,4', d = 4) test(30, DT[, c := strsplit(c, ',', fixed = TRUE) %>% lapply(as.integer) %>% as.list]$c, list(1:4)) # nolint: pipe_call_linter. Mimicking MRE as filed. } + +# NB: currently, RSQLite requires DBI, so partially redundant, but future-proof. +if (loaded[["DBI"]] && loaded[["RSQLite"]]) { + # mergelist join tester vs SQLite, based on v1.9.8 non-equi join tester + + # funs ---- + + # produce SQL statement + # ln, rn: lhs names, rhs names, symmult: symmetric mult + mult_all = function(tbl, cols, ...) 
sprintf( + "(\n SELECT %s FROM %s\n) %s", + paste(setdiff(cols,"row_id"), collapse=", "), tbl, tbl + ) + mult_one = function(tbl, cols, on, mult) sprintf( + "(SELECT %s FROM (\n SELECT *, ROW_NUMBER() OVER (PARTITION BY %s ORDER BY row_id %s) AS rownum FROM %s\n) %s WHERE rownum=1) %s", + paste(setdiff(cols,c("row_id","rownum")), collapse=", "), + paste(on, collapse=", "), + if (mult=="first") "ASC" else "DESC", + tbl, tbl, tbl + ) + sql = function(how, on, mult, ln, rn, symmult=FALSE, notjoin=FALSE) { + stopifnot(length(on)==1L) + # building sql query + if (how=="full") { + return(sprintf( + "%s\nUNION ALL\n%s", + sql("left", on, mult, ln, rn, symmult=mult%in%c("first","last")), + sql("right", on, mult, ln, rn, symmult=mult%in%c("first","last"), notjoin=TRUE) + )) + } + nm = list() + nm[["lhs"]] = ln; nm[["rhs"]] = rn + using = sprintf("USING (%s)", paste(on, collapse=", ")) + lhs = "lhs"; rhs = "rhs" + join = if (how=="inner") { + if (mult=="all") sprintf("%s\nINNER JOIN\n%s\n%s", mult_all(lhs, nm[[lhs]]), mult_all(rhs, nm[[rhs]]), using) + else sprintf("%s\nINNER JOIN\n%s\n%s", mult_one(lhs, nm[[lhs]], on, mult), mult_one(rhs, nm[[rhs]], on, mult), using) + } else if (how=="left") { + if (mult=="all") sprintf("%s\nLEFT JOIN\n%s\n%s", mult_all(lhs, nm[[lhs]]), mult_all(rhs, nm[[rhs]]), using) + else sprintf("%s\nLEFT JOIN\n%s\n%s", (if (symmult) mult_one else mult_all)(lhs, nm[[lhs]], on, mult), mult_one(rhs, nm[[rhs]], on, mult), using) + } else if (how=="right") { ## lhs-rhs swap happens here, mult_one is applied on new rhs + if (mult=="all") sprintf("%s\nLEFT JOIN\n%s\n%s", mult_all(rhs, nm[[rhs]]), mult_all(lhs, nm[[lhs]]), using) + else sprintf("%s\nLEFT JOIN\n%s\n%s", (if (symmult) mult_one else mult_all)(rhs, nm[[rhs]], on, mult), mult_one(lhs, nm[[lhs]], on, mult), using) + } + if (how=="right") {lhs = "rhs"; rhs = "lhs"} ## this name swap is for notjoin and select below + where = if (!notjoin) "" else sprintf("\nWHERE %s IS NULL", paste(rhs, on, sep=".")) + select = sprintf("%s, %s, %s", paste(lhs, on, sep="."), + paste("lhs", setdiff(nm[["lhs"]], c("row_id",on)),sep=".",collapse=", "), + paste("rhs", setdiff(nm[["rhs"]], c("row_id",on)),sep=".",collapse=", ")) + sprintf("SELECT %s FROM\n%s%s", select, join, where) + } + + # .conn SQLite connection, if provided it will use it instead of creating temporary one + # .drop logical TRUE (default) will drop db tables before and after and populate new, when FALSE it expects tables to be populated + join.sql.equal = function(l, on, how="inner", mult="all", allow.cartesian=TRUE, .conn, .drop=TRUE, .debug=interactive(), ans, err=FALSE) { + nm = names(l) + stopifnot(is.null(nm) || identical(nm, c("x","i")) || identical(nm, c("lhs","rhs"))) + names(l) = c("lhs","rhs") + lhs = l[["lhs"]]; rhs = l[["rhs"]] + stopifnot(is.data.table(lhs), is.data.table(rhs), + is.character(how), is.character(mult), length(mult)==1L, + is.character(on), + is.logical(allow.cartesian), is.logical(.drop)) + if (err && mult=="error") { + dt = try(silent=TRUE, mergelist(list(lhs, rhs), on=on, how=how, mult=mult)) + if (!inherits(dt, "try-error")) { + if (.debug) browser() + stop("no error returned from mergelist(mult='error') but err flag set to TRUE in join.sql.equal") + } + err_msg = "mult='error' and multiple matches during merge" + if (!identical(attr(dt, "condition", TRUE)[["message"]], err_msg)) { + if (.debug) browser() + stop("different error returned than expected: ", attr(dt, "condition", TRUE)[["message"]]) + } + return(TRUE) + } + # row_id column 
required as SQL is not ordered, creating on R side + if (!"row_id" %in% names(lhs)) lhs = copy(lhs)[, "row_id" := seq_len(.N)] + if (!"row_id" %in% names(rhs)) rhs = copy(rhs)[, "row_id" := seq_len(.N)] + # preparing sql environment + conn = if (new.conn <- missing(.conn)) DBI::dbConnect(RSQLite::SQLite()) else .conn + if (.drop) { + try(suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE lhs;")), silent=TRUE) + try(suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE rhs;")), silent=TRUE) + DBI::dbWriteTable(conn, name="lhs", value=lhs) + DBI::dbWriteTable(conn, name="rhs", value=rhs) + } + # building sql query + s = sql(how, on, mult, names(lhs), names(rhs)) + s = paste0(s,";\n") + # run data.table and SQLite + dt = mergelist(list(lhs[,!"row_id"], rhs[,!"row_id"]), on=on, how=how, mult=mult) + sq = try(silent=TRUE, as.data.table(DBI::dbGetQuery(conn, s))) + if (inherits(sq, "try-error")) { + if (.debug) {message("error during sql statement"); browser()} + stop("error during sql statement") + } + if (!is.data.table(dt) || !is.data.table(sq)) { + if (.debug) {message("dt and sq must be data.table already"); browser()} + stop("dt and sq must be data.table already") + } + if (how %in% c("inner","full")) { + dt2 = mergelist(list(rhs[,!"row_id"], lhs[,!"row_id"]), on=on, how=how, mult=mult) + setcolorder(dt2, neworder=names(dt)) + setattr(dt, "index", integer()) + setattr(dt2, "index", integer()) + r = all.equal(dt, dt2, ignore.row.order=TRUE) + ## check it is symetric + if (!isTRUE(r)) { + if (.debug) {message("mergelist is not symmetric for ", how); browser()} + stop("mergelist is not symmetric for ", how) + } + } + setattr(sq, "index", integer()) + setattr(dt, "index", integer()) + # compare results + a = all.equal(dt, sq, ignore.row.order=TRUE) + b = all.equal(dt, sq, ignore.row.order=TRUE, ignore.col.order=TRUE) + if (!missing(ans)) { + r = all.equal(ans, sq, ignore.row.order=TRUE) + if (!isTRUE(r)) { + if (.debug) browser() + stop("sql does not match to reference answer") + } + } + if (.drop) { + suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE lhs;")) + suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE rhs;")) + } + if (new.conn) suppressWarnings(DBI::dbDisconnect(conn)) + if (isTRUE(b) && !isTRUE(a)) { + if (.debug) browser() + stop("only column order mismatch") + } + if (!isTRUE(a)) { + if (.debug) browser() + cat(sep="\n",c( + sprintf("# dtq:\nmergelist(l, on='%s', how='%s', mult='%s')", paste(on, collapse=", "), how, mult), + sprintf("# sql:\n%s", s), + a, "\n")) + } + isTRUE(a) + } + + batch.join.sql.equal = function(cases, on, hows=c("inner","left","right","full"), mults=c("all","first","last"), .debug=FALSE) { + if ("error" %in% mults) stop("mult=error is not supported") + p = proc.time()[[3L]] + conn = DBI::dbConnect(RSQLite::SQLite()) + ans = list() + dup_n = 0L + for (case in cases) { + l = data(case) + stopifnot(c("lhs","rhs") %in% names(l)) + case = as.character(case) + lhs = l$lhs; rhs = l$rhs + ans[[case]] = list() + # reuse tables, to test if affects sqlite efficiency + try(suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE lhs;")), silent = TRUE) + try(suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE rhs;")), silent = TRUE) + # row_id column required as SQL is not ordered, creating on R side + if (!"row_id" %in% names(lhs)) lhs = copy(lhs)[, "row_id" := seq_len(.N)] + if (!"row_id" %in% names(rhs)) rhs = copy(rhs)[, "row_id" := seq_len(.N)] + DBI::dbWriteTable(conn, name="lhs", value=lhs) + DBI::dbWriteTable(conn, name="rhs", value=rhs) + len = 
prod(length(cases), length(hows), length(mults)) + if (len > (len.warn <- getOption("tests.length.warning", 1e3))) + warning(sprintf("You are about to run %s number of tests. To suppress this warning use 'tests.length.warning' option, set to numeric threshold or Inf.", len.warn)) + for (how in hows) { + ans[[case]][[how]] = list() + for (mult in mults) { + if (!is.null(ans[[case]][[how]][[mult]])) { + dup_n = dup_n+1L + next #warning("Some tests are duplicated, so far ", dup_n) + } + ans[[case]][[how]][[mult]] = join.sql.equal(list(lhs=lhs, rhs=rhs), on=on, how=how, mult=mult, .conn=conn, .drop=FALSE, .debug=.debug) + } + } + suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE lhs;")) + suppressWarnings(DBI::dbSendQuery(conn, "DROP TABLE rhs;")) + } + suppressWarnings(DBI::dbDisconnect(conn)) + cat(sprintf("batch.join.sql.equal: %s%s tests completed in %.1fs\n", + len, if (dup_n) sprintf(" (%s duplicated)", dup_n) else "", proc.time()[[3L]] - p)) + ans + } + data = function(case) { + set.seed(108) + if (case == 1L) { # 2 match + lhs = data.table(id = c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id = c(2L,4L,3L,5L), v2=1:4) + } else if (case == 2L) { # 4 match + lhs = data.table(id = c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id = c(7L,5L,3L,1L), v2=1:4) + } else if (case == 3L) { # 1 match + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(1L,2L,4L,6L), v2=1:4) + } else if (case == 4L) { # 0 match + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(0L,2L,4L,6L), v2=1:4) + } else if (case == 5L) { # 0 match dup + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(0L,2L,2L,6L), v2=1:4) + } else if (case == 6L) { # 1 match dup + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(1L,2L,2L,6L), v2=1:4) + } else if (case == 7L) { # 1 match dup match + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id =c(3L,3L,4L,6L), v2=1:4) + } else if (case == 8L) { # 2 match 2 dup match + lhs = data.table(id =c(1L,5L,3L,7L), v1=1:4) + rhs = data.table(id = c(3L,3L,7L,7L), v2=1:4) + } else if (case == 9L) { # 2 dup 2 dup + lhs = data.table(id = c(1L,5L,1L,5L), v1=1:4) + rhs = data.table(id = c(5L,5L,1L,1L), v2=1:4) + } else if (case == 10L) { # 4 dup 4 dup match + lhs = data.table(id = c(1L,1L,1L,1L), v1=1:4) + rhs = data.table(id = c(1L,1L,1L,1L), v2=1:4) + } else if (case == 11L) { # 4 dup 4 dup nomatch + lhs = data.table(id = c(1L,1L,1L,1L), v1=1:4) + rhs = data.table(id = c(2L,2L,2L,2L), v2=1:4) + } else if (case == 12L) { # no match, no overlap + lhs = data.table(id = c(1:4), v1=1:4) + rhs = data.table(id = c(6:9), v2=1:4) + } else if (case == 13L) { # all i matches + lhs = data.table(id = c(1L,5L,3L,7L,9L), v1=1:5) + rhs = data.table(id = c(7L,5L,3L,1L), v2=1:4) + } else if (case == 14L) { # dup match and 1 non-match + ## inner join short circuit test + ## what if some row is excluded but another is duplicated? nrow(i) match + lhs = data.table(id = c(1L,5L,3L,7L,3L), v1=1:5) + rhs = data.table(id = c(7L,5L,3L,2L), v2=1:4) + } else if (case == 15L) { + # does not raise error on mult="error" because dups '13' does not have matching rows! 
+ lhs = data.table(id = as.integer(c(17,14,11,10,5,1,19,7,16,15)), v1=1:10) + rhs = data.table(id = as.integer(c(6,20,13,1,8,13,3,10,17,9)), v2=1:10) + } else if (case == 16L) { + lhs = data.table(id = sample(10L, 10L, TRUE), v1=1:10) + rhs = data.table(id = sample(10L, 10L, TRUE), v2=1:10) + } else if (case == 17L) { + lhs = data.table(id = sample(1e2L, 1e2L, TRUE), v1=1:1e2) + rhs = data.table(id = sample(1e2L, 1e2L, TRUE), v2=1:1e2) + } else if (case == 18L) { + lhs = data.table(id = sample(1e2L, 1e2L, TRUE), v1=1:1e2) + rhs = data.table(id = sample(10L, 20L, TRUE), v2=1:1e2) + } else if (case==19L) { + lhs = as.data.table(list(id=sample(1e3), v1=1:1e3)) + rhs = as.data.table(list(id=sample(1e3), v2=1:1e3)) + } else if (case==20L) { + lhs = as.data.table(list(id=sample(1e3*2L, 1e3), v1=1:1e3)) + rhs = as.data.table(list(id=sample(1e3*2L, 1e3), v2=1:1e3)) + } else if (case==21L) { + lhs = as.data.table(list(id=sample(1e3, 1e3*2L, TRUE), v1=1:1e3)) + rhs = as.data.table(list(id=sample(1e3, 1e3*2L, TRUE), v2=1:1e3)) + } else if (case==22L) { ## LHS equals RHS + lhs = data.table(id=1:2, v1=1:2) + rhs = data.table(id=1:2, v2=1:2) + } else if (case==23L) { ## cross join + lhs = data.table(id=c(1L,1L), v1=1:2) + rhs = data.table(id=c(1L,1L), v2=1:2) + } else if (case==24L) { ## cartesian match, dups on both sides of match + lhs = data.table(id=c(1L,1:2), v1=1:3) + rhs = data.table(id=c(1L,1L,3L), v2=1:3) + } else if (case==25L) { ## duplicates in RHS + lhs = data.table(id=1:2, v1=1:2) + rhs = data.table(id=c(2L,2:3), v2=1:3) + } else if (case==26L) { ## duplicates in RHS and LHS, some RHS dups does not have matches in LHS (merge.data.table+mult fails) + lhs = data.table(id=c(1:3,3L), v1=1:4) + rhs = data.table(id=c(1L,1L,3:4,4L), v2=1:5) + } else if (case==27L) { ## duplicates in RHS and LHS, some LHS dups does not have matches in RHS + lhs = data.table(id=c(1L,1L,3:4,4L), v1=1:5) + rhs = data.table(id=c(1:3,3L), v2=1:4) + } else if (case==28L) { ## duplicates in RHS and LHS + lhs = data.table(id=c(1:3,3L), v1=1:4) + rhs = data.table(id=c(1L,1L,3:4), v2=1:4) + } else if (case==29L) { ## duplicates in RHS + lhs = data.table(id=1:2, v1=1:2) + rhs = data.table(id=c(2L,2:3), v2=1:3) + } else if (case==30L) { ## duplicates in LHS + lhs = data.table(id=c(1:2,2L), v1=1:3) + rhs = data.table(id=2:3, v2=1:2) + } else if (case==31L) { + lhs = data.table(id=integer(), v1=integer()) + rhs = data.table(id=integer(), v2=integer()) + } else stop("case not found") + list(lhs=lhs, rhs=rhs) + } + + # tests ---- + + y = batch.join.sql.equal(cases=1:31, on="id", hows=c("inner","left","right","full"), mults=c("all","first","last"), .debug=interactive()) + y = rapply(y, isTRUE) + if (!all(y)) + stop(sprintf("join tests failed for %s cases:\n%s", sum(!y), paste(" ", names(y)[!y], collapse="\n"))) +} diff --git a/man/mergelist.Rd b/man/mergelist.Rd new file mode 100644 index 000000000..bfee1aae1 --- /dev/null +++ b/man/mergelist.Rd @@ -0,0 +1,189 @@ +\name{mergelist} +\alias{mergelist} +\title{Merge multiple data.tables} +\description{ + Faster merge of multiple \code{data.table}s. +} +\usage{ + mergelist(l, on, cols, + how = c("left","inner","full","right","semi","anti","cross"), + mult, copy = TRUE, + join.many = getOption("datatable.join.many")) +} +\arguments{ + \item{l}{ \code{list} of \code{data.table}s to merge. } + \item{on}{ \code{character} vector of column names to merge on; when missing, the \emph{key} of \emph{join-to} table is used. 
} + \item{cols}{ \code{list} of \code{character} column names corresponding to the tables in \code{l}, used to subset columns during merges. } + \item{how}{ \code{character} scalar, controls how to merge tables. Allowed values are \code{"left"} (default), \code{"inner"}, \code{"full"}, \code{"right"}, \code{"semi"}, \code{"anti"}, \code{"cross"}. See Details. } + \item{mult}{ \code{character} scalar, controls how to proceed when multiple rows in the \emph{join-to} table match a row in the \emph{join-from} table. Allowed values are \code{"error"}, \code{"all"}, \code{"first"}, \code{"last"}. The default depends on \code{how}, as described in \emph{Details} below. See the examples for how to detect duplicate matches. Using \code{"all"} is recommended together with \code{join.many=FALSE}, unless row explosion or a cartesian product is intended. } + \item{copy}{ \code{logical}, defaults to \code{TRUE}; when \code{FALSE}, the resulting object may share columns with the tables in \code{l}, depending on matches. } + \item{join.many}{ \code{logical}, defaults to \code{getOption("datatable.join.many")}, which is \code{TRUE} by default; when \code{FALSE} and \code{mult="all"}, an extra check ensures that no \emph{many-to-many} matches exist between the tables, and raises an exception if they do. Works similarly to the \code{allow.cartesian} option in \code{[.data.table} but is stricter. The option \code{"datatable.join.many"} controls this globally for \code{mergelist} and \code{[.data.table}. } +} +\details{ + This function should be considered experimental. Users are encouraged to provide feedback in our issue tracker. + + Merging is performed sequentially; for an \code{l} of 3 tables, it does something like \code{merge(merge(l[[1L]], l[[2L]]), l[[3L]])}. Merging does not support \emph{non-equi joins}; the column names to merge on must be common to both tables in each merge. + + The arguments \code{on}, \code{how}, \code{mult}, \code{join.many} can also be lists, each of length \code{length(l)-1L}, providing the value to use for each pair of tables being merged, see examples. + + The terms \emph{join-to} and \emph{join-from} depend on the \code{how} argument: + \enumerate{ + \item{ \code{how="left|semi|anti"}: \emph{join-to} is \emph{RHS}, \emph{join-from} is \emph{LHS}. } + \item{ \code{how="inner|full|cross"}: treats the \emph{LHS} and \emph{RHS} tables equally; the terms apply to both tables. } + \item{ \code{how="right"}: \emph{join-to} is \emph{LHS}, \emph{join-from} is \emph{RHS}. } + } + + Using \code{mult="error"} raises an exception when multiple rows in the \emph{join-to} table match a row in the \emph{join-from} table. It should not be used merely to detect duplicates, as duplicates might not have a matching row, in which case no exception is raised. + + The default value of \code{mult} depends on the \code{how} argument: + \enumerate{ + \item{ \code{how="left|inner|full|right"}: sets \code{mult="error"}. } + \item{ \code{how="semi|anti"}: sets \code{mult="last"}, although it works the same as \code{mult="first"}. } + \item{ \code{how="cross"}: sets \code{mult="all"}. } + } + + When the \code{on} argument is missing, the columns to join on are decided based on \emph{keys}, depending on the \code{how} argument: + \enumerate{ + \item{ \code{how="left|right|semi|anti"}: key columns of the \emph{join-to} table. } + \item{ \code{how="inner|full"}: if only one table has a key, that key is used; if both tables have keys, then \code{intersect(key(lhs), key(rhs))}, with its order aligned to the shorter key.
} + } + + When joining tables that are not all directly linked to a single table, e.g. a snowflake schema, a \emph{right} outer join can be used to optimize the sequence of merges, see examples. + } + \value{ + A new \code{data.table} based on the merged objects. + } + \note{ + Using \code{how="inner|full"} together with \code{mult!="all"} is less efficient. Unlike a join in \code{[.data.table}, it applies \code{mult} to both tables. This is to ensure that the join is symmetric, so that the \emph{LHS} and \emph{RHS} tables can be swapped regardless of the \code{mult} argument. It is always possible to apply a \code{mult}-like filter manually and join using \code{mult="all"}. + + Using \code{join.many=FALSE} adds overhead. Note that it only takes effect when \code{mult="all"}. If the input data are known not to have duplicated matches, then the default \code{TRUE} can safely be used. Otherwise, for \code{mult="all"} merges it is recommended to use \code{join.many=FALSE}, unless of course a \emph{many-to-many} join, which duplicates rows, is intended. + } + \seealso{ + \code{\link{[.data.table}}, \code{\link{merge.data.table}} + } + \examples{ +l = list( + data.table(id1 = c(1:4,2:5), v1 = 1:8), + data.table(id1 = 2:3, v2 = 1:2), + data.table(id1 = 3:5, v3 = 1:3) +) +mergelist(l, on="id1") + +## using keys +l = list( + data.table(id1 = c(1:4,2:5), v1 = 1:8), + data.table(id1 = 3:5, id2 = 1:3, v2 = 1:3, key="id1"), + data.table(id2 = 1:4, v3 = 4:1, key="id2") +) +mergelist(l) + +## select columns +l = list( + data.table(id1 = c(1:4,2:5), v1 = 1:8, v2 = 8:1), + data.table(id1 = 3:5, v3 = 1:3, v4 = 3:1, v5 = 1L, key="id1") +) +mergelist(l, cols = list(NULL, c("v3","v5"))) + +## different arguments for each merge pair +l = list( + data.table(id1=1:4, id2=4:1), + data.table(id1=c(1:3,1:2), v2=c(1L,1L,1:2,2L)), + data.table(id2=4:5) +) +mergelist(l, + on = list("id1", "id2"), ## first merge on id1, second on id2 + how = list("inner", "anti"), ## first inner join, second anti join + mult = list("last", NULL)) ## use default 'mult' in second join + +## detecting duplicate matches +l = list( + data.table(id1=c(1:4,2:5), v1=1:8), ## dups in LHS are fine + data.table(id1=c(2:3,2L), v2=1:3), ## dups in RHS + data.table(id1=3:5, v3=1:3) +) +#mergelist(l, on="id1") # ERROR: mult='error' and multiple matches during merge +lapply(l[-1L], `[`, j = if (.N>1L) .SD, by = "id1") ## duplicated rows + +## 'star schema' and 'snowflake schema' examples + +### populate fact: US population by state and date + +gt = state.x77[,"Population"] +gt = data.table(state_id=seq_along(state.name), p=gt[state.name]/sum(gt), k=1L) +tt = as.IDate(paste0(as.integer(time(uspop)),"-01-01")) +tt = as.data.table(stats::approx(tt, c(uspop), tt[1L]:tt[length(tt)])) +tt = tt[, .(date=as.IDate(x), date_id=seq_along(x), pop=y, k=1L)] +fact = tt[gt, on="k", allow.cartesian=TRUE, + .(state_id=i.state_id, date_id=x.date_id, population = x.pop * i.p)] +setkeyv(fact, c("state_id","date_id")) + +### populate dimensions: time and geography + +time = data.table(key = "date_id", + date_id = seq_along(tt$date), date = tt$date, + month_id = month(tt$date), month = month.name[month(tt$date)], + year_id = year(tt$date)-1789L, year = as.character(year(tt$date)), + week_id = week(tt$date), week = as.character(week(tt$date)), + weekday_id = wday(tt$date)-1L, weekday = weekdays(tt$date) +)[weekday_id==0L, weekday_id:=7L][] +geog = data.table(key = "state_id", + state_id = seq_along(state.name), state_abb=state.abb, state_name=state.name, + division_id = as.integer(state.division), +
division_name = as.character(state.division), + region_id = as.integer(state.region), + region_name = as.character(state.region) +) +rm(gt, tt) + +### denormalize 'star schema' + +l = list(fact, time, geog) +ans = mergelist(l) + +rm(l, ans) + +### turn 'star schema' into 'snowflake schema' + +make.lvl = function(x, cols) { + stopifnot(is.data.table(x)) + lvl = x[, unique(.SD), .SDcols=cols] + setkeyv(lvl, cols[1L]) + setindexv(lvl, as.list(cols)) +} +time = list( + date = make.lvl(time, c("date_id","date","year_id","month_id","week_id", + "weekday_id")), + weekday = make.lvl(time, c("weekday_id","weekday")), + week = make.lvl(time, c("week_id","week")), + month = make.lvl(time, c("month_id","month")), + year = make.lvl(time, c("year_id","year")) +) +geog = list( + state = make.lvl(geog, c("state_id","state_abb","state_name","division_id")), + division = make.lvl(geog, c("division_id","division_name","region_id")), + region = make.lvl(geog, c("region_id","region_name")) +) + +### denormalize 'snowflake schema' + +#### left join all +l = c(list(fact=fact), time, geog) +ans = mergelist(l) + +rm(ans) +#### merge hierarchies alone, reduce sizes in merges of geog dimension +ans = mergelist(list( + fact, + mergelist(time), + mergelist(rev(geog), how="right") +)) + +rm(ans) +#### same but no unnecessary copies +ans = mergelist(list( + fact, + mergelist(time, copy=FALSE), + mergelist(rev(geog), how="right", copy=FALSE) +)) +} +\keyword{ data }
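A minimal sketch of the join.many behaviour documented in man/mergelist.Rd above, assuming the described semantics that join.many=FALSE together with mult="all" raises an exception on many-to-many matches; the id values are illustrative only and the exact error wording is not shown here, hence try():

    library(data.table)
    lhs = data.table(id = c(1L, 1L, 2L), v1 = 1:3)   # id=1 duplicated on the left
    rhs = data.table(id = c(1L, 1L, 3L), v2 = 1:3)   # id=1 duplicated on the right -> many-to-many match
    # with the default datatable.join.many=TRUE the cartesian-style match is allowed
    mergelist(list(lhs, rhs), on = "id", how = "inner", mult = "all")
    # with join.many=FALSE the same merge is expected to raise an exception
    try(mergelist(list(lhs, rhs), on = "id", how = "inner", mult = "all", join.many = FALSE))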