Skip to content

Commit

Permalink
Merge pull request #48 from rohaquinlop/issue-47
Browse files Browse the repository at this point in the history
fix(dfa|nfa): #47 fix bugs presents
  • Loading branch information
rohaquinlop authored Sep 26, 2024
2 parents 6df8d84 + f619c35 commit edcacf9
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 180 deletions.
249 changes: 77 additions & 172 deletions automathon/finite_automata/dfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from automathon.errors.errors import (
SigmaError,
)
from automathon.utils.utils import (
list_map,
)
from collections import (
deque,
)
Expand Down Expand Up @@ -117,12 +120,12 @@ def accept(self, string: str) -> bool:

if idx == len(string) and state in self.f:
ans = True
elif idx < len(string) and state in self.delta:
elif idx < len(string):
# Search through states
for transition in self.delta[state].items():
for a, state in self.delta[state].items():
# transition: ('1', 'q0')
if string[idx] == transition[0]:
q.append([idx + 1, transition[1]])
if string[idx] == a:
q.append([idx + 1, state])

return ans

Expand Down Expand Up @@ -305,7 +308,9 @@ def symmetric_difference(self, m: "DFA") -> "DFA":
)

def __binary_operation(
self, m: "DFA", operation: Callable[..., bool]
self,
m: "DFA",
operation: Callable[[str, set[str], str, set[str]], bool],
) -> "DFA":
new_q_list: list[tuple[str, str]] = []

Expand All @@ -320,7 +325,7 @@ def __binary_operation(
self.sigma, "Sigma from both DFAs must be the same"
)

queue = deque()
queue: deque[tuple[str, str]] = deque()
queue.append((self.initial_state, m.initial_state))

while queue:
Expand Down Expand Up @@ -349,195 +354,95 @@ def __binary_operation(

def minimize(self) -> "DFA":
"""Minimize the automata and return the minimized version"""
visited: dict[tuple[str, str], bool] = dict()
table, q = self.__initialize_table_and_queue()
p_k: set[frozenset[str]] = set(
[frozenset([*self.q.difference(self.f)]), frozenset([*self.f])]
)
p_prev: set[frozenset[str]] = set()

while q:
q_a, q_b = q.popleft()
if visited.get((q_a, q_b), False):
break
while p_k != p_prev:
states_idx: dict[str, int] = self.__states_idx_table(p_k)
new_p_k: list[set[str]] = []

visited[(q_a, q_b)] = True
for p_states in p_k:
p_states_lst = list(p_states)
new_p_k.append({p_states_lst[0]})

common_sigma = filter(
lambda s, q_a=q_a, q_b=q_b: s in self.delta[q_a]
and s in self.delta[q_b],
self.sigma,
)
for i in range(1, len(p_states_lst)):
was_added = False
p_i_sigma = set(self.delta[p_states_lst[i]].keys())

reachable_pairs = map(
lambda s, q_a=q_a, q_b=q_b: (
self.delta[q_a][s],
self.delta[q_b][s],
),
common_sigma,
)
new_p_k, was_added = self.__define_group_ith_element(
i, new_p_k, p_i_sigma, p_states_lst, states_idx
)

it_is_marked = any(
map(lambda p: table.get(p, False), reachable_pairs)
)
if not was_added:
new_p_k.append({p_states_lst[i]})

if it_is_marked:
table[(q_a, q_b)] = True
visited = dict()
else:
q.append((q_a, q_b))

# combine pending_states
unmarked_pairs: list[tuple[str, str]] = []
unmarked_states: set[str] = set()

while q:
q_a, q_b = q.popleft()
unmarked_pairs.append((q_a, q_b))
unmarked_states.add(q_a)
unmarked_states.add(q_b)

remaining_states = self.q.copy() - unmarked_states
new_final_states: set[str] = set()
groups: dict[str, list[str]] = dict()

for pair in unmarked_pairs:
groups[pair[0]] = groups.get(pair[0], []) + [pair[1]]
groups[pair[1]] = groups.get(pair[1], []) + [pair[0]]

# group states from groups (dfs)
grouped_states: dict[int, list[str]] = dict()
states_group: dict[str, int] = dict()
final_groups: list[int] = []

groups_count = self.__group_unmarked_states(
unmarked_states,
groups,
grouped_states,
final_groups,
states_group,
)
p_prev, p_k = p_k, set(map(lambda s: frozenset([*s]), new_p_k))

for state in remaining_states:
states_group[state] = groups_count
grouped_states[groups_count] = [state]
states_idx: dict[str, int] = self.__states_idx_table(p_k)
initial_state = f"q{states_idx[self.initial_state]}"
final_states = set(list_map(lambda s: f"q{states_idx[s]}", self.f))

if state in self.f:
final_groups.append(groups_count)
delta: dict[str, dict[str, str]] = dict()
states = set(list_map(lambda idx: f"q{idx}", states_idx.values()))

groups_count += 1
for state_group in p_k:
fst_state = list(state_group)[0]

new_delta, new_initial_state = self.__build_new_delta(
groups_count, grouped_states, states_group
)
delta[f"q{states_idx[fst_state]}"] = dict()

for i in final_groups:
new_final_states.add(f"q{i}")
for s in self.delta[fst_state]:
delta[f"q{states_idx[fst_state]}"][s] = (
f"q{states_idx[self.delta[fst_state][s]]}"
)

return DFA(
set(new_delta.keys()),
self.sigma.copy(),
new_delta,
new_initial_state,
new_final_states,
states, self.sigma.copy(), delta, initial_state, final_states
)

def __initialize_table_and_queue(
self,
) -> tuple[dict[tuple[str, str], bool], deque[tuple[str, str]]]:
table: dict[tuple[str, str], bool] = dict()
states = list(self.q.copy())
q: deque[tuple[str, str]] = deque()

for i in range(1, len(states)):
for j in range(0, i):
if states[i] in self.f and states[j] not in self.f:
table[(states[i], states[j])] = True
else:
q.append((states[i], states[j]))
def __states_idx_table(self, p_k: set[frozenset[str]]) -> dict[str, int]:
states_idx: dict[str, int] = dict()

return table, q
for i in range(len(p_k)):
for state in list(p_k)[i]:
states_idx[state] = i

def __group_unmarked_states(
return states_idx

def __define_group_ith_element(
self,
unmarked_states: set[str],
groups: dict[str, list[str]],
grouped_states: dict[int, list[str]],
final_groups: list[int],
states_group: dict[str, int],
) -> int:
visited: dict[str, bool] = dict()
groups_count = 0

for state in unmarked_states:
if visited.get(state, False):
i: int,
new_p_k: list[set[str]],
p_i_sigma: set[str],
p_states_lst: list[str],
states_idx: dict[str, int],
) -> tuple[list[set[str]], bool]:
was_added = False
for new_p_states in new_p_k:
new_p_states_lst = list(new_p_states)
new_p_sigma = set(self.delta[new_p_states_lst[0]].keys())

if p_i_sigma != new_p_sigma:
continue

final_states, non_final_states = (
self.__get_final_and_non_final_states(state, visited, groups)
are_equivalent = all(
list_map(
lambda s,
p=p_states_lst[i],
q=new_p_states_lst[0],
table=states_idx,
delta=self.delta: table[delta[p][s]] == table[delta[q][s]],
p_i_sigma,
)
)

if final_states:
grouped_states[groups_count] = final_states
final_groups.append(groups_count)

for f_state in final_states:
states_group[f_state] = groups_count

groups_count += 1

if non_final_states:
grouped_states[groups_count] = non_final_states

for nf_state in non_final_states:
states_group[nf_state] = groups_count

groups_count += 1

return groups_count

def __get_final_and_non_final_states(
self, state: str, visited: dict[str, bool], groups: dict[str, list[str]]
) -> tuple[list[str], list[str]]:
stack: deque[str] = deque()
stack.append(state)
visited[state] = True
final_states = []
non_final_states = []

while stack:
current_state = stack.pop()

if current_state in self.f:
final_states.append(current_state)
else:
non_final_states.append(current_state)

for next_state in groups[current_state]:
if not visited.get(next_state, False):
stack.append(next_state)
visited[next_state] = True

return final_states, non_final_states

def __build_new_delta(
self,
groups_count: int,
grouped_states: dict[int, list[str]],
states_group: dict[str, int],
) -> tuple[dict[str, dict[str, str]], str]:
new_delta: dict[str, dict[str, str]] = dict()
new_initial_state: str = ""

for i in range(groups_count):
new_delta[f"q{i}"] = dict()

for state in grouped_states[i]:
if state == self.initial_state:
new_initial_state = f"q{i}"

for s in self.delta[state].keys():
new_delta[f"q{i}"][s] = (
f"q{states_group[self.delta[state][s]]}"
)
if are_equivalent:
new_p_states.add(p_states_lst[i])
was_added = True
break

return new_delta, new_initial_state
return new_p_k, was_added

def view(
self,
Expand Down
5 changes: 1 addition & 4 deletions automathon/finite_automata/nfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
annotations,
)
from automathon.errors.errors import (
InputError,
SigmaError,
)
from automathon.finite_automata.dfa import (
Expand Down Expand Up @@ -169,9 +168,7 @@ def _add_pairs_to_queue(

if idx == len(string) and state in self.f:
ans = True
elif idx < len(string) and string[idx] not in self.sigma:
raise InputError(string[idx], "Is not declared in sigma")
elif idx < len(string) and state in self.delta:
elif idx < len(string):
# Search through states
epsilon_transitions = filter(
lambda x: x[0] == "", self.delta[state].items()
Expand Down
48 changes: 44 additions & 4 deletions tests/test_dfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,55 @@ def test_minimize(self):
minimized_dfa = dfa.minimize()

self.assertTrue(minimized_dfa.is_valid())
self.assertGreaterEqual(len(dfa.q), len(minimized_dfa.q))

def test_minimize_2(self):
fa = DFA(
q={"q0", "q1", "q2", "q3", "q4", "q5"},
sigma={"0", "1"},
delta={
"q0": {"0": "q3", "1": "q1"},
"q1": {"0": "q2", "1": "q5"},
"q2": {"0": "q2", "1": "q5"},
"q3": {"0": "q0", "1": "q4"},
"q4": {"0": "q2", "1": "q5"},
"q5": {"0": "q5", "1": "q5"},
},
initial_state="q0",
f={"q1", "q2", "q4"},
)
minimized_fa = fa.minimize()

self.assertGreaterEqual(len(fa.q), len(minimized_fa.q))
self.assertEqual(fa.accept("1"), minimized_fa.accept("1"))
self.assertEqual(fa.accept("11"), minimized_fa.accept("11"))
self.assertEqual(fa.accept("110"), minimized_fa.accept("110"))
self.assertEqual(fa.accept("1101"), minimized_fa.accept("1101"))
self.assertEqual(fa.accept("01"), minimized_fa.accept("01"))
self.assertEqual(fa.accept("0101"), minimized_fa.accept("0101"))

def test_minimize_existing_1(self):
minimized_fa = self.fa.minimize()
fa = DFA(
q={"q0", "q1", "q2"},
sigma={"0", "1"},
delta={
"q0": {"0": "q0", "1": "q1"},
"q1": {"0": "q2", "1": "q0"},
"q2": {"0": "q1", "1": "q2"},
},
initial_state="q0",
f={"q0"},
)
minimized_fa = fa.minimize()
not_minimized_fa = minimized_fa.complement()

self.assertGreaterEqual(len(fa.q), len(minimized_fa.q))
self.assertTrue(minimized_fa.is_valid())
self.assertTrue(minimized_fa.accept(""))
self.assertTrue(minimized_fa.accept("001001"))
self.assertTrue(minimized_fa.accept("0101010101010"))
self.assertEqual(fa.accept(""), minimized_fa.accept(""))
self.assertEqual(fa.accept("001001"), minimized_fa.accept("001001"))
self.assertEqual(
fa.accept("0101010101010"), minimized_fa.accept("0101010101010")
)

self.assertTrue(not_minimized_fa.is_valid())
self.assertFalse(not_minimized_fa.accept(""))
Expand Down

0 comments on commit edcacf9

Please sign in to comment.