Commit: groovy
luisas committed Dec 17, 2024
1 parent 194166c commit 5f3872e
Showing 9 changed files with 257 additions and 60 deletions.
3 changes: 1 addition & 2 deletions assets/samplesheet.csv
@@ -1,3 +1,2 @@
id,fasta,reference,optional_data
seatoxin-ref,https://raw.githubusercontent.com/nf-core/test-datasets/multiplesequencealign/testdata/setoxin-ref.fa,https://raw.githubusercontent.com/nf-core/test-datasets/multiplesequencealign/testdata/setoxin.ref,https://raw.githubusercontent.com/nf-core/test-datasets/multiplesequencealign/testdata/structures/seatoxin-ref.tar.gz
toxin-ref,https://raw.githubusercontent.com/nf-core/test-datasets/multiplesequencealign/testdata/toxin-ref.fa,https://raw.githubusercontent.com/nf-core/test-datasets/multiplesequencealign/testdata/toxin.ref,
seatoxin-ref,https://raw.githubusercontent.com/nf-core/test-datasets/multiplesequencealign/testdata/setoxin-ref.fa,https://raw.githubusercontent.com/nf-core/test-datasets/multiplesequencealign/testdata/setoxin.ref,https://raw.githubusercontent.com/nf-core/test-datasets/multiplesequencealign/testdata/structures/seatoxin-ref.tar.gz
1 change: 0 additions & 1 deletion assets/toolsheet.csv
@@ -1,3 +1,2 @@
tree,args_tree,aligner,args_aligner
,,FOLDMASON,
FAMSA,,FAMSA,
1 change: 1 addition & 0 deletions cleaned_trace.csv


6 changes: 0 additions & 6 deletions co2footprint_summary_20241213-53303754.txt

This file was deleted.

6 changes: 0 additions & 6 deletions co2footprint_summary_20241213-53488757.txt

This file was deleted.

1 change: 1 addition & 0 deletions nextflow.config
@@ -280,6 +280,7 @@ def co2_timestamp = new java.util.Date().format( 'yyyy-MM-dd_HH-mm-ss')
co2footprint {
traceFile = "${params.outdir}/pipeline_info/co2footprint_trace_${co2_timestamp}.txt"
reportFile = "${params.outdir}/pipeline_info/co2footprint_report_${co2_timestamp}.html"
summaryFile = "${params.outdir}/pipeline_info/co2footprint_summary_${co2_timestamp}.txt"
}

validation {
3 changes: 2 additions & 1 deletion test_merged.csv
@@ -1 +1,2 @@
[]
id,seqlength_mean,seqlength_median,seqlength_max,n_sequences,perc_sim,plddt,tree,args_tree,args_tree_clean,aligner,args_aligner,args_aligner_clean,sp,total_gaps,avg_gaps,TCS,tc,EVALUATED,APDB,iRMSD,NiRMSD,name,realtime_tree,%cpu_tree,rss_tree,peak_rss_tree,vmem_tree,peak_mem_tree,rchar_tree,wchar_tree,cpus_tree,energy_consumption_tree,CO2e_tree,powerdraw_cpu_tree,cpu_model_tree,requested_memory_tree,realtime_aligner,%cpu_aligner,rss_aligner,peak_rss_aligner,vmem_aligner,peak_mem_aligner,rchar_aligner,wchar_aligner,cpus_aligner,energy_consumption_aligner,CO2e_aligner,powerdraw_cpu_aligner,cpu_model_aligner,requested_memory_aligner
seatoxin-ref,47.0,48.0,49,5,46.20,0.833333,null,,default,FAMSA,,default,81.0,20,4,835,46.9,84.62,45.97,1.23,1.45,NFCORE_MULTIPLESEQUENCEALIGN:MULTIPLESEQUENCEALIGN:ALIGN:FAMSA_ALIGN (seatoxin-ref tree: FAMSA),0.016666666666666666,36.9%,0.003,3 MB,5.4 MB,null,77.6 KB,3.5 KB,2,4.28 mWh,2.03 mg,12.0,Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz,12.0 GB,0.0,42.1%,0.0031,3.1 MB,5.4 MB,null,77.7 KB,3.6 KB,2,0.0 pWh,0.0 pg,12.0,Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz,12.0 GB
295 changes: 251 additions & 44 deletions test_merging.groovy
@@ -1,18 +1,60 @@
@Grab('com.xlson.groovycsv:groovycsv:1.3')
import static com.xlson.groovycsv.CsvParser.parseCsv

def cleanTrace(trace) {

/**
* Saves a list of maps to a CSV file.
*
* @param data The list of maps to be saved. Each map represents a row in the CSV.
* @param fileName The name of the file to save the CSV data to.
*/
def saveMapToCsv(List<Map> data, String fileName) {
if (data.isEmpty()) {
println "No data to write"
return
}

// Extract headers from the keys of the first map
def headers = data[0].keySet().join(',')

// Generate CSV content by joining the values of each map with commas
def csvContent = data.collect { row ->
row.values().join(',')
}.join('\n')

// Write headers and CSV content to the specified file
new File(fileName).withWriter { writer ->
writer.write(headers + '\n' + csvContent)
}
}
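// Illustrative usage sketch (made-up values): maps sharing the same keys become
// a header line plus one CSV line per map. Note that values are joined verbatim,
// so embedded commas are not escaped.
// def rows = [
//     [id: "seatoxin-ref", aligner: "FAMSA", sp: 81.0],
//     [id: "toxin-ref", aligner: "FAMSA", sp: 77.0]
// ]
// saveMapToCsv(rows, "example.csv")
// // example.csv:
// // id,aligner,sp
// // seatoxin-ref,FAMSA,81.0
// // toxin-ref,FAMSA,77.0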


/**
* Cleans the trace data by converting each row into a mutable map
* and performing necessary transformations.
*
* The following transformations are performed:
* - Extract the tag from the 'name' column using a regex pattern
* - Extract 'id' and 'args' from the tag
* - Process the 'full_name' to extract workflow and process details
*
* @param trace The trace data to be cleaned.
* @return The cleaned trace data.
*/
def cleanTrace(ArrayList trace) {

// Convert each row into a mutable map for dynamic property addition
def cleanedTrace = trace.collect { row ->
def mutableRow = row.toMap()

def mutableRow = new LinkedHashMap(row)

// Extract the tag from the 'name' column using a regex pattern
def tagMatch = (mutableRow.name =~ /\((.*)\)/)
mutableRow.tag = tagMatch ? tagMatch[0][1] : null

// Extract 'id' and 'args' from the tag safely
mutableRow.id = mutableRow.tag?.tokenize(' ')?.first()
mutableRow.args = mutableRow.tag?.split("args:")?.with { it.size() > 1 ? it[1].trim() : null }
mutableRow.args = mutableRow.tag?.split("args:")?.with { it.size() > 1 ? it[1].trim() : "default" }

// Process the 'full_name' to extract workflow and process details
mutableRow.full_name = mutableRow.name.split(/\(/)?.first()?.trim()
@@ -27,15 +69,35 @@ def cleanTrace(trace) {
}
}

// if args is still null, fall back to the default label
if (mutableRow.args == null) {
mutableRow.args = "default"
}

return mutableRow
}

// Return the cleaned trace
return cleanedTrace.findAll { it != null }
}
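// Sketch of the tag extraction above, on a task name taken from this pipeline's
// trace (the expected values are the editor's reading of the regex, not a fixed contract):
// def name = "NFCORE_MULTIPLESEQUENCEALIGN:MULTIPLESEQUENCEALIGN:ALIGN:FAMSA_ALIGN (seatoxin-ref tree: FAMSA)"
// def tagMatch = (name =~ /\((.*)\)/)
// assert tagMatch[0][1] == "seatoxin-ref tree: FAMSA"           // tag
// assert tagMatch[0][1].tokenize(' ').first() == "seatoxin-ref" // id
// // the tag contains no "args:" substring, so args falls back to "default"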

// Utility function to convert time strings to minutes
def convertTime(String timeStr) {

/**
* Utility function to convert time strings to minutes.
*
* This function takes a time string in the format of hours, minutes, seconds, and milliseconds,
* and converts it to a total number of minutes.
*
* Example input formats:
* - "1h 30m"
* - "45m 30s"
* - "2h 15m 10s 500ms"
*
* @param timeStr The time string to be converted.
* @return The total time in minutes as a double.
* @throws IllegalArgumentException if the time string is not in the correct format.
*/
def convertTime(String timeStr) {
def pattern = /((?<hours>\d+(\.\d+)?)h)?\s*((?<minutes>\d+(\.\d+)?)m)?\s*((?<seconds>\d+(\.\d+)?)s)?\s*((?<milliseconds>\d+(\.\d+)?)ms)?/
def matcher = timeStr.trim() =~ pattern

@@ -51,8 +113,20 @@ def convertTime(String timeStr) {
return (hours * 60) + minutes + (seconds / 60) + (milliseconds / 60000)
}
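// Quick sanity checks (editor's sketch; assumes units missing from the string
// default to zero in the collapsed parsing lines above):
// assert convertTime("1h 30m") == 90
// assert convertTime("45m 30s") == 45.5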

// Utility function to convert memory to GB
def convertMemory(String memory) {
/**
* Utility function to convert memory to GB.
*
* This function takes a memory string with units (GB, MB, KB) and converts it to gigabytes (GB).
*
* Example input formats:
* - "16GB"
* - "2048MB"
* - "1048576KB"
*
* @param memory The memory string to be converted.
* @return The memory in gigabytes as a double, or null if the input is invalid.
*/
def convertMemory(String memory) {
if (!memory) return null

if (memory.contains("GB")) {
@@ -65,65 +139,198 @@ def convertMemory(String memory) {
return null
}
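// Quick sanity checks (editor's sketch; assumes the collapsed branches divide
// MB by 1024 and KB by 1024 * 1024):
// assert convertMemory("16GB") == 16
// assert convertMemory("2048MB") == 2
// assert convertMemory(null) == null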

// Prepare trace trees
def prepTreeTrace(trace) {
def traceTrees = trace.findAll { it.subworkflow == "COMPUTE_TREES" }
traceTrees.each { row ->
row.args_tree = row.args
row.tree = row.process.replace("_GUIDETREE", "")
row.time_tree = convertTime(row.realtime)
row.memory_tree = convertMemory(row.rss)
row.cpus_tree = row.cpus

def prepTrace(trace, suffix_to_replace, subworkflow, keys) {

// Keep only the rows belonging to the requested subworkflow
def trace_subworkflow = trace.findAll { it.subworkflow == subworkflow }

// For each row, create a new row with the necessary keys and values
trace_subworkflow.each { row ->
def newRow = [:]

// Pick the per-subworkflow suffix and the tool-specific key
def suffix = ""
def specific_key = ""
if(subworkflow == "ALIGN") {
suffix = "_aligner"
specific_key = "aligner"
} else if(subworkflow == "COMPUTE_TREES") {
suffix = "_tree"
specific_key = "tree"
}

// Clean the process name (remove the unnecessary suffix)
row[specific_key] = row.process.replace(suffix_to_replace, "")

keys.each { key ->

def newKey = key + suffix

// Shared identifier columns keep their plain names
if (key in ['id', 'name', 'tree', 'aligner']) {
newKey = key
}

if (key == 'realtime' || key == 'rss') {
newRow[newKey] = (key == 'realtime') ? convertTime(row[key]) : convertMemory(row[key])
} else if (key == 'args') {
newRow[newKey + "_clean"] = row.args
} else {
newRow[newKey] = row[key]
}
}

row.clear()
row.putAll(newRow)
}
return traceTrees
return trace_subworkflow
}
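// Usage sketch for the ALIGN subworkflow, with an illustrative subset of keys:
// generic columns are renamed with the tool suffix, e.g. realtime -> realtime_aligner
// (converted to minutes) and rss -> rss_aligner (converted to GB).
// def traceAlign = prepTrace(cleanTraceData, "_ALIGN", "ALIGN",
//         ["id", "name", "args", "aligner", "realtime", "rss", "cpus"])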

// Prepare align traces
def prepAlignTrace(trace) {
def traceAlign = trace.findAll { it.subworkflow == "ALIGN" }
traceAlign.each { row ->
row.args_aligner = row.args
row.aligner = row.process.replace("_ALIGN", "")
row.time_align = convertTime(row.realtime)
row.memory_align = convertMemory(row.rss)
row.cpus_align = row.cpus


/**
* Processes the latest trace file in the specified directory based on the given pattern.
*
* This function identifies and parses the latest trace file, filters lines related to evaluation,
* and converts the trace data into CSV format.
*
* @param traceDirPath The path to the directory containing trace files.
* @param filePattern The pattern to identify the trace files.
* @return The parsed CSV data from the trace file.
*/
def latesTraceFileToCSV(String traceDirPath, String filePattern) {
// Identify and parse the latest trace file based on the given pattern
def traceFile = new File(traceDirPath).listFiles().findAll { it.name.startsWith(filePattern) }.sort { -it.lastModified() }.take(1)[0]

// Keep only the completed tasks belonging to the alignment subworkflow
def header = traceFile.readLines()[0].replaceAll("\t", ",")
def traceFileAlign = traceFile.readLines().findAll { it.contains("COMPLETED") && it.contains("MULTIPLESEQUENCEALIGN:ALIGN") }.collect { it.replaceAll("\t", ",") }.join("\n")
def trace = header + "\n" + traceFileAlign

// Parse the trace data into CSV format
def traceCsv = parseCsv(trace)

return traceCsv
}
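// Usage sketch (hypothetical directory): parse the newest execution trace and
// list the names of the completed ALIGN tasks.
// def csv = latesTraceFileToCSV("results/pipeline_info/", "execution_trace")
// csv.each { row -> println row.name }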


def keepKeysFromIterator(iterator, keysToKeep) {
def modifiedData = iterator.collect { row ->
def mutableRow = row.toMap().findAll { key, value ->
keysToKeep.contains(key)
}
return mutableRow
}
return traceAlign
// make sure each entry is a plain map
modifiedData = modifiedData.collect { it as Map }
return modifiedData
}
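// Illustrative example (works on parseCsv rows, which expose toMap()):
// def csv = parseCsv("name,CO2e,extra\ntask1,2.03 mg,x")
// assert keepKeysFromIterator(csv, ["name", "CO2e"]) == [[name: "task1", CO2e: "2.03 mg"]]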

def merge_summary_and_traces(summary_file, trace_dir_path, outFileName){

// Read the summary file with the scientific evaluation
def data = new File(summary_file).readLines()

// Identify and parse the latest trace file
def trace_file = new File("${trace_dir_path}").listFiles().findAll { it.name.startsWith("execution_trace") }.sort { -it.lastModified() }.take(1)[0]

// Keep only the lines that report running times related to evaluation
def header = trace_file.readLines()[0].replaceAll("\t", ",")
def trace_file_align = trace_file.readLines().findAll { it.contains("CACHED") && it.contains("MULTIPLESEQUENCEALIGN:ALIGN") }.collect { it.replaceAll("\t", ",") }.join("\n")
def trace = header + "\n" + trace_file_align
def trace_csv = parseCsv(trace)

def mergeListsById(list1, list2, idKey) {

def map1 = list1.collectEntries { [(it[idKey]): it] }

def mergedList = list2.collect { row ->
def id = row[idKey]
def mergedRow = map1.containsKey(id) ? map1[id] + row : row
return mergedRow
}

// return the merged list of maps
return mergedList
}
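// Illustrative example: enrich trace rows with CO2 columns matched on "name".
// def co2 = [[name: "task1", CO2e: "2.03 mg"]]
// def trace = [[name: "task1", realtime: "1m"]]
// assert mergeListsById(co2, trace, "name") == [[name: "task1", CO2e: "2.03 mg", realtime: "1m"]]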

/**
* Processes the latest trace files in the specified directory.
*
* This function identifies and parses the latest execution and CO2 trace files, merges them by task name,
* cleans the trace data, and extracts tree and alignment traces separately.
*
* @param traceDirPath The path to the directory containing trace files.
* @return A map containing the tree traces and alignment traces.
*/
def processLatestTraceFile(String traceDirPath) {

def traceCsv = latesTraceFileToCSV(traceDirPath, "execution_trace")
def co2Csv = latesTraceFileToCSV(traceDirPath, "co2footprint_trace")

co2Csv = keepKeysFromIterator(co2Csv, ["name", "energy_consumption", "CO2e", "powerdraw_cpu", "cpu_model", "requested_memory"])
def trace_co2_csv = mergeListsById(traceCsv.collect { it.toMap() }, co2Csv, "name")

def keys = ["id", "name", "args", "tree", "aligner", "realtime", "%cpu", "rss", "peak_rss", "vmem", "peak_mem", "rchar", "wchar", "cpus", "energy_consumption", "CO2e", "powerdraw_cpu", "cpu_model", "requested_memory"]

// Extract the information from the tag
def cleanTraceData = cleanTrace(trace_co2_csv)

def cleanTraceData = cleanTrace(trace_csv)
def traceTrees = prepTreeTrace(cleanTraceData)
def traceAlign = prepAlignTrace(cleanTraceData)
// Extract the tree and align traces separately
def traceTrees = prepTrace(cleanTraceData, "_GUIDETREE", "COMPUTE_TREES", keys)
def traceAlign = prepTrace(cleanTraceData, "_ALIGN", "ALIGN", keys)


// Return the extracted traces as a map
return [traceTrees: traceTrees, traceAlign: traceAlign]
}
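// Usage sketch (hypothetical path): the returned map exposes both trace lists.
// def traces = processLatestTraceFile("results/pipeline_info/")
// println "${traces.traceTrees.size()} tree tasks, ${traces.traceAlign.size()} align tasks"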



def merge_summary_and_traces(summary_file, trace_dir_path, outFileName){

// -------------------
// TRACE FILE
// -------------------

// 1. Identify and parse the latest trace file
// 2. Clean the trace (only completed tasks, keep only needed columns)
// 3. Extract tree and align traces separately
def trace_file = processLatestTraceFile(trace_dir_path)

// -------------------
// SUMMARY FILE
// -------------------

// Parse the summary data (scientific accuracy file: SP, TC etc.)
def data = parseCsv(new File(summary_file).readLines().collect { it.replaceAll("\t", ",") }.join("\n"))
data = data.collect { row ->
def mutableRow = row.toMap()
return mutableRow
}

print("##############################################################")
print("\n")
print(trace_file)
print("\n\n")
// Merge the summary data with the trace data
def mergedData = []
data.each { row ->
def treeMatch = traceTrees.find { it.id == row.id && it.tree == row.tree && it.args_tree == row.args_tree }
def alignMatch = traceAlign.find { it.id == row.id && it.aligner == row.aligner && it.args_aligner == row.args_aligner }


print("##### matching -----------------------------------------------------------")
print("\n")
print(row)
print("\n")
def treeMatch = trace_file.traceTrees.find { it.id == row.id && it.tree == row.tree && it.args_tree_clean == row.args_tree_clean }
print(treeMatch)
print("\n")
def alignMatch = trace_file.traceAlign.find { it.id == row.id && it.aligner == row.aligner && it.args_aligner_clean == row.args_aligner_clean }
print(alignMatch)
print("\n")
def mergedRow = row + (treeMatch ?: [:]) + (alignMatch ?: [:])
mergedData << mergedRow
}
new File(outFileName).withWriter { writer -> writer.write(mergedData as String) }

// Save the merged data to a file
saveMapToCsv(mergedData, outFileName)

}

outdir = "/home/luisasantus/Desktop/multiplesequencealign/results"
outdir = "/home/luisasantus/Desktop/multiplesequencealign/res2"

def summary_file = "${outdir}/summary/complete_summary_stats_eval.csv"
def outFileName = "${outdir}/../test_merged.csv"
def trace_dir_path = "${outdir}/pipeline_info/"
def co2_path = "${outdir}/pipeline_info/execution_trace_CO2"

merge_summary_and_traces(summary_file, trace_dir_path, outFileName)
