From 556b42cfb758250452eaa5670f7d41ba1ebdd92c Mon Sep 17 00:00:00 2001 From: Arthur Chan Date: Fri, 27 Dec 2024 19:53:25 +0000 Subject: [PATCH] Fix logic Signed-off-by: Arthur Chan --- .clusterfuzzlite/build.sh | 2 +- src/fuzz_introspector/frontends/core.py | 2 +- .../frontends/frontend_rust.py | 147 +++++++++++++++++- src/fuzz_introspector/frontends/oss_fuzz.py | 22 ++- 4 files changed, 165 insertions(+), 8 deletions(-) diff --git a/.clusterfuzzlite/build.sh b/.clusterfuzzlite/build.sh index 3b023beb..fe10b2ae 100755 --- a/.clusterfuzzlite/build.sh +++ b/.clusterfuzzlite/build.sh @@ -16,7 +16,7 @@ hidden_imports="--hidden-import=yaml \ --hidden-import=tree_sitter_c \ --hidden-import=tree_sitter_go \ --hidden-import=tree_sitter_java \ - --hidden-import=tree_sitter_go \ + --hidden-import=tree_sitter_rust \ --hidden-import=pkg_resources.extern \ --hidden-import=rust_demangler" #fuzzers="test_fuzz_cfg_load.py test_fuzz_report_generation.py" diff --git a/src/fuzz_introspector/frontends/core.py b/src/fuzz_introspector/frontends/core.py index a8729f7c..31cd3ad8 100644 --- a/src/fuzz_introspector/frontends/core.py +++ b/src/fuzz_introspector/frontends/core.py @@ -16,7 +16,7 @@ import logging from fuzz_introspector.frontends import (frontend_c, frontend_cpp, frontend_go, - frontend_jvm) + frontend_jvm, frontend_rust) logger = logging.getLogger(name=__name__) diff --git a/src/fuzz_introspector/frontends/frontend_rust.py b/src/fuzz_introspector/frontends/frontend_rust.py index 936e0f24..80922fb5 100644 --- a/src/fuzz_introspector/frontends/frontend_rust.py +++ b/src/fuzz_introspector/frontends/frontend_rust.py @@ -15,7 +15,7 @@ ################################################################################ """Fuzz Introspector Light frontend for Rust""" -from typing import Optional +from typing import Any, Optional import os import pathlib @@ -110,6 +110,15 @@ def has_libfuzzer_harness(self) -> bool: return False + def get_entry_method_name(self) -> Optional[str]: + """Returns the entry method name of the harness if found.""" + if self.has_libfuzzer_harness(): + for func in self.functions: + if func.is_entry_method: + return func.name + + return None + class RustFunction(): """Wrapper for a General Declaration for function""" @@ -154,10 +163,41 @@ def __init__(self, def _process_declaration(self): """Internal helper to process the function/method declaration.""" - for child in self.root.children: - # Process name - if child.type == 'identifier': - self.name = child.text.decode() + # Process name + self.name = self.root.child_by_field_name('name').text.decode() + + # Process return type + return_type = self.root.child_by_field_name('return_type') + if return_type: + self.return_type = return_type.text.decode() + else: + self.return_type = 'void' + + # Process arguments + parameters = self.root.child_by_field_name('parameters') + for param in parameters.children: + if param.type == 'parameter': + for item in param.children: + if item.type == 'identifier': + self.arg_names.append(item.text.decode()) + elif 'type' in item.type: + self.arg_types.append(item.text.decode()) + + # Process signature + signature = self.root.text.decode().split('{')[0] + self.sig = ''.join(line.strip() for line in signature.splitlines() if line.strip()) + + print('@@@@@') + print(self.sig) + print(signature) + print('@@@@@') + +# for child in self.root.children: +# # Process name +# if child.type == 'identifier': +# self.name = child.text.decode() +# +# print(f'{child.type}:{child.text.decode()}') def _process_macro_declaration(self): """Internal helper to process the macro declaration for fuzzing @@ -172,6 +212,103 @@ def _process_macro_declaration(self): # token_tree for body +class Project(): + """Wrapper for doing analysis of a collection of source files.""" + + def __init__(self, source_code_files: list[SourceCodeFile]): + self.source_code_files = source_code_files + + def dump_module_logic(self, + report_name: str, + harness_name: Optional[str] = None): + """Dumps the data for the module in full.""" + logger.info('Dumping project-wide logic.') + report: dict[str, Any] = {'report': 'name'} + report['sources'] = [] + + func_list = [] + for source_code in self.source_code_files: + # Log entry method if provided + entry_method = source_code.get_entry_method_name() + if entry_method: + report['Fuzzing method'] = entry_method + + # Retrieve project information + func_names = [func.name for func in source_code.functions] + report['sources'].append({ + 'source_file': source_code.source_file, + 'function_names': func_names, + }) + + # Process all project methods + for func in source_code.functions: + func_dict: dict[str, Any] = {} + func_dict['functionName'] = func.name + func_dict['functionSourceFile'] = source_code.source_file + func_dict['functionLinenumber'] = func.start_line + func_dict['functionLinenumberEnd'] = func.end_line + func_dict['linkageType'] = '' + func_dict['func_position'] = { + 'start': func.start_line, + 'end': func.end_line + } + func_dict['CyclomaticComplexity'] = func.complexity + func_dict['EdgeCount'] = func_dict['CyclomaticComplexity'] + func_dict['ICount'] = func.icount + func_dict['argNames'] = func.arg_names + func_dict['argTypes'] = func.arg_types + func_dict['argCount'] = len(func_dict['argTypes']) + func_dict['returnType'] = func.return_type + func_dict['BranchProfiles'] = [] + func_dict['Callsites'] = func.detailed_callsites + func_dict['functionUses'] = 0 + func_dict['functionDepth'] = 0 + func_dict['constantsTouched'] = [] + func_dict['BBCount'] = 0 + func_dict['signature'] = func.sig + callsites = func.base_callsites + reached = set() + for cs_dst, _ in callsites: + reached.add(cs_dst) + func_dict['functionsReached'] = list(reached) + + func_list.append(func_dict) + + if func_list: + report['All functions'] = {} + report['All functions']['Elements'] = func_list + + with open(report_name, 'w', encoding='utf-8') as f: + f.write(yaml.dump(report)) + + def extract_calltree(self, + source_file: str, + source_code: SourceCodeFile, + func: Optional[str] = None, + visited_funcs: Optional[set[str]] = None, + depth: int = 0, + line_number: int = -1) -> str: + """Extracts calltree string of a calltree so that FI core can use it.""" + if not visited_funcs: + visited_funcs = set() + + if not func: + func = source_code.get_entry_method_name() + + # TODO Add calltree extraction logic + + return '' + + def get_source_codes_with_harnesses(self) -> list[SourceCodeFile]: + """Gets the source codes that holds libfuzzer harnesses.""" + harnesses = [] + for source_code in self.source_code_files: + if source_code.has_libfuzzer_harness(): + harnesses.append(source_code) + + return harnesses + + def capture_source_files_in_tree(directory_tree: str) -> list[str]: """Captures source code files in a given directory.""" exclude_directories = [ diff --git a/src/fuzz_introspector/frontends/oss_fuzz.py b/src/fuzz_introspector/frontends/oss_fuzz.py index 58d9547c..f73a9b20 100644 --- a/src/fuzz_introspector/frontends/oss_fuzz.py +++ b/src/fuzz_introspector/frontends/oss_fuzz.py @@ -187,11 +187,31 @@ def process_rust_project(target_dir, out): source_files = [] source_files = frontend_rust.capture_source_files_in_tree(target_dir) - # Process tree sitter for go source files + # Process tree sitter for rust source files logger.info('Found %d files to include in analysis', len(source_files)) logger.info('Loading tree-sitter trees') source_codes = frontend_rust.load_treesitter_trees(source_files) + # Create and dump project + logger.info('Creating base project.') + project = frontend_rust.Project(source_codes) + + # Process calltree and method data + for harness in project.get_source_codes_with_harnesses(): + harness_name = harness.source_file.split('/')[-1].split('.')[0] + + # Method data + logger.info(f'Dump methods for {harness_name}') + target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data.yaml') + project.dump_module_logic(target, harness_name) + + # Calltree + logger.info(f'Extracting calltree for {harness_name}') + calltree = project.extract_calltree(harness.source_file, harness) + target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data') + with open(target, 'w', encoding='utf-8') as f: + f.write(f'Call tree\n{calltree}') + def analyse_folder(language, directory, entrypoint, out='', module_only=False): if language == 'c':