diff --git a/examples/contracts/dev.contract b/examples/contracts/dev.contract index 6eb5f6516..4e0fe5609 100644 --- a/examples/contracts/dev.contract +++ b/examples/contracts/dev.contract @@ -88,7 +88,7 @@ component PIDThrottleSystem(target_dist, max_speed): return {"throttle": float(throttle)} -component ThrottleSafetyFilter(min_dist, min_slowdown, max_brake=5, buffer_padding=0): +component ThrottleSafetyFilter(min_dist, max_brake=5, buffer_padding=0): """ A component that modulates actions, passing them through unchanged unless we are dangerously close to the car in front of us, in which case the actions are swapped to brake with maximum force. @@ -115,8 +115,8 @@ component ThrottleSafetyFilter(min_dist, min_slowdown, max_brake=5, buffer_paddi # If we are in the "danger zone", brake HARD. Otherwise, pass through the inputs actions action. rel_speed = (state.last_dist - dist)/timestep - stopping_time = math.ceil(rel_speed/min_slowdown)+1 - rel_dist_covered = stopping_time*speed + (max_brake - min_slowdown)*(stopping_time*(stopping_time+1))/2 + stopping_time = ceil(rel_speed/max_brake)+1 + rel_dist_covered = stopping_time*rel_speed danger_dist = min_dist + buffer_padding + max(0, rel_dist_covered) # Update last_dist @@ -167,7 +167,7 @@ component ActionGenerator(): return {"control_action": action} -component ControlSystem(target_dist, max_speed, min_dist, min_slowdown): +component ControlSystem(target_dist, max_speed, min_dist, max_brake): """ The control system for a car, combining a PID controller with a safety filter to generate actions. """ @@ -184,7 +184,7 @@ component ControlSystem(target_dist, max_speed, min_dist, min_slowdown): compose: # Create sub-components pid_ts = PIDThrottleSystem(target_dist, max_speed) - tsf = ThrottleSafetyFilter(min_dist, min_slowdown) + tsf = ThrottleSafetyFilter(min_dist, max_brake) pid_ss = PIDSteeringSystem() ag = ActionGenerator() @@ -217,12 +217,12 @@ component CarControls(): actions: control_action: RegulatedControlAction -component Car(stddev, target_dist, max_speed, min_dist, min_slowdown): +component Car(stddev, target_dist, max_speed, min_dist, max_brake): compose: ps = NoisyDistanceSystem(stddev) sm = Speedometer() ds = DirectionSystem() - cs = ControlSystem(target_dist, max_speed, min_dist, min_slowdown) + cs = ControlSystem(target_dist, max_speed, min_dist, max_brake) cc = CarControls() # Connect sensors inputss to controller @@ -314,8 +314,8 @@ contract SafeThrottleFilter(perception_distance, min_dist, abs_dist_err=0.5, abs relative_speed = ((next dist) - dist)/self.timestep if (next(next dist)) else ((next dist) - dist)/self.timestep true_relative_speed = lead_car.speed - speed - stopping_time = math.ceil(speed/max_brake) - rel_dist_covered = stopping_time*speed + (true_relative_speed+abs_speed_err)(max_brake-min_slowdown)*(stopping_time*(stopping_time+1))/2 + stopping_time = ceil(speed/max_brake) + rel_dist_covered = stopping_time*(speed + abs_speed_err) buffer_dist = min_dist + rel_dist_covered guarantees: @@ -337,7 +337,7 @@ contract PassthroughBrakingActionGenerator(): guarantees: always throttle == -1 implies (control_action.throttle == 0 and control_action.brake == 1) -contract MaxBrakingForce(min_slowdown): +contract MaxBrakingForce(max_brake): actions: control_action: RegulatedControlAction @@ -351,7 +351,7 @@ contract MaxBrakingForce(min_slowdown): ) ## Overall System Spec ## -contract KeepsDistance(perception_distance, min_dist, max_brake=float("inf"), max_accel=float("inf")): +contract KeepsDistance(perception_distance, min_dist, max_speed, abs_dist_err, abs_speed_err, max_brake=float("inf"), max_accel=float("inf")): """ A contract stating we stay a safe distance from the car in front of us.""" globals: objects @@ -366,9 +366,9 @@ contract KeepsDistance(perception_distance, min_dist, max_brake=float("inf"), ma true_relative_speed = lead_car.speed - self.speed - stopping_time = math.ceil(self.speed/max_brake) + stopping_time = ceil(self.speed/max_brake) rel_dist_covered = stopping_time*self.speed + (true_relative_speed) - delta_stopping_time = math.ceil((self.speed+max_accel)/max_brake) + delta_stopping_time = ceil((self.speed+max_accel)/max_brake) max_rdc_delta = delta_stopping_time*self.speed + (true_relative_speed + max_brake + max_accel + abs_speed_err) buffer_dist = min_dist + (max(0, max_rdc_delta + rel_dist_covered)) @@ -401,19 +401,19 @@ contract KeepsDistance(perception_distance, min_dist, max_brake=float("inf"), ma ## Verification Steps ## # Constants STDDEV = 0.1 -TARGET_DIST = 10 -MAX_SPEED = 26.8224 # 60 mph in m/s MIN_DIST = 5 -MIN_SLOWDOWN = 4.57 # 15 feet per second in m/s -MAX_BRAKE = 5 # TODO: Find reasonable value -MAX_ACCEL = 5 # TODO: Find reasonable value -PERCEPTION_DISTANCE = 250 +MAX_BRAKE = 0.9 +MAX_ACCEL = 0.5 +PERCEPTION_DISTANCE = 1000 +ABS_DIST_ERR = 1 +ABS_SPEED_ERR = 0.4 +MAX_SPEED = 5.4 + +TARGET_DIST = 10 -ABS_DIST_ERR = 0.5 -ABS_SPEED_ERR = 4 # Instantiate Car component and link to object. -implement "EgoCar" with Car(STDDEV, TARGET_DIST, MAX_SPEED, MIN_DIST, MIN_SLOWDOWN) as car +implement "EgoCar" with Car(STDDEV, TARGET_DIST, MAX_SPEED, MIN_DIST, MAX_BRAKE) as car accurate_distance = test car.ps satisfies AccurateDistance( PERCEPTION_DISTANCE, @@ -438,14 +438,13 @@ cs_safety = compose over car.cs: assume car.cs.tsf satisfies SafeThrottleFilter( PERCEPTION_DISTANCE, MIN_DIST, - MIN_SLOWDOWN, abs_speed_err=ABS_SPEED_ERR, max_brake=MAX_BRAKE) # TODO: Replace with proof assume car.cs.ag satisfies PassthroughBrakingActionGenerator() -max_braking_force = assume car.cc satisfies MaxBrakingForce(MIN_SLOWDOWN), with correctness 0.99, with confidence 0.99 +max_braking_force = assume car.cc satisfies MaxBrakingForce(MAX_BRAKE), with correctness 0.99, with confidence 0.99 keeps_distance_raw = compose over car: use ps_accuracy @@ -454,7 +453,10 @@ keeps_distance_raw = compose over car: keeps_distance = refine keeps_distance_raw as KeepsDistance(PERCEPTION_DISTANCE, MIN_DIST, + MAX_SPEED, + ABS_DIST_ERR, + ABS_SPEED_ERR, max_brake=MAX_BRAKE, - max_accel=MAX_ACCEL) using LeanRefinementProof("KeepsDistanceRefinement") + max_accel=MAX_ACCEL) using LeanRefinementProof(localPath("KeepsDistanceRefinement")) verify keeps_distance diff --git a/src/scenic/contracts/composition.py b/src/scenic/contracts/composition.py index 455e16efe..848824e87 100644 --- a/src/scenic/contracts/composition.py +++ b/src/scenic/contracts/composition.py @@ -48,7 +48,7 @@ def __init__( # Compute general port-variable mapping # Encoding map is a dictionary mapping tuples (port_name, subcomponent_name) # to a temporary variable. Top level ports have None as subcomponent name. - encoding_map = {} + self.encoding_map = {} self.var_num = 0 # Assign variables to all connections @@ -56,12 +56,14 @@ def __init__( # Check if we already have a temporary variable name and if not # come up with a new intermediate variable name. temp_var_name = ( - encoding_map[source] if source in encoding_map else self.tempVarName() + self.encoding_map[source] + if source in self.encoding_map + else self.tempVarName() ) # Map both variables to the new name - encoding_map[source] = temp_var_name - encoding_map[dest] = temp_var_name + self.encoding_map[source] = temp_var_name + self.encoding_map[dest] = temp_var_name # Assign any remaining subcomponent port variables to temp names for sc_name, sc_obj in subcomponents.items(): @@ -70,8 +72,8 @@ def __init__( for port in ports: key = (port, sc_name) - if key not in encoding_map: - encoding_map[key] = self.tempVarName() + if key not in self.encoding_map: + self.encoding_map[key] = self.tempVarName() # Assign any remaining top level port variables to temp names tl_ports = list(self.component.inputs_types.keys()) + list( @@ -80,48 +82,48 @@ def __init__( for port in tl_ports: key = (port, None) - if key not in encoding_map: - encoding_map[key] = self.tempVarName() + if key not in self.encoding_map: + self.encoding_map[key] = self.tempVarName() # Compute which temporary variables are internal/external. input_temp_vars = { var - for key, var in encoding_map.items() + for key, var in self.encoding_map.items() if key[1] is None and key[0] in self.component.inputs_types } output_temp_vars = { var - for key, var in encoding_map.items() + for key, var in self.encoding_map.items() if key[1] is None and key[0] in self.component.outputs_types } - internal_temp_vars = set(encoding_map.values()) - ( + internal_temp_vars = set(self.encoding_map.values()) - ( input_temp_vars | output_temp_vars ) # Compute encoding transformer for each subcomponent and the top level component # The encoding transformer should encode all port variables to the appropriate # temp variable. - encoding_transformers = {} + self.encoding_transformers = {} # Subcomponents for subcomponent in subcomponents: name_map = {} - for source_info, target_name in encoding_map.items(): + for source_info, target_name in self.encoding_map.items(): if source_info[1] == subcomponent: name_map[source_info[0]] = target_name - encoding_transformers[ + self.encoding_transformers[ self.component.subcomponents[subcomponent] ] = NameSwapTransformer(name_map) # Top level component name_map = {} - for source_info, target_name in encoding_map.items(): + for source_info, target_name in self.encoding_map.items(): if source_info[1] == None: name_map[source_info[0]] = target_name - encoding_transformers[self.component] = NameSwapTransformer(name_map) + self.encoding_transformers[self.component] = NameSwapTransformer(name_map) # Compute decoding transformer for assumptions and guarantees # We need two decoding transformers because temp variables can get mapped to different @@ -129,12 +131,12 @@ def __init__( # (specifically in the case where a variable is passed through a component unchanged). assumption_decoding_map = {} for port in self.component.inputs_types.keys(): - assumption_decoding_map[encoding_map[((port, None))]] = port + assumption_decoding_map[self.encoding_map[((port, None))]] = port assumption_decoding_transformer = NameSwapTransformer(assumption_decoding_map) guarantee_decoding_map = {} for port in self.component.outputs_types.keys(): - guarantee_decoding_map[encoding_map[((port, None))]] = port + guarantee_decoding_map[self.encoding_map[((port, None))]] = port guarantee_decoding_transformer = NameSwapTransformer(guarantee_decoding_map) # Move through sub_stmts linearly, checking assumptions and accumulating guarantees @@ -142,7 +144,7 @@ def __init__( tl_guarantees = [] for sub_stmt in self.sub_stmts: - encoding_transformer = encoding_transformers[sub_stmt.component] + encoding_transformer = self.encoding_transformers[sub_stmt.component] ## Copy and encode assumptions and guarantees ## assumptions = [deepcopy(spec) for spec in sub_stmt.assumptions] diff --git a/src/scenic/contracts/refinement.py b/src/scenic/contracts/refinement.py index 7f59796c5..a3591c29e 100644 --- a/src/scenic/contracts/refinement.py +++ b/src/scenic/contracts/refinement.py @@ -1,10 +1,15 @@ +from copy import deepcopy from functools import cached_property +from pathlib import Path +from scenic.contracts.composition import Composition from scenic.contracts.contracts import ContractResult, VerificationTechnique +from scenic.syntax.compiler import NameSwapTransformer class Refinement(VerificationTechnique): def __init__(self, stmt, contract, method): + assert isinstance(stmt, Composition) self.stmt = stmt self.contract = contract self.method = method @@ -56,9 +61,121 @@ def __init__(self, proof_loc): self.proof_loc = proof_loc def check(self, stmt, contract): - # breakpoint() - # - pass + ## Extract and standardize intermediate assumptions and guarantees + i_assumptions = [] + i_guarantees = [] + for sub_stmt in stmt.sub_stmts: + encoding_transformer = stmt.encoding_transformers[sub_stmt.component] + + ## Copy and encode assumptions and guarantees ## + assumptions = [deepcopy(spec) for spec in sub_stmt.assumptions] + guarantees = [deepcopy(spec) for spec in sub_stmt.guarantees] + + for spec in assumptions + guarantees: + spec.applyAtomicTransformer(encoding_transformer) + + ## Add specs to accumulated i specs + i_guarantees += guarantees + + ## Simplify assumptions and guarantees, removing duplicates + new_i_assumptions = [] + for spec in i_assumptions: + if not any(spec == e_spec for e_spec in new_i_assumptions): + new_i_assumptions.append(spec) + i_assumptions = new_i_assumptions + + new_i_guarantees = [] + for spec in i_guarantees: + if not ( + any(spec == e_spec for e_spec in new_i_guarantees) + and not any(spec == e_spec for e_spec in new_i_assumptions) + ): + new_i_guarantees.append(spec) + i_guarantees = new_i_guarantees + + ## Extract and standardize refinement assumptions and guarantees + tl_assumptions = [deepcopy(spec) for spec in contract.assumptions] + tl_guarantees = [deepcopy(spec) for spec in contract.guarantees] + + for spec in tl_assumptions + tl_guarantees: + spec.applyAtomicTransformer(stmt.encoding_transformers[stmt.component]) + + ## Extract atomics from all specs, which will form the signals in our trace + atomics = [] + atomics_types = {} + for spec in i_assumptions + i_guarantees + tl_assumptions + tl_guarantees: + for na_name, na_type in spec.getAtomics(): + if all(na_name != a[0] for a in atomics): + atomics.append((na_name, na_type)) + + prop_atomics = [a for a, _ in filter(lambda x: x[1] is bool, atomics)] + real_atomics = [a for a, _ in filter(lambda x: x[1] is float, atomics)] + + ## Extract defs from all specs + defs = {} + for spec in i_assumptions + i_guarantees + tl_assumptions + tl_guarantees: + for d_name, d_spec in spec.getDefs(): + if d_name not in defs: + defs[d_name] = d_spec + else: + if defs[d_name] != d_spec: + # TODO: Handle renames + assert False + + print("Atomics:") + for a in atomics: + print(f" {a[0]}: {a[1]}") + print() + + print("Defs:") + for d_name, d_spec in defs.items(): + print(f" {d_name}: {d_spec}") + print() + + ## Setup proof directory + Path(self.proof_loc).mkdir(parents=True, exist_ok=True) + + with open(self.proof_loc / "Lib.lean", "w") as f: + # Imports + f.write("import Sceniclean.Basic\n\n") + + # TraceState structure + f.write("structure TraceState where\n") + f.write(" -- Props\n") + for i, a in enumerate(prop_atomics): + f.write(f" P{i}: Prop\n") + f.write(" -- Reals\n") + for i, a in enumerate(real_atomics): + f.write(f" N{i}: Real\n") + f.write("\n") + + # Prop Signals + f.write("-- Prop Signals\n") + for i, a in enumerate(prop_atomics): + f.write( + f" abbrev {a.toLean()} : TraceSet TraceState := TraceSet.of (·.P{i})\n" + ) + f.write("\n") + + # Real Signals + f.write("-- Real Signals\n") + for i, a in enumerate(real_atomics): + f.write( + f" abbrev {a.toLean()} : TraceSet TraceState := TraceSet.of (·.N{i})\n" + ) + f.write("\n") + + # Defs + f.write("-- Defs\n") + for d_name, d_spec in defs.items(): + f.write(f"abbrev {d_name} := FLTL[{d_spec.toLean()}]\n") + f.write("\n") + + breakpoint() + + ## Output Lean Library File + + breakpoint() def __str__(self): return f"LeanProof ('{self.proof_loc}')" diff --git a/src/scenic/contracts/specifications.py b/src/scenic/contracts/specifications.py index 6b1fe34bd..d7862c960 100644 --- a/src/scenic/contracts/specifications.py +++ b/src/scenic/contracts/specifications.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod import ast from copy import copy, deepcopy -from itertools import zip_longest +from itertools import chain, zip_longest import scenic.core.propositions as propositions import scenic.syntax.ast as scenic_ast @@ -24,26 +24,27 @@ def convert(self, node): assert isinstance(node, ast.AST) if isinstance(node, scenic_ast.Always): - return Always(self.convert(node.value), self.defInfo) + return Always(self.convert(node.value)) if isinstance(node, scenic_ast.Eventually): - return Eventually(self.convert(node.value), self.defInfo) - if isinstance(node, scenic_ast.Next): - return Next(self.convert(node.value), self.defInfo) + return Eventually(self.convert(node.value)) + if isinstance(node, scenic_ast.ContractNext): + return Next(self.convert(node.target)) if isinstance(node, scenic_ast.UntilOp): - return Until(self.convert(node.left), self.convert(node.right), self.defInfo) + return Until(self.convert(node.left), self.convert(node.right)) if isinstance(node, scenic_ast.ImpliesOp): - return Implies( - self.convert(node.hypothesis), self.convert(node.conclusion), self.defInfo - ) + return Implies(self.convert(node.hypothesis), self.convert(node.conclusion)) - if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not): - return Not(self.convert(node.operand), self.defInfo) + if isinstance(node, ast.UnaryOp): + if isinstance(node.op, ast.USub): + return Neg(self.convert(node.operand)) + if isinstance(node.op, ast.Not): + return Not(self.convert(node.operand)) if isinstance(node, ast.BoolOp): if isinstance(node.op, ast.And): - return And([self.convert(v) for v in node.values], self.defInfo) + return And([self.convert(v) for v in node.values]) if isinstance(node.op, ast.Or): - return Or([self.convert(v) for v in node.values], self.defInfo) + return Or([self.convert(v) for v in node.values]) if isinstance(node, ast.Compare): if len(node.ops) == 1: @@ -63,7 +64,7 @@ def convert(self, node): p1 = self.convert(node.left) p2 = self.convert(node.comparators[0]) - return Op(p1, p2, self.defInfo) + return Op(p1, p2) if len(node.ops) == 2: Op1 = None @@ -95,10 +96,7 @@ def convert(self, node): p2 = self.convert(node.comparators[0]) p3 = self.convert(node.comparators[1]) - return And( - [Op1(p1, p2, self.defInfo), Op2(p2, p3, self.defInfo)], - self.defInfo, - ) + return And([Op1(p1, p2), Op2(p2, p3)]) if isinstance(node, ast.BinOp): Op = None @@ -115,18 +113,40 @@ def convert(self, node): p1 = self.convert(node.left) p2 = self.convert(node.right) - return Op(p1, p2, self.defInfo) + return Op(p1, p2) + + if isinstance(node, ast.Call): + if ( + isinstance(node.func, ast.Name) + and node.func.id == "ceil" + and len(node.args) == 1 + ): + return Ceil(self.convert(node.args[0])) + + if ( + isinstance(node.func, ast.Name) + and node.func.id == "min" + and len(node.args) == 2 + ): + return Min(self.convert(node.args[0]), self.convert(node.args[1])) + + if ( + isinstance(node.func, ast.Name) + and node.func.id == "max" + and len(node.args) == 2 + ): + return Max(self.convert(node.args[0]), self.convert(node.args[1])) if isinstance(node, ast.Constant): - return ConstantSpecNode(node.value, self.defInfo) + return ConstantSpecNode(node.value) + + if isinstance(node, ast.Name) and node.id in self.defSpecs: + return DefSpecNode(node.id, self.defInfo) return Atomic(node, self.defInfo) class SpecNode(ABC): - def __init__(self, defInfo): - self.defSyntaxTrees, self.defSpecs = defInfo - @abstractmethod def applyAtomicTransformer(self, transformer): pass @@ -135,15 +155,37 @@ def applyAtomicTransformer(self, transformer): def getAtomicNames(self): pass - @staticmethod - def syntaxTreeToSyntaxVal(targetTree, syntaxMappings): - # Check if there's a tree in the mappings that's equivalent to this one - for existingTree in syntaxMappings: - if equivalentAST(targetTree, existingTree): - return syntaxMappings[existingTree] + @abstractmethod + def getDefs(self): + pass + + @abstractmethod + def getAtomics(self): + pass + + @abstractmethod + def toLean(self): + pass + + +class Atomic(SpecNode): + def __init__(self, ast, defInfo): + self.ast = ast + self.defSyntaxTrees, _ = defInfo + + def applyAtomicTransformer(self, transformer): + self.ast = transformer.visit(self.ast) + + def getAtomicNames(self): + nf = NameFinder() + nf.visit(self.ast) + return tuple(nf.names) + + def getDefs(self): + return tuple() - syntaxMappings[targetTree] = len(syntaxMappings) - return syntaxMappings[targetTree] + def getAtomics(self, ctx=bool): + return ((self, ctx),) @staticmethod def equivalentAST(node1, node2, node1_defSyntaxTrees, node2_defSyntaxTrees): @@ -166,7 +208,7 @@ def equivalentAST(node1, node2, node1_defSyntaxTrees, node2_defSyntaxTrees): "ctx", }: continue - if not SpecNode.equivalentAST( + if not Atomic.equivalentAST( val, getattr(node2, name), node1_defSyntaxTrees, node2_defSyntaxTrees ): return False @@ -174,25 +216,15 @@ def equivalentAST(node1, node2, node1_defSyntaxTrees, node2_defSyntaxTrees): elif isinstance(node1, list) and isinstance(node2, list): return all( - SpecNode.equivalentAST(n1, n2, node1_defSyntaxTrees, node2_defSyntaxTrees) + Atomic.equivalentAST(n1, n2, node1_defSyntaxTrees, node2_defSyntaxTrees) for n1, n2 in zip_longest(node1, node2) ) else: return node1 == node2 - -class Atomic(SpecNode): - def __init__(self, ast, defInfo): - self.ast = ast - super().__init__(defInfo) - - def applyAtomicTransformer(self, transformer): - self.ast = transformer.visit(self.ast) - - def getAtomicNames(self): - nf = NameFinder() - nf.visit(self.ast) - return nf.names + def toLean(self, ctx=bool): + # TODO: Better name replacement + return str(self).replace("[", "_").replace("]", "_") def __eq__(self, other): return type(self) is type(other) and self.equivalentAST( @@ -210,19 +242,56 @@ def __str__(self): return ast.unparse(self.ast) +class DefSpecNode(SpecNode): + def __init__(self, name, defInfo): + self.name = name + _, self.defSpecs = defInfo + + def applyAtomicTransformer(self, transformer): + self.defSpecs[self.name].applyAtomicTransformer(transformer) + + def getAtomicNames(self): + return self.defSpecs[self.name].getAtomicNames() + + def getDefs(self): + return ((self.name, self.defSpecs[self.name]),) + self.defSpecs[ + self.name + ].getDefs() + + def getAtomics(self, ctx=bool): + return self.defSpecs[self.name].getAtomics(ctx) + + def toLean(self, ctx=bool): + return self.name + + def __eq__(self, other): + return ( + type(self) is type(other) + and self.defSpecs[self.name] == other.defSpecs[other.name] + ) + + def __str__(self): + return self.name + + class ConstantSpecNode(SpecNode): - def __init__(self, value, defInfo): + def __init__(self, value): self.value = value - super().__init__(defInfo) def applyAtomicTransformer(self, transformer): pass def getAtomicNames(self): - return set() + return tuple() - def clearSourceStrings(self): - pass + def getDefs(self): + return tuple() + + def getAtomics(self, ctx=bool): + return tuple() + + def toLean(self, ctx=bool): + return str(self.value) def __eq__(self, other): return type(self) is type(other) and self.value == other.value @@ -232,10 +301,9 @@ def __str__(self): class UnarySpecNode(SpecNode): - def __init__(self, sub, defInfo): + def __init__(self, sub): assert isinstance(sub, SpecNode) self.sub = sub - super().__init__(defInfo) def applyAtomicTransformer(self, transformer): self.sub.applyAtomicTransformer(transformer) @@ -243,29 +311,33 @@ def applyAtomicTransformer(self, transformer): def getAtomicNames(self): return self.sub.getAtomicNames() - def clearSourceStrings(self): - self.sub.clearSourceStrings() + def getDefs(self): + return self.sub.getDefs() + + def getAtomics(self, ctx=bool): + return self.sub.getAtomics(self.ctx) def __eq__(self, other): return type(self) is type(other) and self.sub == other.sub class BinarySpecNode(SpecNode): - def __init__(self, sub1, sub2, defInfo): + def __init__(self, sub1, sub2): assert isinstance(sub1, SpecNode) and isinstance(sub2, SpecNode) self.sub1, self.sub2 = sub1, sub2 - super().__init__(defInfo) def applyAtomicTransformer(self, transformer): self.sub1.applyAtomicTransformer(transformer) self.sub2.applyAtomicTransformer(transformer) def getAtomicNames(self): - return self.sub1.getAtomicNames() | self.sub2.getAtomicNames() + return self.sub1.getAtomicNames() + self.sub2.getAtomicNames() - def clearSourceStrings(self): - self.sub1.clearSourceStrings() - self.sub2.clearSourceStrings() + def getDefs(self): + return self.sub1.getDefs() + self.sub2.getDefs() + + def getAtomics(self, ctx=bool): + return self.sub1.getAtomics(self.ctx) + self.sub2.getAtomics(self.ctx) def __eq__(self, other): return ( @@ -276,116 +348,235 @@ def __eq__(self, other): class NarySpecNode(SpecNode): - def __init__(self, subs, defInfo): + def __init__(self, subs): assert all(isinstance(sub, SpecNode) for sub in subs) self.subs = subs - super().__init__(defInfo) def applyAtomicTransformer(self, transformer): for sub in self.subs: sub.applyAtomicTransformer(transformer) def getAtomicNames(self): - return set().union(*(sub.getAtomicNames() for sub in self.subs)) + return tuple(chain(*(sub.getAtomicNames() for sub in self.subs))) - def clearSourceStrings(self): - for sub in self.subs: - self.sub.clearSourceStrings() + def getDefs(self): + return tuple(chain(*(sub.getDefs() for sub in self.subs))) + + def getAtomics(self, ctx=bool): + return tuple(chain(*(sub.getAtomics(self.ctx) for sub in self.subs))) def __eq__(self, other): return type(self) is type(other) and self.subs == other.subs class Always(UnarySpecNode): + ctx = bool + + def toLean(self, ctx=bool): + return f"G ({self.sub.toLean()})" + def __str__(self): return f"always ({self.sub})" class Eventually(UnarySpecNode): + ctx = bool + + def toLean(self, ctx=bool): + return f"F ({self.sub.toLean()})" + def __str__(self): return f"eventually ({self.sub})" class Next(UnarySpecNode): + ctx = bool + + def toLean(self, ctx=bool): + if ctx is bool: + return f"Xʷ ({self.sub.toLean(ctx)})" + else: + return f"X ({self.sub.toLean(ctx)})" + def __str__(self): return f"next ({self.sub})" class Not(UnarySpecNode): + ctx = bool + + def toLean(self, ctx=bool): + return f"¬({self.sub.toLean()})" + def __str__(self): return f"not ({self.sub})" +class Neg(UnarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"-({self.sub.toLean(float)})" + + def __str__(self): + return f"-({self.sub})" + + +class Ceil(UnarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"⌈{self.sub.toLean(float)}⌉" + + def __str__(self): + return f"ceil({self.sub})" + + class Until(BinarySpecNode): + ctx = bool + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean()}) U ({self.sub2.toLean()})" + def __str__(self): return f"({self.sub1}) until ({self.sub2})" class Implies(BinarySpecNode): + ctx = bool + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean()}) → ({self.sub2.toLean()})" + def __str__(self): return f"({self.sub1}) implies ({self.sub2})" class Equal(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) == ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) == ({self.sub2})" class GT(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) > ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) > ({self.sub2})" class GE(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) ≥ ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) >= ({self.sub2})" class LT(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) < ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) < ({self.sub2})" class LE(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) ≤ ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) <= ({self.sub2})" class Add(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) + ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) + ({self.sub2})" class Sub(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) - ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) - ({self.sub2})" class Mul(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) * ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) * ({self.sub2})" class Div(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) / ({self.sub2.toLean(float)})" + def __str__(self): return f"({self.sub1}) / ({self.sub2})" class Min(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) ⊓ ({self.sub2.toLean(float)})" + def __str__(self): - return f"Min(({self.sub1}), ({self.sub2}))" + return f"min(({self.sub1}), ({self.sub2}))" class Max(BinarySpecNode): + ctx = float + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean(float)}) ⊔ ({self.sub2.toLean(float)})" + def __str__(self): - return f"Max(({self.sub1}), ({self.sub2}))" + return f"max(({self.sub1}), ({self.sub2}))" class And(NarySpecNode): + ctx = bool + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean()}) ∧ ({self.sub2.toLean()})" + def __str__(self): return " and ".join(f"({str(sub)})" for sub in self.subs) class Or(NarySpecNode): + ctx = bool + + def toLean(self, ctx=bool): + return f"({self.sub1.toLean()}) ∨ ({self.sub2.toLean()})" + def __str__(self): return " or ".join(f"({str(sub)})" for sub in self.subs) diff --git a/src/scenic/core/geometry.py b/src/scenic/core/geometry.py index dba37887a..2e99d0ad9 100644 --- a/src/scenic/core/geometry.py +++ b/src/scenic/core/geometry.py @@ -42,6 +42,11 @@ def min(*args, **kwargs): return __builtins__["min"](*args, **kwargs) +@distributionFunction +def ceil(*args, **kwargs): + return math.ceil(*args, **kwargs) + + @distributionFunction def normalizeAngle(angle) -> float: while angle > math.pi: diff --git a/src/scenic/syntax/veneer.py b/src/scenic/syntax/veneer.py index b682315c8..774d685e2 100644 --- a/src/scenic/syntax/veneer.py +++ b/src/scenic/syntax/veneer.py @@ -36,6 +36,7 @@ "hypot", "max", "min", + "ceil", "_toStrScenic", "_toFloatScenic", "_toIntScenic", @@ -204,7 +205,7 @@ VerifaiParameter, VerifaiRange, ) -from scenic.core.geometry import cos, hypot, max, min, sin +from scenic.core.geometry import ceil, cos, hypot, max, min, sin from scenic.core.object_types import Mutator, Object, OrientedPoint, Point from scenic.core.regions import ( BoxRegion,