-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCode
227 lines (204 loc) · 11.8 KB
/
Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#Preliminary: load dependencies manually
needed <- c("RcppRoll", "optiRum")
for(pckg in needed){
if(!pckg %in% installed.packages()){
print(pckg)
install.packages(pckg)
}
library(pckg, character.only = T)
}
#' Expand by periods function
#' Adds to a data table columns that represent the psat value, rolling past average or ratios of other columns.
#' IMPORTANT NOTE: please notice that if doGrid == TRUE (which is by default), the outputted data table will have more rows that the input one and may be required to filter them afterwards.
#'
#' @param dt data.table to expand. The function will modify it
#' @param colsToExpand Columns that will be expanded
#' @param nPeriods Number of periods that will be expanded (admits several values on a vector on a single call)
#' Positive values represent past expansions and negative values represent future expansions
#' @param timeSeriesIDcolNames Name of the column (or columns) that contains the time series identifier (e.g client ID).
#' @param periodColName Name of the column that contains the period. The natural R order of this column should be chronological (large values the latest)
#' @param expandMethods How the columns will be expanded, admits one or more of: "shift", "mean", "median", "min", "max", "prod", "sum", "sd" and "var"
#' @param includeCurrentPeriod Whether the expanded aggregatons will include info about the current period or not.
#' Does not apply for the "shift" expanding method
#' @param colsToRatio Columns of which the ratio will be computed. Ratio is currentValue/expandedValue
#' @param methodsToRatio For which expandMethods the ratios should be computed.
#' @param periodsToRatio For which periods the ratios should be computed.
#' @param doGrid If TRUE, synthetic rows will be created with all combinations of client IDs and periods. This way, if a period is missing for a client the aggregation will be performed with an NA for that period.
#' If FALSE, the n most recent periods will be expanded independently of any temporal gaps.
#' IMPORTANT NOTE: please notice that if set to TRUE, the outputted data table will have more rows that the input one and may be required to filter them afterwards.
#' @param doSort The data.table must be sorted by (timeSeriesIDcolNames, periodColName) for the expanding to work. If the data is already sorted this can be called with FALSE and save some time.
#' @param suffixNewVars Suffix that will be used to name expanded variables, the default values are internally handled to support negative periods and should be fine in most cases
#' @param suffixRatios Suffix that will be used to name the ratios
#' @param verbose If TRUE, prints some information about its execution
#' @param ... Other parameters to pass to aggregating functions (e.g na.rm)
#'
#' @return Returns the provided data table with the added expanded columns
#' @export
#'
#' @examples
#'
#'
#'testDT <- data.table(clientID = c(rep("Client1", 5), rep("Client2", 4)),
#' month_id = c(seqMonth("201701", "201705"), setdiff(seqMonth("201701", "201705"), "201704")),
#' consumption = c(c(0,1,1,2,0), c(8,7,2,1)),
#' billing = c(c(10,12,7,NA,0), c(0,0,0,1))
#')
#'
#'print(expandByPeriods(testDT,
#' colsToExpand = c("consumption", "billing"),
#' nPeriods = c(2,3),
#' timeSeriesIDcolNames = "clientID",
#' periodColName = "month_id",
#' colsToRatio = c("consumption", "billing"),
#' na.rm = T
#'))
#'
#'testDT <- data.table(clientID = c(rep("Client1", 5), rep("Client2", 4)),
#' prodID = c(rep("Prod1", 3), rep("Prod2", 6)),
#' month_id = c(seqMonth("201701", "201705"), setdiff(seqMonth("201701", "201705"), "201704")),
#' consumption = c(c(0,1,1,2,0), c(8,7,2,1)),
#' billing = c(c(10,12,7,NA,0), c(0,0,0,1))
#')
#'
#'print(expandByPeriods(testDT,
#' colsToExpand = c("consumption", "billing"),
#' nPeriods = c(2,3),
#' timeSeriesIDcolNames = c("clientID", "prodID"),
#' periodColName = "month_id",
#' colsToRatio = c("consumption", "billing"),
#' na.rm = T
#'))
#'
#'
expandByPeriods <- function(dt,
colsToExpand,
nPeriods,
timeSeriesIDcolNames,
periodColName = "month_id",
expandMethods = "mean",
includeCurrentPeriod = TRUE,
colsToRatio = c(),
methodsToRatio = expandMethods,
periodsToRatio = nPeriods,
doGrid = TRUE,
doSort = TRUE,
suffixNewVars = ifelse(expandMethods == "shift", "_p", "_" %+% expandMethods %+% "Prev"),
suffixRatios = ifelse(methodsToRatio == "shift", "_p", "_" %+% methodsToRatio %+% "Prev") %+% "Ratio",
verbose = TRUE,
...){
#Error control
if(doGrid == TRUE & doSort == FALSE){
warning("doSort should not be FALSE when doGrid = TRUE because doing the grid may alter the table order. doSort will be set to TRUE")
doSort <- TRUE
}
possibleMethods <- c("shift", "mean", "median", "min", "max", "prod", "sum", "sd", "var")
badMethods <- setdiff(expandMethods, possibleMethods)
if(length(badMethods) != 0){
stop("expandMethods " %+% paste(badMethods, collapse = ", ") %+% " not recognized")
}
if(length(setdiff(methodsToRatio, expandMethods)) != 0){
warning("Ratios won't be computed for methods that are not in expandMethods, please add them in expandMethods")
}
if(length(setdiff(periodsToRatio, nPeriods)) != 0){
warning("Ratios won't be computed for periods that are not in nPeriods, please add them in nPeriods")
}
#Create a grid of all cilent and period combinations to fill time gaps in the data
if(doGrid){
cond_cat(verbose, "Creating grid...\n")
tic()
#There are two methods to do the grid
if(length(timeSeriesIDcolNames) == 1){
#CJ function does a faster grid but can add too many extra rows when there are more than one column to identify the time series.
#That is because it takes all possible combinations even if they dont exist beforehand
#This is done with do.call because that way it works for more than 1 time series ID column even though this section is only run when there's 1 column
dt.grid <- do.call(CJ, lapply(c(timeSeriesIDcolNames, periodColName),
function(colName){ dt[, unique(get(colName))]}))
}else{
#CJ.dt is slower (30% more time in preliminary tests) but allows us to consider only time series IDs columns combinations that exist on the dataset
dt.grid <- CJ.dt(unique(dt[, timeSeriesIDcolNames, with=FALSE]),
unique(dt[, .(get(periodColName))]))
}
names(dt.grid) <- c(timeSeriesIDcolNames, periodColName)
dt <- dt[dt.grid, on = c(timeSeriesIDcolNames, periodColName)]
toc(quiet = !verbose)
gc()
}
#Sort the table so that we expand the correct periods of time
if(doSort){
cond_cat(verbose, "Sorting table...\n")
tic()
setorderv(dt,
c(timeSeriesIDcolNames, periodColName),
c(rep(1, length(timeSeriesIDcolNames)), 1))
toc(quiet = !verbose)
}else{
warning("Calling expandByPeriods with doSort = FALSE will generate a corrupt output if the data was not sorted beforehand")
}
cond_cat(verbose, "Expanding periods...\n")
tic()
for(currentMethod in expandMethods){
#Get the name of the function we will use to expand the periods
if(currentMethod == "shift"){
#Use shift function if we just want to shift a column
expandFunName <- "shift"
}else{
#For rolling aggregates, use the corresponding function from the RcppRoll package
expandFunName <- "roll_" %+% currentMethod
}
for(currentPeriod in nPeriods){
#Names on the columns we will create. We need to expand to compute ratios as well
currentSuffix <- suffixNewVars[currentMethod == expandMethods]
#If the default suffixes are being used and the expand is negative (adding forward info), change the suffixes to reflect that they're forward
if(all(suffixNewVars == ifelse(expandMethods == "shift", "_p", "_" %+% expandMethods %+% "Prev")) & currentPeriod < 0){
currentSuffix <- str_replace_all(currentSuffix, "Prev", "Fwd")
currentSuffix <- str_replace_all(currentSuffix, "_p", "_f")
}
#We need to expand also columns of which we only need the ratio in order to compute the ratio
colsToExpand_all <- unique(c(colsToExpand, colsToRatio))
expandedColsNames <- colsToExpand_all %+% currentSuffix %+% abs(currentPeriod)
#Expand the periods
if(expandFunName == "shift"){
dt[, (expandedColsNames) := lapply(.SD, expandFunName, n = abs(currentPeriod), type = ifelse(currentPeriod >= 0 , "lag", "lead")), .SDcols = colsToExpand_all]
}else{
dt[, (expandedColsNames) := lapply(.SD, expandFunName, n = abs(currentPeriod), fill = NA, align = ifelse(currentPeriod >= 0 , "right", "left"), ...), .SDcols = colsToExpand_all]
}
#Shift the newly created vars if the current period should not be included
if(!includeCurrentPeriod & expandFunName != "shift"){
dt[, (expandedColsNames) := lapply(.SD, shift, n = 1, type = ifelse(currentPeriod >= 0 , "lag", "lead")), .SDcols = expandedColsNames]
}
#Set rows that dont have enough past periods to expand them to NA.
#If there are several columns that identify the time series ID create an auxiliary ID on a single column
#This makes it easier to put the NAs on the new columns when any of the time series ID columns changes
if(length(timeSeriesIDcolNames) > 1){
dt[, auxTimeSeriesID_EBP := Reduce(function(x,y){paste(x, y, sep = "__EBPsep__")}, .SD), .SDcols = timeSeriesIDcolNames]
}else{
dt[, auxTimeSeriesID_EBP := get(timeSeriesIDcolNames)]
}
dt[auxTimeSeriesID_EBP != shift(auxTimeSeriesID_EBP,
n = abs(currentPeriod) - ifelse(expandFunName == "shift" | !includeCurrentPeriod, 0, 1),
type = ifelse(currentPeriod >= 0 , "lag", "lead"))
, (expandedColsNames) := NA]
#Delete the auxiliary time series ID that we created
dt[, auxTimeSeriesID_EBP := NULL]
#Compute the ratios when corresponds
if(currentMethod %in% methodsToRatio & currentPeriod %in% periodsToRatio){
currentSuffixRatios <- suffixRatios[currentMethod == methodsToRatio]
#If the default suffixes are being used and the expand is negative (adding forward info), change the suffixes to reflect that they're forward
if(all(suffixRatios == ifelse(methodsToRatio == "shift", "_p", "_" %+% methodsToRatio %+% "Prev") %+% "Ratio") & currentPeriod < 0){
currentSuffixRatios <- str_replace_all(currentSuffixRatios, "Prev", "Fwd")
currentSuffixRatios <- str_replace_all(currentSuffixRatios, "_p", "_f")
}
for(colName in colsToRatio){
ratioColName <- colName %+% currentSuffixRatios %+% abs(currentPeriod)
dt[, (ratioColName) := get(colName) / get(colName %+% currentSuffix %+% abs(currentPeriod))]
#If for that variable only the ratio and not the expand was required, remove the expand
if(!colName %in% colsToExpand){
dt[, (colName %+% currentSuffix %+% abs(currentPeriod)) := NULL]
}
}
}
}
}
toc(quiet = !verbose)
return(dt)
}