Skip to content

Commit

Permalink
Fix logic
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Chan <arthur.chan@adalogics.com>
  • Loading branch information
arthurscchan committed Dec 27, 2024
1 parent 02fc32f commit 556b42c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .clusterfuzzlite/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ hidden_imports="--hidden-import=yaml \
--hidden-import=tree_sitter_c \
--hidden-import=tree_sitter_go \
--hidden-import=tree_sitter_java \
--hidden-import=tree_sitter_go \
--hidden-import=tree_sitter_rust \
--hidden-import=pkg_resources.extern \
--hidden-import=rust_demangler"
#fuzzers="test_fuzz_cfg_load.py test_fuzz_report_generation.py"
Expand Down
2 changes: 1 addition & 1 deletion src/fuzz_introspector/frontends/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import logging

from fuzz_introspector.frontends import (frontend_c, frontend_cpp, frontend_go,
frontend_jvm)
frontend_jvm, frontend_rust)

logger = logging.getLogger(name=__name__)

Expand Down
147 changes: 142 additions & 5 deletions src/fuzz_introspector/frontends/frontend_rust.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
################################################################################
"""Fuzz Introspector Light frontend for Rust"""

from typing import Optional
from typing import Any, Optional

import os
import pathlib
Expand Down Expand Up @@ -110,6 +110,15 @@ def has_libfuzzer_harness(self) -> bool:

return False

def get_entry_method_name(self) -> Optional[str]:
"""Returns the entry method name of the harness if found."""
if self.has_libfuzzer_harness():
for func in self.functions:
if func.is_entry_method:
return func.name

return None


class RustFunction():
"""Wrapper for a General Declaration for function"""
Expand Down Expand Up @@ -154,10 +163,41 @@ def __init__(self,

def _process_declaration(self):
"""Internal helper to process the function/method declaration."""
for child in self.root.children:
# Process name
if child.type == 'identifier':
self.name = child.text.decode()
# Process name
self.name = self.root.child_by_field_name('name').text.decode()

# Process return type
return_type = self.root.child_by_field_name('return_type')
if return_type:
self.return_type = return_type.text.decode()
else:
self.return_type = 'void'

# Process arguments
parameters = self.root.child_by_field_name('parameters')
for param in parameters.children:
if param.type == 'parameter':
for item in param.children:
if item.type == 'identifier':
self.arg_names.append(item.text.decode())
elif 'type' in item.type:
self.arg_types.append(item.text.decode())

# Process signature
signature = self.root.text.decode().split('{')[0]
self.sig = ''.join(line.strip() for line in signature.splitlines() if line.strip())

print('@@@@@')
print(self.sig)
print(signature)
print('@@@@@')

# for child in self.root.children:
# # Process name
# if child.type == 'identifier':
# self.name = child.text.decode()
#
# print(f'{child.type}:{child.text.decode()}')

def _process_macro_declaration(self):
"""Internal helper to process the macro declaration for fuzzing
Expand All @@ -172,6 +212,103 @@ def _process_macro_declaration(self):
# token_tree for body


class Project():
"""Wrapper for doing analysis of a collection of source files."""

def __init__(self, source_code_files: list[SourceCodeFile]):
self.source_code_files = source_code_files

def dump_module_logic(self,
report_name: str,
harness_name: Optional[str] = None):
"""Dumps the data for the module in full."""
logger.info('Dumping project-wide logic.')
report: dict[str, Any] = {'report': 'name'}
report['sources'] = []

func_list = []
for source_code in self.source_code_files:
# Log entry method if provided
entry_method = source_code.get_entry_method_name()
if entry_method:
report['Fuzzing method'] = entry_method

# Retrieve project information
func_names = [func.name for func in source_code.functions]
report['sources'].append({
'source_file': source_code.source_file,
'function_names': func_names,
})

# Process all project methods
for func in source_code.functions:
func_dict: dict[str, Any] = {}
func_dict['functionName'] = func.name
func_dict['functionSourceFile'] = source_code.source_file
func_dict['functionLinenumber'] = func.start_line
func_dict['functionLinenumberEnd'] = func.end_line
func_dict['linkageType'] = ''
func_dict['func_position'] = {
'start': func.start_line,
'end': func.end_line
}
func_dict['CyclomaticComplexity'] = func.complexity
func_dict['EdgeCount'] = func_dict['CyclomaticComplexity']
func_dict['ICount'] = func.icount
func_dict['argNames'] = func.arg_names
func_dict['argTypes'] = func.arg_types
func_dict['argCount'] = len(func_dict['argTypes'])
func_dict['returnType'] = func.return_type
func_dict['BranchProfiles'] = []
func_dict['Callsites'] = func.detailed_callsites
func_dict['functionUses'] = 0
func_dict['functionDepth'] = 0
func_dict['constantsTouched'] = []
func_dict['BBCount'] = 0
func_dict['signature'] = func.sig
callsites = func.base_callsites
reached = set()
for cs_dst, _ in callsites:
reached.add(cs_dst)
func_dict['functionsReached'] = list(reached)

func_list.append(func_dict)

if func_list:
report['All functions'] = {}
report['All functions']['Elements'] = func_list

with open(report_name, 'w', encoding='utf-8') as f:
f.write(yaml.dump(report))

def extract_calltree(self,
source_file: str,
source_code: SourceCodeFile,
func: Optional[str] = None,
visited_funcs: Optional[set[str]] = None,
depth: int = 0,
line_number: int = -1) -> str:
"""Extracts calltree string of a calltree so that FI core can use it."""
if not visited_funcs:
visited_funcs = set()

if not func:
func = source_code.get_entry_method_name()

# TODO Add calltree extraction logic

return ''

def get_source_codes_with_harnesses(self) -> list[SourceCodeFile]:
"""Gets the source codes that holds libfuzzer harnesses."""
harnesses = []
for source_code in self.source_code_files:
if source_code.has_libfuzzer_harness():
harnesses.append(source_code)

return harnesses


def capture_source_files_in_tree(directory_tree: str) -> list[str]:
"""Captures source code files in a given directory."""
exclude_directories = [
Expand Down
22 changes: 21 additions & 1 deletion src/fuzz_introspector/frontends/oss_fuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,31 @@ def process_rust_project(target_dir, out):
source_files = []
source_files = frontend_rust.capture_source_files_in_tree(target_dir)

# Process tree sitter for go source files
# Process tree sitter for rust source files
logger.info('Found %d files to include in analysis', len(source_files))
logger.info('Loading tree-sitter trees')
source_codes = frontend_rust.load_treesitter_trees(source_files)

# Create and dump project
logger.info('Creating base project.')
project = frontend_rust.Project(source_codes)

# Process calltree and method data
for harness in project.get_source_codes_with_harnesses():
harness_name = harness.source_file.split('/')[-1].split('.')[0]

# Method data
logger.info(f'Dump methods for {harness_name}')
target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data.yaml')
project.dump_module_logic(target, harness_name)

# Calltree
logger.info(f'Extracting calltree for {harness_name}')
calltree = project.extract_calltree(harness.source_file, harness)
target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data')
with open(target, 'w', encoding='utf-8') as f:
f.write(f'Call tree\n{calltree}')


def analyse_folder(language, directory, entrypoint, out='', module_only=False):
if language == 'c':
Expand Down

0 comments on commit 556b42c

Please sign in to comment.