diff --git a/report.md b/report.md deleted file mode 100644 index df9a077..0000000 --- a/report.md +++ /dev/null @@ -1,2939 +0,0 @@ ---- -title: Computational Intelligence - Final Report -author: Alexandro Buffa - S316999 ---- -# Computational Intelligence - Final Report - -- [Computational Intelligence - Final Report](#computational-intelligence---final-report) - - [Lab 1](#lab-1) - - [Assignment](#assignment) - - [Code](#code) - - [Lab 2 (also known as Lab 3)](#lab-2-also-known-as-lab-3) - - [Assignment](#assignment-1) - - [README](#readme) - - [Code](#code-1) - - [Peer Reviews Submitted](#peer-reviews-submitted) - - [Lab 3 (also known as Lab 9)](#lab-3-also-known-as-lab-9) - - [Assignment](#assignment-2) - - [README](#readme-1) - - [Code](#code-2) - - [Peer Reviews Submitted](#peer-reviews-submitted-1) - - [Lab 4 (also known as Lab 10)](#lab-4-also-known-as-lab-10) - - [Assignment](#assignment-3) - - [README](#readme-2) - - [Code](#code-3) - - [Peer Reviews Submitted](#peer-reviews-submitted-2) - - [Project - Quixo](#project---quixo) - - [README](#readme-3) - - [Code](#code-4) - - [Custom Game Class](#extension-of-the-game-class) - - [Minimax Player](#minimax-player) - - [Monte Carlo Tree Search Player](#monte-carlo-tree-search-player) - - -## Lab 1 - -### Assignment - -Set Covering Problem - -#### Code - -[Last Commit: Oct 24, 2023](https://github.com/ExalFabu/Computational-Intelligence/commit/99665f144c3bb0602fb01b7172095beba74e2d8f) - -##### Set Covering - 2023-10-10 -Copyright(c) 2023 Alex Buffa - - - -```python -import numpy as np -from random import random -from typing import Tuple, Set -from functools import reduce -from operator import or_ -from queue import PriorityQueue, LifoQueue, SimpleQueue, Queue -from collections import namedtuple -from typing import Callable -from math import ceil -from tqdm.notebook import tqdm -Result = namedtuple("Result", ["name", "iters", "taken", "coverage", "prio"]) -State = Tuple[Set[int], Set[int]] -``` - -Define our problem data - - -```python -PROBLEM_SIZE = 10 -NUM_SETS = 30 -THRESHOLD = 0.3 -SETS = tuple(np.array([random() < THRESHOLD for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS)) -# Redefine SETS until the problem is solvable -while not all(reduce(or_, [SETS[i] for i in range(NUM_SETS)])): - SETS = tuple(np.array([random() < THRESHOLD for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS)) -results: dict[str, Result] = dict() - -``` - - -```python -# Utility function just to see our current taken array -def visualize_state(state: State) -> list[int]: - return sum([SETS[i] for i in state[0]]) -``` - - -```python -def goal_check(state: State): - return all(reduce(or_, [SETS[i] for i in state[0]], np.array([False for _ in range(PROBLEM_SIZE)]))) -``` - - -```python -def search(name: str, initial_state: State = None,*, frontier: "Queue" = None, priority: Callable[[State],int] = None) -> Result: - """Generic Search Function. 
- Through the parameters - """ - if initial_state is None: - initial_state = (set(), set(range(NUM_SETS))) - assert len(initial_state) == 2, "Invalid State" - if frontier is None: - frontier = PriorityQueue() - if priority is None: - priority = lambda _: None - WrappedState = namedtuple("WrappedState", ["priority", "state"]) - frontier.put(WrappedState(priority(initial_state), initial_state)) - _, state = frontier.get() - counter = 0 - with tqdm(total=None) as pbar: - while not goal_check(state): - counter += 1 - for a in state[1]: - new_state = (state[0] ^ {a}, state[1] ^ {a}) - frontier.put(WrappedState(priority(new_state), new_state)) - _, state = frontier.get() - pbar.update() - res = Result(name, counter, state[0], visualize_state(state), priority(state)) - results[name] = res - return res - -``` - -###### Depth First Search - - -```python -search(name="Depth-First", frontier=LifoQueue()).taken -``` - - - 0it [00:00, ?it/s] - - - - - - {23, 24, 25, 26, 27, 28, 29} - - - -###### Breadth First Search - - -```python -# Using SimpleQueue, which does it internally -search(name="Breadth-First", frontier=SimpleQueue()).taken -``` - - - 0it [00:00, ?it/s] - - - - - - {1, 2, 21} - - - -###### Djikstra Search - - -```python -def cost(state: State) -> int: - """Number of sets""" - return len(state[0]) -``` - - -```python -search(name="Djikstra", priority=cost).taken -``` - - - 0it [00:00, ?it/s] - - - - - - {1, 9, 16} - - - -###### A* Search - -A* requires a heuristic function that is admissible, i.e. it never overestimates the cost to reach the goal. -For example, we define the distance function as the optimal number of sets that are needed to cover the missing tiles. -With the above distance function we have an admissible heuristic function. -The priority for A* is given by the sum of the cost function and the heuristic function. 
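In symbols, states are expanded in increasing order of $f(s) = g(s) + h(s)$, where $g(s)$ is the number of sets already taken (the `cost` function) and $h(s)$ is the optimistic estimate of how many sets are still needed (the `distance` function defined below); since $h$ never overestimates, the first goal state extracted from the priority queue is an optimal cover.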
- - -```python -def distance(state: State) -> int: - max_size = max(sum(s) for i, s in enumerate(SETS) if i in state[1]) - if(len(state[0]) == 0 ): - return ceil(PROBLEM_SIZE/max_size) - return ceil((sum([SETS[i] for i in state[0]]) == 0).sum() / max_size) -``` - - -```python -search(name="A*", priority=lambda x: cost(x) + distance(x)).taken -``` - - - 0it [00:00, ?it/s] - - - - - - {1, 2, 21} - - - - -```python -def informed_cost(state: State) -> int: - """Number tiles missing + number of overlapped tiles""" - return sum(abs(np.ones(PROBLEM_SIZE) - sum([SETS[i] for i in state[0]]))) -``` - - -```python -search("Greedy", priority=informed_cost).taken -``` - - - 0it [00:00, ?it/s] - - - - - - {1, 8, 16} - - - - -```python -print("All the results obtained above, sorted by number of iterations") -for result in reversed(results.values()): - print(result) -``` - - All the results obtained above, sorted by number of iterations - Result(name='Greedy', iters=4, taken={8, 16, 1}, coverage=array([1, 2, 1, 1, 1, 1, 1, 1, 1, 1]), prio=1.0) - Result(name='A*', iters=25, taken={1, 2, 21}, coverage=array([1, 1, 1, 2, 1, 1, 1, 1, 1, 2]), prio=3) - Result(name='Djikstra', iters=1111, taken={16, 9, 1}, coverage=array([1, 1, 1, 1, 1, 1, 1, 1, 2, 2]), prio=3) - Result(name='Breadth-First', iters=1760, taken={1, 2, 21}, coverage=array([1, 1, 1, 2, 1, 1, 1, 1, 1, 2]), prio=None) - Result(name='Depth-First', iters=7, taken={23, 24, 25, 26, 27, 28, 29}, coverage=array([4, 3, 5, 1, 2, 1, 3, 2, 1, 3]), prio=None) - - - -## Lab 2 (also known as Lab 3) - -### Assignment -Nim Game - Evolutionary Strategy-based agents - -#### README - ---- START OF README OF LAB 2 - Headings are now different --- - -# Lab 2: Nim - ES -## Description -This lab requested us to build an evolutionary-based agent able to play a Nim game. -We started by defining a (*expert*) rule-based agent, which was able to play the game with a certain level of success. -We then tried (really hard) to come up with an evolutionary-based agent and we came up with the following parameters to be trained: - - **Phase Thresholds**: the thresholds for the phases of the game (early, mid, late) - We measure the phase of the game by the number of theoretical moves left to end the game over the total number of moves (it thus depends on both size of the board and k-limit): $p\in [0,1]$ - The thresholds are thus $t_{1}, t_{2} \in [0,1]$ and the phase is defined as follows: - - Late phase: $t_{2} < p\ < 1$ - - Mid phase: $t_{1} < p \leq t_{2}$ - - Early phase: $0 \leq\ p \leq t_{1}$ - - **Strategy Probabilities**: the probabilities of the strategies to be used in each phase, the strategies he can use are: [expert, pure_random, gabriele, optimal] - *Note*: We initially thought of saving this as probabilities (thus summing up to 1 for each phase), but the results were not satisfactory (due to the fact that we were often applying softmax, who would *flatten* them up to $1/n$), so we decided to save them as weights, and then normalize them (w/ softmax) when picking the strategy to use. - *TL;DR*: these values do not represents probabilities, but weights. - *Example*: - ```python - [[0.7, 0.2, 0.0, 0.1], # early phase - [0.7, 0.4, 0.01, 0.0], # mid phase - [0.7, 0.1, 0.1, 0.1]] # late phase - ``` - -Each Individual, for every Nim board, will have a probability to use each strategy (depending on the phase the board is), and the strategy that will be played will be picked with the aforementioned probabilities. 
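To make the selection step concrete, here is a minimal sketch of how a strategy could be picked for one phase (the names and weights are illustrative; the actual logic lives in `Individual.__call__` further down):

```python
import numpy as np

def pick_strategy(weights: list[float], strategies: list[str]) -> str:
    # softmax is applied only here, at pick time, so the stored values remain raw weights
    e = np.exp(np.array(weights) - np.max(weights))
    return np.random.choice(strategies, p=e / e.sum())

# illustrative early-phase weights, in the order [expert, pure_random, gabriele, optimal]
pick_strategy([0.7, 0.2, 0.0, 0.1], ["expert", "pure_random", "gabriele", "optimal"])
```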
- -The fitness of an individual is the average of the accuracy of the games played against the *expert* rule-based agent. - -A good Individual will be able to play the game with a high accuracy, and will be able to adapt to different board sizes and k-limits. -We believe that the Individual should converge to be playing always the *expert* strategy, thus achieving a fitness of 50%, and a fitness above that would mean that the Individual is better than the *expert*. - -## Collaborations -- I worked with (Davide Vitabile - S330509)[https://github.com/Vitabile] and (Davide Sferrazza - S326619)[https://github.com/FarInHeight], used their rule-based agent (tried to implement a rule for the k-limit variant, with not much success) and we developed the evolutionary agent together. - -## Sources -- [Nim Game](https://en.wikipedia.org/wiki/Nim) -- [How to Win at Nim](https://www.archimedes-lab.org/How_to_Solve/Win_at_Nim.html) - - ---- END OF README OF LAB 2 - Headings are now restored --- - - -#### Code - -[Last Commit: Nov 14, 2023](https://github.com/ExalFabu/Computational-Intelligence/commit/2c88352471845815c2f9b4b1404c71c4d819e758) - -Copyright **`(c)`** 2022 Giovanni Squillero `` -[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence) -Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details. - - -# Lab 2: ES - -## Task - -Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*). - -The goal of the game is to **avoid** taking the last object. - -* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*) -* Task2.2: An agent using evolved rules using ES - -## Instructions - -* Create the directory `lab2` inside the course repo -* Put a `README.md` and your solution (all the files, code and auxiliary data if needed) - -## Notes - -* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`. -* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`. - - - - -```python -import logging -logging.basicConfig(level=logging.INFO) -from pprint import pprint, pformat -from collections import namedtuple -import random -from copy import deepcopy -from dataclasses import dataclass -from typing import Literal, TypedDict, Callable -import math -import random -from tqdm import tqdm, trange - -``` - -## The *Nim* and *Nimply* classes - - -```python -Nimply = namedtuple("Nimply", "row, num_objects") - -``` - - -```python -class Nim: - def __init__(self, num_rows: int, k: int = None) -> None: - """ - Args: - num_rows (int): number of piles - k (int, optional): maximum number of objects nimmable each time. Defaults to None (any amount). 
- """ - self._rows = [i * 2 + 1 for i in range(num_rows)] - self._k = k - - def __bool__(self): - return sum(self._rows) > 0 - - def __str__(self): - return "<" + " ".join(str(_) for _ in self._rows) + ">" + (f" ({self._k}) " if self._k is not None else "") - - def __repr__(self): - return self.__str__() - - @property - def rows(self) -> tuple: - return tuple(self._rows) - - @property - def k(self) -> int: - return self._k - - - def nimming(self, ply: Nimply) -> None: - row, num_objects = ply - assert self._rows[row] >= num_objects - assert self._k is None or num_objects <= self._k, f"{num_objects=}, {self._k=}" - self._rows[row] -= num_objects -``` - -## Sample (and silly) startegies - - -```python -def pure_random(state: Nim) -> Nimply: - """A completely random move""" - row = random.choice([r for r, c in enumerate(state.rows) if c > 0]) - num_objects = random.randint(1, state.rows[row]) if state._k is None else min(random.randint(1, state.rows[row]), state._k) - return Nimply(row, num_objects) - -``` - - -```python -def gabriele(state: Nim) -> Nimply: - """Pick always the maximum possible number of the smallest row""" - possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c+1 if state._k is None else min(c + 1, state._k))] - return max(possible_moves, key=lambda m: (-m[0], m[1])) -``` - - -```python -import numpy as np - -Strategy = Callable[[Nim], Nimply] - -def nim_sum(state: Nim) -> int: - tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows]) - xor = tmp.sum(axis=0) % 2 - return int("".join(str(_) for _ in xor), base=2) - - -def analize(raw: Nim) -> dict: - cooked = dict() - cooked["possible_moves"] = dict() - for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c+1 if raw._k is None else min(c + 1, raw._k))): - tmp = deepcopy(raw) - tmp.nimming(ply) - cooked["possible_moves"][ply] = nim_sum(tmp) - return cooked - - -def optimal(state: Nim) -> Nimply: - analysis = analize(state) - logging.debug(f"analysis:\n{pformat(analysis)}") - spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0] - if not spicy_moves: - spicy_moves = list(analysis["possible_moves"].keys()) - logging.debug(pformat(f"{analysis['possible_moves']}")) - ply = random.choice(spicy_moves) - return ply - -``` - -## Oversimplified match - - -```python -def match(player_position: int, player_strategy: Strategy, opponent: Strategy, *, size: int = 5, k: int = None, lvl = logging.WARN): - logging.getLogger().setLevel(lvl) - strategy = (player_strategy, opponent) if player_position == 0 else (opponent, player_strategy) - - nim = Nim(size, k) - logging.info(f"init : {nim} {bool(nim)=}") - player = 0 - while nim: - ply = strategy[player](nim) - nim.nimming(ply) - # logging.debug(f"ply: player {player} ({strategy[player].__qualname__}) \t plays {ply} -> {nim} ({nim_sum(nim)})") - player = 1 - player - # logging.debug(f"status: Player {player} ({strategy[player].__qualname__}) won!") - return player == player_position -``` - - -```python -def expert_strategy(state: Nim, klimit: bool = False) -> Nimply: - """ - This function implement an expert systems which beats the strategies defined above - """ - analysis = analize(state) - logging.debug(f"analysis:\n{pformat(analysis)}") - not_zero_rows = len(state.rows) - state.rows.count(0) - one_count_rows = state.rows.count(1) - # if state._k is not None and klimit: - # non_modulo_rows = [Nimply(row, (objects % state.k + 1)) for row, objects in enumerate(state.rows) if objects > state._k and (objects 
% (state._k+1)) == 0] - # if len(non_modulo_rows) > 0: - # return non_modulo_rows[0] - if one_count_rows == not_zero_rows - 1: - is_odd = (one_count_rows % 2) == 1 - row, objects = [(row, objects) for row, objects in enumerate(state.rows) if objects > 1][0] - if is_odd: - return Nimply(row, objects if state.k is None else min(objects, state.k)) - return Nimply(row, objects - 1 if state.k is None else min(objects - 1, state.k)) - spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns == 0] - if not spicy_moves: - spicy_moves = list(analysis["possible_moves"].keys()) - logging.debug(pformat(f"{analysis['possible_moves']}")) - ply = random.choice(spicy_moves) - return ply -``` - -# Adaptive Strategy - - -```python -def remaining_moves(n: Nim, ratio: bool = False)->float: - """Measure used to understand the current phase of the game - - Args: - n (Nim): game - ratio (bool, optional): If true calculates the ratio between the current remaining moves over the starting number of moves. - Defaults to False. - - Returns: - float: number of remaining moves or ratio - """ - mr = sum([1 for _, c in enumerate(n.rows) for _ in range(1, c+1 if n._k is None else min(c + 1, n._k))]) - if ratio: - mt = remaining_moves(Nim(len(n.rows), n.k), False) - return mr/mt - else: - return mr -``` - - -```python -def softmax(x): - """Compute softmax values for each sets of scores in x.""" - # https://stackoverflow.com/questions/34968722/how-to-implement-the-softmax-function-in-python - e_x = np.exp(x - np.max(x)) - return (e_x / e_x.sum(axis=0)).tolist() - -starting_mutation_rate = (0.01, 2.5) -mutation_rate: tuple[float, float] = deepcopy(starting_mutation_rate) - -@dataclass(init=False) -class Individual: - n_strategy: int - phase_thresholds: tuple[float] - strategy_probs: tuple[tuple[float]] - _history: list - - def __init__(self, n_strategy: int = None, strategy_probs = None, phase_thresholds = None) -> None: - if n_strategy is None: - n_strategy = 4 - if strategy_probs is None: - strategy_probs = Individual._generate_random_strategy_probs(n_strategy) - if phase_thresholds is None: - phase_thresholds = sorted([random.uniform(0, 1), random.uniform(0, 1)]) - else: - phase_thresholds = sorted([max(0, phase_thresholds[0]), min(1, phase_thresholds[1])]) - - self.n_strategy = n_strategy - self.strategy_probs = strategy_probs - self.phase_thresholds = phase_thresholds - self._history: list[dict[str, int]] = [dict(),dict(),dict()] - - def _generate_random_strategy_probs(n_strategy): - matrix = [[], [], []] - for i in range(3): - x = [random.randint(4,6) for _ in range(n_strategy)] - # x = softmax(x) - matrix[i] = x - return matrix - - def mutate(ind: "Individual", mr: tuple[float, float]) -> "Individual": - ind = deepcopy(ind) - phase_thresholds = np.random.normal(ind.phase_thresholds, mr[0]).tolist() - strategy_probs = np.random.normal(ind.strategy_probs, mr[1]).tolist() - return Individual(strategy_probs=strategy_probs, phase_thresholds=phase_thresholds, n_strategy=ind.n_strategy) - - def __call__(self: "Individual", state: Nim) -> Nimply: - phase_ratio = remaining_moves(state, True) - phase_index = 0 if phase_ratio < self.phase_thresholds[0] else (1 if self.phase_thresholds[0] <= phase_ratio <= self.phase_thresholds[1] else 2) - probs = softmax(self.strategy_probs[phase_index]) - STRATEGIES = [expert_strategy, gabriele, optimal, pure_random] - strategy = np.random.choice(STRATEGIES[:self.n_strategy], p=probs) - move = strategy(state) - h: dict[str, int] = self._history[phase_index] - 
self._history[phase_index] = { - **h, - strategy.__qualname__: h.get(strategy.__qualname__, 0) + 1 - } - return move - - def reset_history(self): - self._history = [dict(), dict(), dict()] - - @property - def history(self: "Individual") -> list[dict[str, str]]: - # History as percentage for each phase - sums = [sum(phase.values()) for phase in self._history] - ret = [dict(), dict(), dict()] - for i in range(len(self._history)): - for k,v in self._history[i].items(): - ret[i][k] = f"{v/sums[i]:7.2%}" - return ret -``` - - -```python -ITERS = 600 -LAMBDA = 30 -N_MATCHES = 10 -OPPONENT = expert_strategy -``` - - -```python -def streak(player_strategy: "Strategy", n: int = None, opponent: "Strategy" = None) -> float: - """Perform a series of matches and calculate the accuracy (win ratio). Order of players is random - - Args: - player_strategy (Strategy): Player 1 - n (int, optional): number of games to be played (circa). Defaults to None. - opponent (Strategy, optional): Player 2. Defaults to None. - - Returns: - float: accuracy (win ratio) - """ - if n is None: - n = N_MATCHES - if opponent is None: - opponent = OPPONENT - wins = 0 - total = random.randrange((n*3)//4, n) - for _ in range(total): - random_size = random.randint(4,10) - random_k = random.choice([None, None, *[random.randint(2, random_size*2+1) for _ in range(2)]]) - # pprint((random_size, random_k)) - wins += 1 if match(random.choice([0,1]), player_strategy, opponent, size=random_size, k=random_k) else 0 - return wins / total -``` - - -```python -def train(*, variant: Literal["comma", "plus"] = "comma", - mu: int = 1, lambda_: int = None, iters: int = None, mutation_rate: tuple[float, float] = None, training_factor: float = 1.1) -> TypedDict: - if lambda_ is None: - lambda_ = LAMBDA - if iters is None: - iters = ITERS - if mutation_rate is None: - mutation_rate = deepcopy(starting_mutation_rate) - - parents = [Individual() for _ in range(mu)] - starting = deepcopy(parents) - parents_result = [streak(p) for p in parents] - pbar = trange(0, iters // lambda_, unit="epoch") - streak_bar = tqdm(total=lambda_, desc="Evaluating offspring fitness", unit="streak", colour="gray") - for _ in pbar: - pbar.set_description(f"Training - Accuracy: {max(parents_result):.2%}") - offspring = [(random.choice(parents)).mutate(mutation_rate) for _ in range(lambda_)] - results = [] - streak_bar.reset(total=lambda_) - for i in offspring: - results.append(streak(i)) - streak_bar.update(1) - - # results = [streak(i) for i in tqdm(offspring, unit="streak", leave=False, disable=True)] - incrate = (np.sum([res > sum(parents_result)/len(parents_result) for res in results])/lambda_) - - if incrate > 1/5: - mutation_rate = (mutation_rate[0]*training_factor, mutation_rate[1]*training_factor) - elif incrate < 1/5: - mutation_rate = (mutation_rate[0]/training_factor, mutation_rate[1]/training_factor) - - - population = list(zip(results, offspring)) - if variant == "plus": - population.extend(list(zip(parents_result, parents))) - population = sorted(population, key=lambda i:i[0], reverse=True)[:mu] - - parents = [it[1] for it in population] - parents_result = [it[0] for it in population] - streak_bar.close() - best_ind = np.argmax(parents_result) - - return { - "best": (parents_result[best_ind], parents[best_ind]), - "starting": starting, - "parents": list(zip(parents_result, parents)), - "mutation_rate": mutation_rate - } - -def evaluate(ind: Individual, name: str = None,*, opponents: list["Strategy"] = None, only_accuracies: bool = False): - ind.reset_history() 
- if opponents is None: - opponents = [gabriele, pure_random, optimal, expert_strategy] - if name is None: - name = ind.__qualname__ - - acc_onecomma = list(zip( - [streak(ind, 100, opponent) for opponent in tqdm(opponents, leave=False, desc=f"Evaluating {name}", smoothing=0.1, unit="opponent", disable=only_accuracies)], - [it.__qualname__ for it in opponents]) - ) - msg = "Accuracy of" if name == "" else f"{name} has an accuracy of" - print("\n".join([f"{msg} {acc:6.2%} vs {o}" for acc,o in acc_onecomma])) - if only_accuracies: - return - print(f"History: {pformat(ind.history)}") - print(f"StrategyProbs: {pformat([[f'{itit:.3f}' for itit in it ] for it in ind.strategy_probs])}") - print(f"Thresholds: {pformat([f'{it:.3f}' for it in ind.phase_thresholds])}") - -``` - -# $(1,\lambda)$ - ES - - -```python -res_oc = train(variant="comma", mu=1, training_factor=1.2) -ind_onecomma: Individual -_, ind_onecomma = res_oc["best"] -``` - - 0%| | 0/20 [00:00 "Genome": - """Alters one single gene of the starting genome - - Args: - g1 (Genome): Starting genome (never altered) - - Returns: - Genome: Mutated genome - """ - d = asdict(g1) - rand_attr_to_change: str = random.choice(list(d.keys())) - d[rand_attr_to_change] = random_allele_value(rand_attr_to_change) - return Genome(**d) - - def crossover(g1: "Genome", g2: "Genome") -> "Genome": - d1, d2 = asdict(g1), asdict(g2) - child = dict() - for field in d1.keys(): - child[field] = d1[field] if random.random() < 0.5 else d2[field] - - return Genome(**child) - -d = Genome() -e = Genome() -print(d, e, d.crossover(e).mutate()) -``` - - Genome(prefer_rows=0, percent_to_take=0.6731544357330547) Genome(prefer_rows=-1, percent_to_take=0.505487204798421) Genome(prefer_rows=-1, percent_to_take=0.32183818818233567) - - -### Peer Reviews Submitted - -![Let's do this](https://media4.giphy.com/media/BpGWitbFZflfSUYuZ9/giphy.gif) - -#### Review 1 [(Open on Github)](https://github.com/vinz321/computational_intelligence_23_24/issues/2) -Issued to [Vincenzo Micciché - s316900](https://github.com/vinz321/computational_intelligence_23_24/blob/2a266ebdb14b920f9d5b60547b9dfe9f2c4c1a64/lab2-nim.ipynb), a friend of mine whom i have not worked with in this lab. - -### Considerations -Hi Vinz 😊 -I'll start off by complimenting you with how the code is well written and pretty straight forward, also nice approach with the `vinzgorithm` rule-based strategy. -I also have nothing to say about the Evolution-Strategy approach, if I had to name something I would say that you are training the individuals by playing always on the same side, with the same board size and with the same number of matches. This is not a problem per se, but it might be interesting to see how the individuals perform in different scenarios. - -I do have found though a couple of hiccups regarding the implementation, which I'll explain in the next section. - -#### Problems with the implementation -- An oversight on the `tweak` function of the invidivual caused the program to never save the fitness value of the individual. This caused the program to always select the first individual as the best one. - ![Oopsie](https://media2.giphy.com/media/cE9GVwn2mJwoSvScrI/giphy.gif) -- Another problem I observed (because I've done the same mistake) is caused by applying softmax every time a new individual is created. This causes the probabilities to converge to $1/n$ where $n$ is the number of strategies used, e.g. 
- ```python - softmax(softmax(softmax(softmax(softmax([0.8, 0.1, 0.1])))))=array([0.33553626, 0.33223187, 0.33223187]) - ``` - I encountered the same problem in my implementation and I solved it by applying softmax only when the probabilities are used to select a strategy. Another approach would be to use a different normalization function, like dividing by their sum. - -I ran your code with a combination of the above problem fixed and I got the following results: -- Results with nothing fixed, as a baseline (after only 50 epochs): - ```python - individual3.vec=array([0.24987535, 0.25108963, 0.25034635, 0.24868867]), mean=0.25, std=0.00087 # Caused by softmax - individual3.fitness_value=0 # Caused by oversight - evaluation=40.000% - [('vinzgorithm', 0.2510896342022743), - ('optimal', 0.2503463466419632), - ('pure_random', 0.24987534537561654), - ('gabriele', 0.24868867378014592)] - ``` - -- Results with the oversight fixed (50 epochs): - ```python - individual3.vec=array([0.25074403, 0.25004322, 0.24976212, 0.24945063]), mean=0.25, std=0.00048 # Caused by softmax - individual3.fitness_value=0.52 # Oversight fixed - evaluation=39.000% - [('pure_random', 0.2507440276099417), - ('vinzgorithm', 0.2500432178488247), - ('optimal', 0.24976211973159504), - ('gabriele', 0.24945063480963853)] - ``` - -- Results with oversight and softmax fixed (50 epochs): - ```python - individual3.vec=array([0.91646977, 1.81028982, 0.9667728 , 0.82709137]), mean=1.130155942697836, std=0.39585 # Fixed removing softmax - individual3.fitness_value=0.6 - evaluation=40.000% - # strategy name, softmax(vec) - [('vinzgorithm', 0.4517941354717714), - ('optimal', 0.1943595129269683), - ('pure_random', 0.1848244714173517), - ('gabriele', 0.16902188018390876)] - ``` - -Overall it seems that with the right amount of epochs and sigma it kind of converges to vinzgorithm, which is what we expect! -![Bye bye](https://media4.giphy.com/media/p6P5KdqRljCrVoZj79/giphy.gif) - - -#### Review 2 [(Open on Github)](https://github.com/AngeloIannielli/polito-computational-intelligence-23/issues/2) - -Issued to [Angelo Iannelli - s317887](https://github.com/AngeloIannielli/polito-computational-intelligence-23/blob/a4dbb254077fdfd85c50b0e84765439962104c95/Lab2/Lab2.ipynb), picked random from the excel with random.org 😊. - - -### Considerations -Hi Angelo 😊, you've been picked randomly from random.org for my peer review, hope it will bring something useful to you! -Your code is very well structured, well commented and very straight-forward to read and understand. -I also liked your approach on trying to find new strategies to compete against the `optimal` strategy, and thanks to your graphs it's easy to see that your results look promising. - -The only thing left for me to add is that you are using a $1+\lambda$ approach (instead of the $1,\lambda$ noted above the code), since you are appending the parent to the offspring and then picking the best individual (which could be the parent of the previous generation). This is not a problem per se, but it might be interesting to see how the results change with a $1,\lambda$ approach. -Another twist that could spice things up is to try and train the individuals by playing different versions of the game (different sizes, with/without $k$-max pieces nimmable) and by playing different sides (first/second player). -That said, I think you did a great job with this lab! 
-![Bye bye](https://media2.giphy.com/media/ziWDuOipMj0BMrI540/giphy.gif) - -## Lab 3 (also known as Lab 9) -### Assignment -Solve a black-box problem using a black-box evolutionary algorithm. - -#### README - -# Black Box Problem - -Given a black-box fitness function, our goal is to solve problem instances 1, 2, 5 and 10 with the minimum number of fitness calls. \ -We are allowed to implement any algorithm we can think of, no rules. - -## Notes -I've implemented different variations of the algorithm but I did not have the opportunity to test them all and fine-tune them. \ -Thus i did not obtain the best results possible, but I'm still satisfied with the results I got. - -## Collaborations -I collaborated with [Davide Vitabile s330509](https://github.com/Vitabile), [Davide Sferrazza s326619](https://github.com/FarInHeight), [Simone Giambrone s317002](https://github.com/JustLooller) and [Andrea Panuccio s294603](https://github.com/AndPan96). - - -#### Code - -[Last Commit: Dec 3, 2023](https://github.com/ExalFabu/Computational-Intelligence/commit/dc4de1f6ae239f59db8cf360bf7a9a6884bc5249) - -lab9_lib.py - -```python -# Copyright © 2023 Giovanni Squillero -# https://github.com/squillero/computational-intelligence -# Free for personal or classroom use; see 'LICENSE.md' for details. - -from abc import abstractmethod - - -class AbstractProblem: - def __init__(self): - self._calls = 0 - - @property - @abstractmethod - def x(self): - pass - - @property - def calls(self): - return self._calls - - @staticmethod - def onemax(genome): - return sum(bool(g) for g in genome) - - def __call__(self, genome): - self._calls += 1 - fitnesses = sorted((AbstractProblem.onemax(genome[s :: self.x]) for s in range(self.x)), reverse=True) - val = sum(f for f in fitnesses if f == fitnesses[0]) - sum( - f * (0.1 ** (k + 1)) for k, f in enumerate(f for f in fitnesses if f < fitnesses[0]) - ) - return val / len(genome) - - -def make_problem(a): - class Problem(AbstractProblem): - @property - @abstractmethod - def x(self): - return a - - return Problem() -``` - -# Lab9 - Black Box EA - - -Wrote a local-search algorithm (eg. an EA) able to solve the Problem instances 1, 2, 5, and 10 on a 1000-loci genomes, using a minimum number of fitness calls. That's all. 
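For clarity, this is how a problem instance is queried (a minimal usage sketch based on the `lab9_lib` module above; every call to the problem object consumes one unit of the fitness budget):

```python
import random
from lab9_lib import make_problem

problem = make_problem(2)                  # problem instance 2
genome = random.choices([0, 1], k=1000)    # a random 1000-loci genome
print(problem(genome))                     # fitness in [0, 1]
print(problem.calls)                       # -> 1, the counter we are trying to keep low
```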
- - - -```python -import os -if "lab9_lib.py" not in os.listdir("."): - !curl https://raw.githubusercontent.com/squillero/computational-intelligence/master/2023-24/lab9_lib.py > lab9_lib.py - -``` - - -```python -from lab9_lib import make_problem -from tqdm import tqdm, trange -from collections import namedtuple -from copy import deepcopy -from dataclasses import dataclass, field -import random -from typing import Literal, Union, Callable -import numpy as np -import math -``` - - -```python -LOCI = 1000 -Gene = Literal[0,1] -Genome = tuple[Gene] - -@dataclass(frozen=True, repr=False) -class Individual: - - genome: tuple[Gene] = field(default_factory=lambda: list(random.choices([0, 1], k = LOCI)), repr=False) - _fitness: float = field(default=None, init=False, compare=False) - - def mutate(it: "Individual") -> "Individual": - gene_to_mutate = 10 - mutated_genome = [*it.genome] - for _ in range(gene_to_mutate): - mutated_genome[random.randrange(LOCI)] ^= 1 - return Individual(mutated_genome) - - def crossover(it: "Individual", other: "Individual", mode: Literal["uniform", "onecut"] = None) -> "Individual": - if mode is None: - mode = 'uniform' - if mode == "uniform": - return Individual( - [i if r < .5 else o for i, o, r in zip(it.genome, other.genome, [random.random() for _ in range(LOCI)])] - ) - elif mode == 'onecut': - cut = random.randrange(0, len(it.genome)) - new_genome = [*it.genome[:cut], *other.genome[cut:]] - assert len(new_genome) == len(it.genome), f"Somehow created a child with {len(new_genome)} loci" - return Individual(new_genome) - - - def evaluate(self: "Individual", fitness_fn: Callable[[Genome], float]) -> float: - """Wrapped evaluation inside individual to allow some kind of caching - - Args: - self (Individual): Individual - fitness_fn (Callable[[Genome], float]): Fitness function - - Returns: - float: fitness - """ - if self._fitness is None: - fitness = fitness_fn(self.genome) - object.__setattr__(self, "_fitness", fitness) - return self._fitness - - @property - def fitness(self) -> float: - assert self._fitness is not None, "Fitness has not been evaluated yet" - return self._fitness - - def __repr__(self: "Individual"): - return f"I(Zeros={sum([1 for it in self.genome if it == 0])}, Ones={sum([it for it in self.genome])}{'' if self.fitness is None else f', Fit={self.fitness}'})" - - def __str__(self: "Individual"): - return self.__repr__() - - @property - def phenotype(self) -> str: - return "".join(str(bit) for bit in self.genome) -``` - -## Problem Size 1 - - -```python -PROB_SIZE = 1 -EPOCHS = 10000 -POP_SIZE = 50 -OFFSPRING_SIZE = 25 -CROSSOVER_PROB = .2 -TOURNAMENT_SIZE = 3 - -# Not used here -SURVIVAL_RATE = .15 -CONVERGENCENESS_THRESHOLD = 0.0001 -``` - - -```python -Result = namedtuple("Result", ['individual', 'calls', 'size', 'epoch']) -def train(*, crossover_mode: str = None, extinction: bool = False, convergence_measure: Callable[[list[Individual]], list[float]] = None): - - if convergence_measure is None: - convergence_measure = lambda x: [i.fitness for i in x] - - problem = make_problem(PROB_SIZE) - - parents = [Individual() for _ in range(POP_SIZE)] - for i in parents: - i.evaluate(problem) - - max_fitness: Callable[[list["Individual"]], float] = lambda x: max([i.fitness for i in x]) - best_in_list: Callable[[list["Individual"]], "Individual"] = lambda x: [c for c in x if c.fitness == max_fitness(x)][0] - tournament_selection: Callable[[list["Individual"]], "Individual"] = lambda l: best_in_list(random.choices(l, k=TOURNAMENT_SIZE)) - epoch_bar = 
trange(0, EPOCHS, unit="epoch") - extinctions = 0 - best: Result = None - if extinction: - update_epoch_bar = lambda: epoch_bar.set_description(f"Fitness {max_fitness(parents):.2%} - #Calls: {problem.calls} - Extinctions: {extinctions}") - else: - update_epoch_bar = lambda: epoch_bar.set_description(f"Fitness {max_fitness(parents):.2%} - #Calls: {problem.calls}") - - for epoch in epoch_bar: - update_epoch_bar() - if math.isclose(1, best_in_list(parents).fitness): - break - offspring = [] - convergenceness = convergence_measure(parents) - if extinction and np.std(convergenceness) < CONVERGENCENESS_THRESHOLD: - extinctions += 1 - to_purge = int(len(parents) * SURVIVAL_RATE) - parents = random.choices(parents, k=to_purge) - for _ in range(POP_SIZE - len(parents)): - ind = Individual() - ind.evaluate(problem) - parents.append(ind) - else: - for i in range(OFFSPRING_SIZE): - new_ind: "Individual" - if random.random() < CROSSOVER_PROB: - new_ind = tournament_selection(parents).crossover(tournament_selection(parents), mode=crossover_mode) - else: - new_ind = tournament_selection(parents) - new_ind = new_ind.mutate() - new_ind.evaluate(problem) - offspring.append(new_ind) - parents = sorted([*parents, *offspring], key=lambda i:i.fitness, reverse=True)[:POP_SIZE] - - - best_ind = best_in_list(parents) - if best is None or best.individual.fitness < best_ind.fitness: - best = Result(best_ind, problem.calls, problem.x, epoch) - - return best -``` - - -```python -CROSSOVER_PROB = 0.5 -best_one = train() -``` - - Fitness 57.40% - #Calls: 200: 0%| | 4/10000 [00:00<04:28, 37.24epoch/s] - - Fitness 98.50% - #Calls: 250025: 100%|██████████| 10000/10000 [02:40<00:00, 62.48epoch/s] - - - -```python -best_one -``` - - - - - Result(individual=I(Zeros=15, Ones=985, Fit=0.985), calls=215250, size=1, epoch=8607) - - - -# Problem Size 2 - - With Extinction based on the population fitness - - -```python -PROB_SIZE = 2 -CROSSOVER_PROB = .5 -CONVERGENCENESS_THRESHOLD = 0.001 -best_two = train(extinction=True, crossover_mode='onecut') -``` - - Fitness 25.13% - #Calls: 75 - Extinctions: 0: 0%| | 0/10000 [00:00 list[float]: - """For each individual the sum of the edit distances to each other individual - - Args: - pop (list[Individual]): population - - Returns: - list[float]: sum of edit distances for each ind - """ - def edit_distance(it: "Individual", other: "Individual") -> float: - dist = sum([e1 ^ e2 for e1, e2 in zip(it.genome, other.genome)]) - return dist - return [ - sum([edit_distance(it, other) for j, other in enumerate(pop) if j != i]) for i, it in enumerate(pop) - ] -``` - -## Island Implementation - - -```python -ISLANDS = 2 -MIGRATION_STEP = 1000 -MIGRANT_COUNT = 5 -def train_with_islands(*, crossover_mode: str = None, extinction: bool = False, convergence_measure: Callable[[list[Individual]], list[float]] = None): - - if convergence_measure is None: - convergence_measure = lambda x: [i.fitness for i in x] - - problem = make_problem(PROB_SIZE) - - archipelago = [[Individual() for _ in range(POP_SIZE)] for _ in range(ISLANDS)] - for parents in archipelago: - for i in parents: - i.evaluate(problem) - - max_fitness: Callable[[list["Individual"]], float] = lambda x: max([i.fitness for i in x]) - best_in_list: Callable[[list["Individual"]], "Individual"] = lambda x: [c for c in x if c.fitness == max_fitness(x)][0] - tournament_selection: Callable[[list["Individual"]], "Individual"] = lambda l: best_in_list(random.choices(l, k=TOURNAMENT_SIZE)) - epoch_bar = trange(0, EPOCHS, unit="epoch") - extinctions = 0 - 
best: Result = None - if extinction: - update_epoch_bar = lambda: epoch_bar.set_description(f"Fitness {max_fitness(list([ind for pop in archipelago for ind in pop])):.2%} - #Calls: {problem.calls} - Extinctions: {extinctions}") - else: - update_epoch_bar = lambda: epoch_bar.set_description(f"Fitness {max_fitness([ind for pop in archipelago for ind in pop]):.2%} - #Calls: {problem.calls}") - - for epoch in epoch_bar: - - if (epoch+1) % MIGRATION_STEP == 0: - random.shuffle(archipelago) - for idx in range(0,ISLANDS,2): - # swap - tmp = archipelago[idx][:MIGRANT_COUNT] - archipelago[idx + 1][:MIGRANT_COUNT] = archipelago[idx][:MIGRANT_COUNT] - archipelago[idx + 1][:MIGRANT_COUNT] = tmp - pass - - for ic, parents in enumerate(archipelago): - update_epoch_bar() - if math.isclose(1, best_in_list(parents).fitness): - break - offspring = [] - convergenceness = convergence_measure(parents) - if extinction and np.std(convergenceness) < CONVERGENCENESS_THRESHOLD: - extinctions += 1 - to_purge = int(len(parents) * SURVIVAL_RATE) - parents = random.choices(parents, k=to_purge) - for _ in range(POP_SIZE - len(parents)): - ind = Individual() - ind.evaluate(problem) - parents.append(ind) - else: - for i in range(OFFSPRING_SIZE): - new_ind: "Individual" - if random.random() < CROSSOVER_PROB: - new_ind = tournament_selection(parents).crossover(tournament_selection(parents), mode=crossover_mode) - else: - new_ind = tournament_selection(parents) - new_ind = new_ind.mutate() - new_ind.evaluate(problem) - offspring.append(new_ind) - parents = sorted([*parents, *offspring], key=lambda i:i.fitness, reverse=True)[:POP_SIZE] - - - best_ind = best_in_list(parents) - if best is None or best.individual.fitness < best_ind.fitness: - best = Result(best_ind, problem.calls, problem.x, epoch) - - archipelago[ic] = parents - - - return best -``` - - -```python -PROB_SIZE = 10 -POP_SIZE = 20 -OFFSPRING_SIZE = 10 -CROSSOVER_PROB - .2 -best_ten = train_with_islands() -``` - - Fitness 16.20% - #Calls: 200: 0%| | 8/10000 [00:00<02:13, 75.07epoch/s] - - Fitness 33.58% - #Calls: 200030: 100%|██████████| 10000/10000 [02:02<00:00, 81.60epoch/s] - - -### Results -As you can see i did not obtain great results, I believe there's much room for improvement, starting from some parameter tweaking and also implementing some more advanced techniques like the ones we saw in class. -An improvement could be made by using a different "convergence measure" used for extinction, based on the genome instead of the fitness. -I also tried to implement a migration policy, but i did not have enough time to test it properly. - - -```python -best_one, best_two, best_five, best_ten -``` - - - - - (Result(individual=I(Zeros=15, Ones=985, Fit=0.985), calls=215250, size=1, epoch=8607), - Result(individual=I(Zeros=122, Ones=878, Fit=0.878), calls=248594, size=2, epoch=9935), - Result(individual=I(Zeros=344, Ones=656, Fit=0.5636), calls=250000, size=5, epoch=9997), - Result(individual=I(Zeros=478, Ones=522, Fit=0.33579005), calls=197110, size=10, epoch=9853)) - - - -### Peer Reviews Submitted - -#### Review 1 [(Open On Github)](https://github.com/RaffaeleViola/computational-intelligence/issues/4) - -Hi Raffaele, -the code is well-written and the README summarizes your intentions and iterations, demonstrating you've put thoughts into your work! -Nice touch using a in-memory cache to avoid re-calling the fitness function uselessly, though i believe you missed the line where you save the fitness once you calculate it on new individuals. 
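Just to be explicit about the pattern I mean, here is a minimal sketch (names are purely illustrative, not taken from your code):

```python
fitness_cache: dict[str, float] = {}

def cached_fitness(genome, problem) -> float:
    key = "".join(map(str, genome))
    if key not in fitness_cache:
        fitness_cache[key] = problem(genome)   # the store step that I believe is currently missing
    return fitness_cache[key]
```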
-One other thing worth exploring would be to use this cache as a "filter" for the offspring, maybe it could be beneficial to consider only new individuals you have never explored so far, leading to a better result. -I also appreciated your island implementation, i too believe it's the right path to success in this problem, and in fact the results speak for themselves - -#### Review 2 [(Open On Github)](https://github.com/TiloccaS/computational-intelligence-2023-24/issues/2) - -Hi Salvatore, the code is well written and understandable at first sight, the README helps a lot too! -I see you've implemented multiple crossover strategies and that's nice. One thing that could improve your results is replacing the early stopping with extinction or maybe with some selection on the offspring (like ignoring the children that have the same genome of one individual in the population). I believe that these improvements could improve your exploration and to avoid a convergence (which i believe you encountered in problem instances > 2) - -## Lab 4 (also known as Lab 10) - -### Assignment -Tic-Tac-Toe player using Reinforcement Learning -#### README - -# LAB 10 -Use reinforcement learning to devise a tic-tac-toe player. -Implemented a Q-Learning agent that learns to play Tic-Tac-Toe against a random opponent, also exploiting symmetries. -## Deadlines - -- Submission: [Dies Natalis Solis Invicti](https://en.wikipedia.org/wiki/Sol_Invictus) -- Reviews: [Befana](https://en.wikipedia.org/wiki/Befana) - -### Notes -- Reviews will be assigned on Monday, December 4 -- You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit) - ---- - -### Honesty Declaration -As a starting point i took a look at the code provided by the teacher, and my collegue/friends, whom i also collaborated with. -Nonetheless, I wrote my own code from scratch. - -Collegues/friends i talked to: -- [Davide](https://github.com/FarInHeight) -- [Davide](https://github.com/Vitabile) -- [Andrea](https://github.com/AndPan96) - -#### Code - -[Last Commit: Dec 18, 2023](https://github.com/ExalFabu/Computational-Intelligence/commit/a9bcbc5c00881ff069561355191c1d23402433c2) - -# LAB 10 -Use reinforcement learning to devise a tic-tac-toe player. - -## Deadlines - -- Submission: [Dies Natalis Solis Invicti](https://en.wikipedia.org/wiki/Sol_Invictus) -- Reviews: [Befana](https://en.wikipedia.org/wiki/Befana) - -### Notes -- Reviews will be assigned on Monday, December 4 -- You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit) - - -```python -import numpy as np -from tqdm import trange -from dataclasses import dataclass, field -from typing import Literal, Union -from abc import ABC, abstractmethod -from collections import defaultdict -from copy import deepcopy -import random -import pickle -from os import path -``` - -## Game Class - - -```python -DirectIndex = Literal[0,1,2,3,4,5,6,7,8] -RowColIndex = tuple[Literal[0,1,2], Literal[0,1,2]] -Move = Union[DirectIndex, RowColIndex] -Cell = Literal[-1, 0, 1] -PlayerIndex = Literal[0,1] -BoardHash = str - -CELL_TO_EMOJI=("⬜","❎","⏺️") -CELL_TO_CHAR=("B", "X", "O") - -ROTATED_INDEXES = [ - [0, 6, 8, 2], - [1, 3, 7, 5], - [2, 0, 6, 8], - [3, 7, 5, 1], - [4, 4, 4, 4], - [5, 1, 3, 7], - [6, 8, 2, 0], - [7, 5, 1, 3], - [8, 2, 0, 6], -] -# 0 with -90 * 1 rotation maps to 6, with -90 * two maps to 8 and so on... 
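# In other words, ROTATED_INDEXES[i][k] is the index that cell i lands on after k successive
# -90° rotations; RonSwanson.choose_move uses this table to map a move chosen on the
# canonical board back onto the original orientation.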
- -def charify(arr) -> str: - """Convert board array to hash-compatible string representation""" - return "".join([CELL_TO_CHAR[i + 1] for i in arr]) - - -CANONICAL_REPRESENTATION: bool = False - -@dataclass(repr=False) -class Board: - board: np.ndarray = field(default_factory=lambda: np.ones(9, dtype=np.int8) * -1) - - @staticmethod - def i_to_rc(i: DirectIndex) -> RowColIndex: - return i//3, i % 3 - - @staticmethod - def rc_to_i(rc: RowColIndex) -> DirectIndex: - r, c = rc - return r*3 + c - - @staticmethod - def is_valid_index(idx: Move) -> bool: - if isinstance(idx, tuple): - return idx[0] >= 0 and idx[0] <= 2 and idx[1]>=0 and idx[1]<= 2 - else: - return idx >= 0 and idx <= 8 - - def __getitem__(self, idx: Move) -> Cell: - """Access the cell directly with index or row-col""" - assert Board.is_valid_index(idx), "Invalid Index: {idx}" - if isinstance(idx, tuple): - idx = Board.rc_to_i(idx) - return self.board[idx] - - def __setitem__(self, idx: Move, value: Cell) -> None: - assert Board.is_valid_index(idx), "Invalid Index: {idx}" - if isinstance(idx, tuple): - idx = Board.rc_to_i(idx) - self.board[idx] = value - - def is_valid_move(self: "Board",move: Move) -> bool: - return self[move] == -1 - - def move(self: "Board", player: "PlayerIndex", move: Move) -> bool: - valid = self[move] == -1 - if valid: - self[move] = player - return valid - - def is_playable(self: "Board") -> bool: - return any(self.board == -1) and self.won() == -1 - - def won(self: "Board") -> Literal[0, 1, -1]: - """Check if someone has won""" - - rows = [[0,1,2], [3,4,5], [6,7,8]] - cols = [[0,3,6],[1,4,7], [2,5,8]] - diag = [[0,4,8], [2,4,6]] - all_ = [*rows, *cols, *diag] - - if any(all(self.board[c] == 0) for c in all_): - return 0 - elif any(all(self.board[c] == 1) for c in all_): - return 1 - else: - return -1 - - def canonical(self) -> tuple["Board", Literal[0,1,2,3]]: - as_mat = self.board.reshape((3,3)) - rots = [(charify(np.rot90(as_mat, k=i).flatten()), i) for i in range(4)] - canonical, idx = sorted(rots, key=lambda x: x[0])[0] - canonical = [CELL_TO_CHAR.index(c)-1 for c in canonical[:9]] - return Board(np.array(canonical)), idx - - def __repr__(self: "Board") -> str: - winner = self.won() - return f"Board({str(self.board)}, {winner=}) " - - def __str__(self) -> str: - """Pretty print the board""" - s = "" - for r in range(3): - for c in range(3): - s += CELL_TO_EMOJI[self[(r,c)] + 1] - s+="\n" - winner = self.won() - if winner != -1: - s += f"Winner: Player {winner}" - return s - - def hash(self: "Board") -> BoardHash: - """Stringified version of the board, so it can be used as a dict key""" - plind = ((self.board == -1).sum() + 1) % 2 - return charify(self.board) + str(plind) - - @staticmethod - def from_hash(s: BoardHash) -> "Board": - assert len(s) >= 9, "Invalid board" - b: list[int] - try: - b = [CELL_TO_CHAR.index(c)-1 for c in s[:9]] - except ValueError: - raise AssertionError("InvalidError") - return Board(np.array(b)) - - @staticmethod - def from_canonical(canonical: "Board", idx: Literal[1,2,3,4]) -> "Board": - b = np.rot90(canonical.board.reshape((3,3)), k = 4-idx).flatten() - return Board(np.array(b)) - - -``` - - -```python -def clamp(value, min_, max_): - """Clamp value between min_ and max_""" - return min(max(value, min_), max_) - -def avg(iterable): - return sum(iterable)/len(iterable) -``` - -## Players - -#### Abstract Player - - -```python -class Player(ABC): - """Abstract Player class""" - - @property - @abstractmethod - def name(self: "Player") -> str: - pass - - @abstractmethod - 
def choose_move(self, board: "Board", player_index: PlayerIndex) -> Move: - raise NotImplementedError -``` - -#### Utility Functions - - -```python -def game(player0: "Player", player1: "Player", verbose: bool = False) -> Literal[-1, 0, 1]: - """Play a single game""" - board = Board() - if verbose: - print(board) - players = [player0, player1] - plind: PlayerIndex = 1 - while board.is_playable(): - plind = 1-plind - player = players[plind] - move = None - while move is None or not board.is_valid_move(move): - move = player.choose_move(board, plind) - board.move(plind, move) - if verbose: - print(board) - return board.won() - -def benchmark(player_to_benchmark: "Player", opponent: "Player", games: int = 100, *, quiet: bool = False) -> tuple[float, float, float]: - """Benchmark a player, in both position""" - wins_as_first, wins_as_second = 0, 0 - draws_as_first, draws_as_second = 0, 0 - for i in range(games): - if i % 2 == 0: - end = game(player_to_benchmark, opponent) - wins_as_first += 1 if end == 0 else 0 - draws_as_first += 1 if end == -1 else 0 - else: - end = game(opponent, player_to_benchmark) - wins_as_second += 1 if end == 1 else 0 - draws_as_second += 1 if end == -1 else 0 - acc, first_acc, sec_acc = (wins_as_first + wins_as_second) / games, wins_as_first*2/games, wins_as_second*2/games - draw_acc, draw_first_acc, draw_sec_acc = (wins_as_first + wins_as_second + draws_as_first + draws_as_second) / games, (wins_as_first+draws_as_first)*2/games, (wins_as_second+draws_as_second)*2/games - if not quiet: - print(f"[{player_to_benchmark.name} vs {opponent.name} for {games} games]") - print(f" Wins: {acc:.2%}, {first_acc:.2%} as first, {sec_acc:.2%} as second") - print(f"Wins + Draws: {draw_acc:.2%}, {draw_first_acc:.2%} as first, {draw_sec_acc:.2%} as second") - else: - return (acc, first_acc, sec_acc), (draw_acc, draw_first_acc, draw_sec_acc) - - -``` - -### Random Player and Human Player - - -```python -@dataclass -class AndyDwyer(Player): - """Random Player""" - - @property - def name(self): - return "Andy Dwyer" - - def choose_move(self, board, player_index) -> DirectIndex: - """Make random move""" - return random.randrange(0,9) - -@dataclass -class TomHaverford(Player): - """Human Player, I wanted to have fun :)""" - - @property - def name(self): - return "Tom Haverford" - - def choose_move(self, board, player_index) -> DirectIndex: - print(board) - while True: - inp = input(f"{CELL_TO_EMOJI[player_index+1]} choose your move (row, column):") - try: - r, c = inp.split(",") - r = int(r.strip()) - c = int(c.strip()) - return Board.rc_to_i((r,c)) - except: - pass - -``` - -### Q-Learning - -#### Q-Learning Class - - -```python -def entry_default(): - """Needed for the object to be pickable""" - return [0] * 9 - -def qtable_default(): - """Needed for the object to be pickable""" - - - return defaultdict(entry_default) - - -@dataclass -class RonSwanson(Player): - """Q-Learning Player""" - - learning_rate: float = field(default=0.1) - discount_rate: float = field(default=0.99) - exploration_rate: float = field(default=1) - min_exploration_rate: float= field(default=0.01) - exploration_decay_rate: float= field(default=2.5e-5) - num_of_episodes: int = field(default=1_000) - qtable: dict[BoardHash, list[float]] = field(default_factory=qtable_default, repr=False) - # qtable: dict[BoardHash, list[float]] = field(default_factory=lambda: defaultdict(lambda: [0]*9), repr=False) - - @property - def name(self): - return "Ron Swanson" - - def reward(self, type: Literal["action", "game"], board: 
"Board", *, move: Move = None, player_position: PlayerIndex = None) -> float: - assert type in ["action", "game"], "Invalid reward type" - if type == "action": - assert move is not None, "Cannot retrieve reward for action if no move is provided" - return 1 if board.is_valid_move(move) else float('-inf') - else: - assert player_position is not None, "Cannot retrieve reward for game if no player position is provided" - won = board.won() - draw = won == -1 - if draw: return 0 - else: - return 10 if won == player_position else -10 - - def training_move_chooser(self, board: "Board") -> Move: - if random.uniform(0, 1) > self.exploration_rate: - # exploit - if board.hash() in self.qtable: - return np.argmax(self.qtable[board.hash()]) - # explore or nothing to exploit - return random.randrange(0, 9) - - def train(self: "RonSwanson", opponent: "Player" = None, verbose: bool = False, canonical: bool = None): - if opponent is None: - opponent = AndyDwyer() - if canonical is None: - canonical = CANONICAL_REPRESENTATION - rewards_per_episode = [0] * self.num_of_episodes - pbar = trange(self.num_of_episodes, unit="episode", desc=f"Training against {opponent.name}") - - if not verbose: - vprint = lambda x: None - else: - vprint = print - for episode in pbar: - board = Board() - if episode % 2 == 0: - whoami = 0 - else: - whoami = 1 - plind: PlayerIndex = 1 - - previous_board_hash: BoardHash - next_board_hash: BoardHash - move: Move - - while board.is_playable(): - plind = 1-plind - if whoami == plind: - move_was_valid = False - vprint(f"{self.name}'s turn ({plind})") - if canonical: - # if canonical, play with the canonical board - vprint(f"Canon: going from {board.hash()}") - board, canon_idx = board.canonical() - vprint(f"Canon: going to {board.hash()} (rot90: {canon_idx})") - - previous_board_hash = board.hash() - while not move_was_valid: - move = self.training_move_chooser(board) - reward = self.reward("action", board, move=move) - move_was_valid = board.is_valid_move(move) - if not move_was_valid: - self.qtable[previous_board_hash][move] = reward # -inf - continue - board.move(plind, move) - next_board_hash = board.hash() - vprint(f"{self.name} is picking: {move=},{reward=},{previous_board_hash=},{next_board_hash=}") - # Update qtable - self.qtable[previous_board_hash][move] *= 1-self.learning_rate - self.qtable[previous_board_hash][move] += self.learning_rate * (reward + - self.discount_rate * - (-np.max(self.qtable[next_board_hash]))) - # Vitabile's idea to put minus sign - if canonical: - # restore the non-canonical for the opponent - board = Board.from_canonical(board, canon_idx) - vprint(f"Going back to {board.hash()}") - rewards_per_episode[episode] += reward - else: - opponent_move: Move = None - while opponent_move is None or not board.is_valid_move(opponent_move): - opponent_move = opponent.choose_move(board, plind) - vprint(f"{opponent.name}'s turn ({plind}) -> {opponent_move}") - board.move(plind, opponent_move) - - reward = self.reward("game", board, player_position=whoami) - rewards_per_episode[episode] += reward - self.qtable[previous_board_hash][move] *= 1-self.learning_rate - self.qtable[previous_board_hash][move] += self.learning_rate * ( - reward + self.discount_rate * (-np.max(self.qtable[next_board_hash])) - ) - - self.exploration_rate = clamp(np.exp(-self.exploration_decay_rate * episode), self.min_exploration_rate, 1) - if episode % clamp(int(self.num_of_episodes/100), 1, self.num_of_episodes) == 0: - pbar.set_postfix({ - "Explored": len(self.qtable.keys()) - }) - - return 
rewards_per_episode - - def choose_move(self, board: Board, player_index: PlayerIndex) -> Move: - original_board = deepcopy(board) - idx = 3 - if CANONICAL_REPRESENTATION: - board, idx = board.canonical() - if board.hash() in self.qtable: - move = np.argmax(self.qtable[board.hash()]) - if board.is_valid_move(move): - rotated_move = ROTATED_INDEXES[move][-idx] - if original_board.is_valid_move(rotated_move): - board = original_board - return rotated_move - else: - print(f"Move was valid/invalid??? {original_board.hash()=} -{idx}-> {board.hash()=} {move=}, {rotated_move=} {np.argmax(self.qtable[board.hash()])=}") - return random.randrange(0,9) -``` - -#### Q-Learning Player Results - - -```python -CANONICAL_REPRESENTATION = False -filename = "./basic_ron.pkl" -use_saved_obj: bool = False -if use_saved_obj and path.isfile(path.abspath(filename)): - with open(filename, "rb") as f: - qlearning = pickle.load(f) -else: - qlearning = RonSwanson(num_of_episodes=100_000) - _ = qlearning.train() - with open(filename, "wb") as f: - pickle.dump(qlearning, f) -``` - - Training against Andy Dwyer: 100%|██████████| 100000/100000 [04:34<00:00, 364.38episode/s, Explored=5475] - - - -```python -benchmark(qlearning, AndyDwyer(), games=1000) -``` - - [Ron Swanson vs Andy Dwyer for 1000 games] - Wins: 91.10%, 99.60% as first, 82.60% as second - Wins + Draws: 99.90%, 100.00% as first, 99.80% as second - - -#### Q-Learning with Canonical Representation Results -In order to reduce the number of states, exploiting the symmetries in TicTacToe -I've tried to use a canonical representation of the board. \ -The canonical board is the one with the smallest lexicographical order among the \ -boards obtained by applying all the possible rotations to the original board. \ -The player then uses the canonical board to update the Q-table and to choose the next move. 
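A minimal standalone sketch of the idea, assuming the same cell encoding as above (an illustration of picking the lexicographically smallest rotation, not the `Board.canonical` method itself):

```python
import numpy as np

CELL_TO_CHAR = ("B", "X", "O")  # -1 (blank), 0 (X), 1 (O)

def canonical_key(board: np.ndarray) -> str:
    """Lexicographically smallest stringified board among the four rotations."""
    rotations = ("".join(CELL_TO_CHAR[c + 1] for c in np.rot90(board.reshape(3, 3), k=k).flatten())
                 for k in range(4))
    return min(rotations)

# a board and its 90°-rotated copy collapse onto the same canonical key
b = np.array([0, -1, -1, -1, 1, -1, -1, -1, -1])  # X in a corner, O in the centre
assert canonical_key(b) == canonical_key(np.rot90(b.reshape(3, 3)).flatten())
```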
- - -```python -CANONICAL_REPRESENTATION = True -filename = "./canon_ron.pkl" -use_saved_obj: bool = True -if use_saved_obj and path.isfile(filename): - with open(filename, "rb") as f: - with_canon = pickle.load(f) -else: - with_canon = RonSwanson(num_of_episodes=100_000) - _ = with_canon.train(canonical=True) - with open(filename, "wb") as f: - pickle.dump(with_canon, f) -``` - - Training against Andy Dwyer: 0%| | 0/100000 [00:00 list[Move]: - valids = [] - if p.x != 0: - valids.append(Move.LEFT) - if p.x != 4: - valids.append(Move.RIGHT) - if p.y != 0: - valids.append(Move.TOP) - if p.y != 4: - valids.append(Move.BOTTOM) - - return valids - - -POSSIBLE_MOVES = tuple( - CompleteMove(p, m) for p in POSSIBLE_POSITIONS for m in valid_move_from_position(p) -) -"""Every possible moves, taking into account the position in the board (obviously, not considering the board itself)""" - -INT_TO_CHAR = ["B", "X", "O"] -"""To stringify board""" - -CHARS_TO_INT = { - "B": -1, - "X": 0, - "O": 1 -} -"""To parse stringified version back into Game""" - - -class CustomGame(Game): - def pprint(self): - chars = np.ndarray(self._board.shape, np.dtypes.StrDType) - chars[self._board == -1] = "⬜" - chars[self._board == 0] = "❎" - chars[self._board == 1] = "🔵" - for row in chars: - for c in row: - print(c, end="") - print() - - def __repr__(self) -> str: - return str(self) - - - def __str__(self) -> str: - arr: list[int] = deepcopy(self._board).flatten().tolist() - stringified = "".join([INT_TO_CHAR[it + 1] for it in arr]) - return f"{self.current_player_idx}{stringified}" - - def from_board(board: np.ndarray, player_idx: int) -> "CustomGame": - c = CustomGame() - c._board = board - c.current_player_idx = player_idx - return c - - def from_str(s: str) -> "CustomGame": - p, b = s[0], s[1:] - assert len(b) == 25 and p.isdigit(), f"Invalid Board {s} or playerind {p} ???" 
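-        # The string encodes a game as "<player><25 cells>": the first character is the index of the
-        # player to move ('0' or '1'), the remaining 25 characters are the flattened 5x5 board, row by
-        # row, with 'B' = blank, 'X' = player 0, 'O' = player 1 (see INT_TO_CHAR / CHARS_TO_INT above);
-        # e.g. "1" + "B" * 25 is an empty board with player 1 to move.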
- board = np.array([CHARS_TO_INT[c] for c in b]).reshape((5,5)) - g = CustomGame() - g._board = board - g.current_player_idx = int(p) - return g - - def symmetries(start: "CustomGame") -> list[str]: - def rot_flip(board: np.ndarray, player_idx: int) -> list["CustomGame"]: - starting_board = board - rotations = [CustomGame.from_board(np.rot90(starting_board, k=k), player_idx) for k in range(4)] - flip = np.fliplr(starting_board) - flip_rotations = [CustomGame.from_board(np.rot90(flip, k=k), player_idx) for k in range(4)] - return [*rotations, *flip_rotations] - - inverted = start.get_board() - zeros = inverted == 0 - ones = inverted == 1 - inverted[zeros] = 1 - inverted[ones] = 0 - - all_variants = set([*rot_flip(start.get_board(), start.current_player_idx), *rot_flip(inverted, 1-start.current_player_idx)]) - # all_variants = set([*rot_flip(start.get_board(), start.current_player_idx)]) - return sorted([str(it) for it in list(all_variants)]) - - def to_canon(start: "CustomGame") -> tuple["CustomGame", int]: - symmetries = start.symmetries() - self_idx = symmetries.index(str(start)) - return CustomGame.from_str(symmetries[0]), self_idx - - def from_canon(canon: "CustomGame", idx: int) -> "CustomGame": - symmetries = canon.symmetries() - return CustomGame.from_str(symmetries[idx]) - - def from_game(game: "Game") -> "CustomGame": - return CustomGame.from_board(game.get_board(), game.get_current_player()) - - def __hash__(self) -> str: - return str(self).__hash__() - - def __eq__(self, other: "CustomGame") -> bool: - return self.__hash__() == other.__hash__() - - @staticmethod - def convert_canon_move(canon_board: "CustomGame", canon_move: "CompleteMove", original_board: "CustomGame") -> "CompleteMove": - target_board = str(canon_board.simulate_move(canon_move).to_canon()[0]) - for move in original_board.valid_moves(None, False): - temp_board = original_board.simulate_move(move) - if str(temp_board.to_canon()[0]) == target_board: - return move - debug = f"canon= {canon_board} move= {canon_move} original= {original_board}" - raise Exception(f"Unable to convert move from canon to non-canon\n{debug}") - - def valid_moves(self, player: int = None, filter_duplicates: bool = True, canon_unique: bool = False) -> tuple[CompleteMove]: - if player is None: - player = self.current_player_idx - valids = [it for it in POSSIBLE_MOVES if self.is_valid(it)] - if not filter_duplicates: - return valids - s = defaultdict(list) - for valid in valids: - copy = deepcopy(self) - copy._Game__move(*valid, player) - if canon_unique: - s[str(copy.to_canon()[0])].append(valid) - else: - s[str(copy)].append(valid) - non_duplicate = [] - for _, moves in s.items(): - non_duplicate.append(moves[0]) - return tuple(non_duplicate) - - def is_valid(self: "CustomGame", move: "CompleteMove") -> bool: - return self._board[move[0][1], move[0][0]] == -1 or self._board[move[0][1], move[0][0]] == self.current_player_idx - - def play(self, player1: "Player", player2: "Player", verbose: bool = False) -> int: - '''Play the game. 
Returns the winning player''' - players = [player1, player2] - winner = -1 - if verbose: - pbar = tqdm(range(100)) - pbar.disable = not verbose - pbar.unit = "move" - while winner < 0: - ok = False - counter = 0 - verbose and pbar.set_postfix({"Player": self.current_player_idx, "wrong-moves": counter}) - while not ok: - move = players[self.current_player_idx].make_move(self) - ok = self._Game__move(*move, self.current_player_idx) - counter += 1 - if verbose and counter > 1: - pbar.set_postfix({"Player": self.current_player_idx, "wrong-moves": counter}) - winner = self.check_winner() - self.current_player_idx = 1-self.current_player_idx - verbose and pbar.update(1) - return winner - - @property - def score(self) -> int: - - # Reference: https://github.com/poyrazn/quixo/blob/77d876e0e9ce5c9aba677060a62713cb66243fef/players/aiplayer.py#L79 - winner = self.check_winner() - if winner != -1: - return (5**5) * (1 if winner == self.current_player_idx else -1) - transposed = self._board.transpose() - - x_score = [] - o_score = [] - for row, column in zip(self._board, transposed): - x_score.append(sum(row == 0)) - x_score.append(sum(column == 0)) - o_score.append(sum(row == 1)) - o_score.append(sum(column == 1)) - - diag = self._board.diagonal() - second_diag = self._board[:, ::-1].diagonal() - - x_score.append(sum(diag == 0)) - o_score.append(sum(diag == 1)) - x_score.append(sum(second_diag == 0)) - o_score.append(sum(second_diag == 1)) - - score_x, score_o = 5**max(x_score), 5**max(o_score) - score = score_x - score_o - score *= 1 if self.current_player_idx == 0 else -1 - return score - - def simulate_move(self, move: "CompleteMove") -> "CustomGame": - copy = deepcopy(self) - investigating = copy.is_valid(move) - success = copy._Game__move(*move, copy.current_player_idx) - if success: - copy.current_player_idx = 1-copy.current_player_idx - else: - print("Simulated invalid move") - assert success == investigating, "AAAA SOMEHOW IS_VALID is different thant Game.move validation | board {copy} - move {move} move for {copy.current_player_idx}" - return copy - - -@pytest.mark.benchmark -def test_benchmark_symmetries(number: int = 1_000) -> None: - import timeit - pbar = tqdm(range(3), unit="test", leave=False) - ff = timeit.timeit(stmt="it.valid_moves(None, False, False)", setup="from custom_game import CustomGame;it = CustomGame()", number=number) - pbar.update(1) - tf = timeit.timeit(stmt="it.valid_moves(None, True, False)", setup="from custom_game import CustomGame;it = CustomGame()", number=number) - tfup = tf/ff - pbar.update(1) - tt = timeit.timeit(stmt="it.valid_moves(None, True, True)", setup="from custom_game import CustomGame;it = CustomGame()", number=number) - ttup = tt/ff - - pbar.update(1) - - - print(f"Benchmark ({number}): Valid={ff:.2f}s Dedup={tf:.2f}s ({tfup:+.0%}) CanonDedup={tt:.2f}s ({ttup:+.0%})") - -if __name__ == "__main__": - from random import choice - test_benchmark_symmetries() -``` - -#### Minimax Player - -```python -from typing import TYPE_CHECKING, Literal, Union -try: - from game import Player, Game - from custom_game import CustomGame, POSSIBLE_MOVES - if TYPE_CHECKING: - from custom_game import CompleteMove -except: - from .game import Player, Game - from .custom_game import CustomGame, POSSIBLE_MOVES - if TYPE_CHECKING: - from .custom_game import CompleteMove - -import numpy as np -from collections import defaultdict -import random -import time - - - -class MinMaxPlayer(Player): - """ Minimax Player with alpha-beta pruning (togglable) and a hash-table to store 
previously evaluated states. - - There are 4 possible pruning 'levels' (explained in detail below); I believe the best tradeoff between pruning and speed is level 1, - since going any deeper wastes too much time processing the (inefficiently implemented) symmetries. - To get a sense of the time difference there is a benchmarking function that shows it (see `custom_game.test_benchmark_symmetries`), spoiler: +2400% - """ - - def __init__( - self, - max_depth: int = 2, - *, - alpha_beta: bool = True, - pruning: Literal[0, 1, 2, 3] = 1, - htable: bool = True, - - ) -> None: - """Init - - Args: - max_depth (int, optional): Tree depth. Defaults to 2. - alpha_beta (bool, optional): Whether to use Alpha-Beta pruning. Defaults to True. - pruning (Literal[0, 1, 2, 3], optional): Pruning level. Defaults to 1. - This pruning level determines the amount of pre-filtering done to the MinMax tree (i.e. how many children a node has) - 0: Consider only valid moves - 1: Consider only valid moves that land on distinct boards (purge moves that would land on a board already covered by another move) - 2: Consider only valid moves that land on distinct *canonical* boards (purge moves that would land on the same equivalence class of already covered boards) - 3: Same as 2, plus we filter out boards already covered at a lower depth (where the lowest is the root) - This is done because, with a sufficiently high `max_depth`, it is possible to loop back into an already covered board, - and if it was encountered at a lower depth, that evaluation has more information than this subtree could ever provide, so expanding it is useless - htable (bool, optional): Whether to use a hash-table to save and reuse already evaluated states. Defaults to True. 
- """ - super().__init__() - - self.max_depth = 2 if max_depth is None else max_depth - self.use_alpha_beta_pruning = alpha_beta - self.pruning_level = pruning - self.use_htable = htable - - self.history: dict[str, "CompleteMove"] = dict() - """Hash-Table but only for complete moves, always enabled""" - self.htable: dict[ - str, dict[tuple[Literal["l", "h"], int], float] - ] = defaultdict(lambda: defaultdict(float)) - """Hash Table for intermediate states, enabled with `htable` flag""" - - self._stats = defaultdict(int) - """Used to gather some basic stats and counters""" - - @property - def short_name(self) -> str: - '''Short Name used in pictures''' - return f"MinMax({'AB, ' if self.use_alpha_beta_pruning else ''}D{self.max_depth}, P{self.pruning_level}{', H' if self.use_htable else ''})" - - @property - def name(self) -> str: - '''Full Name''' - return f"MinMax(depth={self.max_depth}, alpha_beta={self.use_alpha_beta_pruning}, pruning={self.pruning_level}, use_htable={self.use_htable})" - - def make_move(self, game: Game) -> "CompleteMove": - start = time.time() - cg = CustomGame.from_game(game) - best_move = self._minmax(cg) - if best_move is None or not cg.is_valid(best_move): - self._stats["EVAL-invalidmove"] += 1 - best_move = random.choice(cg.valid_moves()) - else: - self._stats['evals'] += 1 - self._stats['evals-ms'] += (time.time() - start) - - - return best_move - - def search_in_htable( - self, game: "CustomGame", curr_depth: int, curr_side: Literal["l", "h"] - ) -> Union[float, None]: - """Searches the move in the hash-table - Look for states explored previously prioritizing the ones that have been visited with a lower depth - (meaning it explored more states, and thus has more insights), exploiting also the states that have been visited by the other players (*-1) - - - Args: - game (CustomGame): game we are looking for - curr_depth (int): current depth - curr_side (Literal[l,h]): Side at which we are on (l = low = min | h = high = max) - - Returns: - Union[float, None]: The value stored in the hash table if found, None otherwise (or if htable disabled) - """ - if not self.use_htable or str(game) not in self.htable: - self._stats["HTABLE-MISS"] += 1 - return None - - visited = self.htable[str(game)] - samesies = defaultdict(float) - '''stored here are the visited states played on the same side''' - contries = defaultdict(float) - '''stored here are the visited states played on the opponent side''' - - for key, value in visited.items(): - side, depth = key - if side == curr_side and depth <= curr_depth: - samesies[depth] = value - elif side != curr_side and depth <= curr_depth: - # note here that if we are at an opponent side, we flip the value (leveraging the fact that our heuristic can do so) - contries[depth] = -value - - # If there are some match save the (depth, value) of it, - if len(samesies) != 0: - sms_dv = min(samesies.keys()) - sms_dv = (sms_dv, samesies[sms_dv]) - else: - #otherwise continue with an impossible value that we will filter out later - sms_dv = (self.max_depth +10, None) - - if len(contries) != 0: - cnt_dv = min(contries.keys()) - cnt_dv = (cnt_dv, contries[cnt_dv]) - else: - cnt_dv = (self.max_depth +10, None) - - # Get the (depth, value) that has the lowest depth (meaning more insight) - dv = sms_dv if sms_dv[0] < cnt_dv[0] else cnt_dv - - # If a match is really found (and it is not an impossible thing that we previously set), return the value of that move - if dv[0] <= self.max_depth: - self._stats["HTABLE-HIT"] += 1 - 
self._stats[f"HTABLE-HIT-{dv[0]}/{curr_depth}"] += 1 - return dv[1] - - self._stats["HTABLE-MISS"] += 1 - return None - - def put_in_htable( - self, - game: "CustomGame", - curr_depth: int, - curr_side: Literal["l", "h"], - value: float, - ) -> None: - """Save move in htable if enabled - - Args: - game (CustomGame): Game - curr_depth (int): current depth - curr_side (Literal[l, h]): Side we are currently on - value (float): value of that game - """ - - if self.use_htable: - self.htable[str(game)][(curr_side, curr_depth)] = value - - def _minmax(self, game: "CustomGame") -> "CompleteMove": - visited_list: list[set[str]] = [set() for _ in range(self.max_depth)] - """Store here the states that i have visited while making a move (used only with pruning_level == 4)""" - - # Store the player that is to move, used in various assertions when I was trying to figure out what was wrong, - # keeping it because it makes sense - whoami = game.get_current_player() - - def moves_getter(game: "CustomGame", depth: int) -> list[tuple["CompleteMove", "CustomGame"]]: - """Utility function used to get the moves when expanding a node, used both in max and min side""" - - self._stats["MOVES-THEORETICAL"] += 44 # length of POSSIBLE_MOVES - - if self.pruning_level == 0: - # Only valid moves - moves = game.valid_moves(None, False, False) - elif self.pruning_level == 1: - # filter the moves that land on a board already covered - moves = game.valid_moves(None, True, False) - else: # both 2 and 3 - # filter the moves that land on a board already covered (using symmetries) - moves = game.valid_moves(None, True, True) - - games = [game.simulate_move(move) for move in moves] - move_n_games = list(zip(moves, games)) - - if self.pruning_level == 3: - # Filter also the boards that we already covered at a lower depth - visited_list[depth].union(set([str(it) for it in games])) - already_visited = set([game for d in range(0, depth) for game in visited_list[d]]) - """Already visited games at a lower depth""" - # For stats purposes - _pre = len(move_n_games) - move_n_games = [it for it in move_n_games if str(it[1]) not in already_visited] - # For stats purposes - _post = len(move_n_games) - self._stats["PRUNING3-DIFF"] += _pre-_post # this is always 0.. Is depth 2 enough for a loop? 
Maybe 3 might - - self._stats["MOVES-ACTUAL"] += len(move_n_games) - return move_n_games - - def min_side( - self: "MinMaxPlayer", game: "CustomGame", alpha: int, beta: int, depth: int - ) -> int: - assert game.current_player_idx == 1-whoami, "Something went awfully wrong" - - htable_value = self.search_in_htable(game, depth, "l") - if htable_value: - return htable_value - - winner = game.check_winner() - if (self.max_depth is not None and depth >= self.max_depth) or winner != -1: - score = -1 * game.score # We want the score as if I'm the other player (thus *-1) - self.put_in_htable(game, depth, "l", score) - return score - - min_found = np.infty - - for _, copy in moves_getter(game, depth): - min_found = min(min_found, max_side(self, copy, alpha, beta, depth + 1)) - if alpha >= min_found and self.use_alpha_beta_pruning: - break - beta = min(beta, min_found) - - self.put_in_htable(game, depth, "l", min_found) - return min_found - - def max_side( - self: "MinMaxPlayer", game: "CustomGame", alpha: int, beta: int, depth: int - ) -> int: - assert game.current_player_idx == whoami, "Something went awfully wrong" - - htable_value = self.search_in_htable(game, depth, "h") - if htable_value: - return htable_value - - winner = game.check_winner() - if (self.max_depth is not None and depth >= self.max_depth) or winner != -1: - score = game.score - self.put_in_htable(game, depth, "h", score) - return score - - max_found = -np.infty - - for _, copy in moves_getter(game, depth): - max_found = max(max_found, min_side(self, copy, alpha, beta, depth + 1)) - if max_found >= beta and self.use_alpha_beta_pruning: - break - alpha = max(alpha, max_found) - - self.put_in_htable(game, depth, "h", max_found) - return max_found - - # Start MinMax - - best_move = None - alpha, beta = -np.inf, np.inf - - if str(game) in self.history: - self._stats["cache-hit"] += 1 - return self.history[str(game)] - - for move, copy in moves_getter(game, 0): - min_score = min_side(self, copy, alpha, beta, 1) - if min_score > alpha: - alpha = min_score - best_move = move - self._stats["EVALS"] += 1 - self.history[str(game)] = best_move - self.put_in_htable(game, 0, "h", alpha) - return best_move - - - @property - def _avg_time(self): - if self._stats['evals'] == 0: - return 0 - return self._stats['evals-ms'] / self._stats['evals'] - - - @property - def stats(self) -> dict[str, str]: - """Pretty Print relevant stats - - Returns: - dict[str, str]: pretty printed stats - """ - # actual moves performed and total number of moves that would've been performed without any pruning at all (not even lvl 0) - am, thm = self._stats["MOVES-ACTUAL"], self._stats["MOVES-THEORETICAL"] - - pp = { - "Average time per move": f"{self._avg_time:.2f}s", - f"Pruning lvl. {self.pruning_level} discount": f"{(1-(am/thm)):.2%}", - "Total Moves performed": self._stats["evals"] - } - if self._stats["EVAL-invalidmove"] != 0: - # This should never happen, but as we say where I come from "pi na mano..." 
- # (which very roughly translates to: "better safe than sorry") - pp['Invalid Moves performed'] = self._stats["EVAL-invalidmove"] - if self.use_htable: - hitratio = self._stats["HTABLE-HIT"] / (self._stats['HTABLE-MISS'] + self._stats['HTABLE-HIT']) - # Ratio of games found over all games explored - pp["HashTable HitRatio"] = f"{hitratio:.3%}" - return pp - -if __name__ == "__main__": - try: - from helper import evaluate - except: - from .helper import evaluate - - from pprint import pprint - - mf = MinMaxPlayer(2, pruning=0, htable=False) - evaluate(mf, None, 50, True) - pprint(mf.stats, sort_dicts=False) -``` - -#### Monte Carlo Tree Search Player - -```python -from typing import TYPE_CHECKING, Literal -try: - from game import Game, Move, Player - from custom_game import CustomGame - if TYPE_CHECKING: - from custom_game import CompleteMove -except: - from .game import Game, Move, Player - from .custom_game import CustomGame - if TYPE_CHECKING: - from .custom_game import CompleteMove - -import numpy as np, random -from dataclasses import dataclass, field -from collections import defaultdict -from copy import deepcopy -import time -from tqdm.auto import trange, tqdm - -# implementation inspired from https://github.com/aimacode/aima-python/blob/61d695b37c6895902081da1f37baf645b0d2658a/games4e.py#L178 - -@dataclass -class MCTNode: - """Monte Carlo Tree Node - - Wrapper for a node of the MCTS that contains the utility and count values, parent and children references - """ - - state: "CustomGame" = field() - parent: "MCTNode" = field() - constant_factor: float = field(default=1.4) - utility: int = field(default=0, init=False) - count: int = field(default=0, init=False) - children: dict["CompleteMove", "MCTNode"] = field(default_factory=lambda: dict(), init=False) - - def ucb(self, constant_factor = None): - """Upper Confidence Bound 1 applied to trees - - Args: - constant_factor (float, optional): exploration parameter. Defaults to `sqrt(2)`. - - Returns: - float: `self.utility/self.count + constant_factor * sqrt(log(parent.count)/(self.count))`. If it has never been visited, returns `+inf` - """ - if constant_factor is None: - constant_factor = self.constant_factor - - if self.count == 0: - return float("inf") - return self.utility / self.count + constant_factor * (np.sqrt(np.log(self.parent.count) / self.count)) - -@dataclass -class MCTSPlayer(Player): - """Monte Carlo Tree Search Player - - Disclaimer: - Implementation took insipiration from looking at different sources, such as - - [Artificial Intelligence: a Modern Approach](https://aima.cs.berkeley.edu/) and it's code [here](https://github.com/aimacode/aima-python/blob/61d695b37c6895902081da1f37baf645b0d2658a/games4e.py#L178) - - [Monte Carlo Tree Search - Wikipedia](https://en.wikipedia.org/wiki/Monte_Carlo_tree_search) - """ - - games: int = field(default=500) - """Number of games to play for each move""" - sim_heuristic: bool = field(default=False) - """Whether to use an heuristic when simulating a node. - - If disabled, the simulation is played random - If enabled, it uses the same scoring function used for minmax to determine the best next move - """ - - progress: bool = field(default=False) - """Show progress bar while playing.. 
used this when I discovered that it could loop while playing using heuristic (see stats.loop and stats.deeploop :'))""" - - _stats: dict[str, int] = field(default_factory=lambda: defaultdict(int), init=False) - """Simple dict used to keep track of basic statistics, see property stats for a prettified version""" - - @property - def short_name(self) -> str: - """Used in graphs pictures""" - return f"MCTS({'H' if self.sim_heuristic else 'R'}, {self.games})" - - @property - def name(self) -> str: - return f"MCTS(games={self.games}, use_heuristic_in_simulation={self.sim_heuristic})" - - def make_move(self, game: Game) -> tuple[tuple[int, int], Move]: - start = time.time() - root_cg = CustomGame.from_game(game) - - root = MCTNode(root_cg, None) - if self.progress: - range_games = trange(self.games, unit="games", leave=False) - else: - range_games = range(self.games) - for _ in range_games: - self.progress and range_games.set_postfix({"phase": "select"}) - leaf = self._select(root) - - self.progress and range_games.set_postfix({"phase": "expand"}) - child = self._expand(leaf) - - self.progress and range_games.set_postfix({"phase": "simulate"}) - score = self._simulate(child) - - self.progress and range_games.set_postfix({"phase": "backprop"}) - self._backpropagate(child, score) - - # The Best Move is the child of the root that has been visited the most - best_move = max(root.children.items(), key=lambda it: it[1].count)[0] - self._stats['evals'] += 1 - - if best_move not in root_cg.valid_moves(None, False, False): - self._stats['eval-invalid'] += 1 - best_move = random.choice(root_cg.valid_moves(None, False, False)) - else: - self._stats['evals-ms'] += time.time()-start - return best_move - - def _select(self, node: "MCTNode") -> "MCTNode": - """Select Phase - Choose the leaf using UCB function""" - if node.children: - return self._select(max(node.children.values(), key=MCTNode.ucb)) - else: - return node - - def _expand(self, node: "MCTNode") -> "MCTNode": - if not node.children or node.state.check_winner() == -1: - # If the node has no children and is not a terminal state, expand all the children - node.children = { - move: MCTNode(node.state.simulate_move(move), node) - for move in node.state.valid_moves(None, False, False) - } - - return self._select(node) - - def _select_move_in_simulation(self, game: "CustomGame", i: int = 0) -> tuple["CompleteMove", "CustomGame"]: - """Move selector in simulation phase - What moves are going to be played? 
- - Args: - game (CustomGame): Game board - i (int): In case we are in a loop, start getting sub-optimal moves to escape - - Returns: - tuple[CompleteMove, CustomGame]: Move and Game - """ - - if self.sim_heuristic: - # If we are using an heuristic, sort them accordingly to the score of the landing state - moves = game.valid_moves(None, True, True) - games = [game.simulate_move(move) for move in moves] - - mg = zip(moves, games) - score_sorted_move_games = sorted(mg, key=lambda it: it[1].score) - # Start escaping the loop - return score_sorted_move_games[i % len(score_sorted_move_games)] - else: - # Play random - move = random.choice(game.valid_moves(None, False, False)) - return move, game.simulate_move(move) - - def _simulate(self, node: "MCTNode") -> int: - """Simulate Phase - Plays one single game""" - - starting_player = node.state.get_current_player() - - copy = deepcopy(node.state) - winner = copy.check_winner() - - if self.progress: - pbar = tqdm(None, desc="move", leave=False) - - # Used to detect "simple loops" (A and B play always the same move) - last_moves = [None, None] - dup_counter = 0 - - # Used to detect "deep loops" (A and B land on a state that has been visited more than 50 times) - visited: dict[str, int] = defaultdict(int) - - while winner != -1: - curr_player = copy.get_current_player() - - if dup_counter > 40: - # If we are in a simple loop, start playing other moves - move, copy = self._select_move_in_simulation(copy, dup_counter-20) - self._stats["loop-dodged"] += 1 - else: - move, copy = self._select_move_in_simulation(copy) - - if last_moves[curr_player] == move: - dup_counter += 1 - else: - dup_counter = 0 - - - visited[str(copy)] += 1 - - if visited[str(copy)] > 50: - # Deep loop - self._stats["deeploop-dodged"] += 1 - move, copy = self._select_move_in_simulation(copy, visited[str(copy)]-50) - - last_moves[curr_player] = move - - self.progress and pbar.update(1) - self.progress and pbar.set_postfix({"board": str(copy), "move": move}) - - winner = copy.check_winner() - - if winner == starting_player: - # if the child won, the parent must be penalized - return -1 - else: - # otherwise give him a big hug, parents deserve them - return 1 - - def _backpropagate(self, node: "MCTNode", score: Literal['-1', '1']) -> None: - """Backpropagate till the root""" - - if score > 0: - node.utility += score - node.count += 1 - - if node.parent: - self._backpropagate(node.parent, -score) - - - @property - def _avg_time(self): - if self._stats['evals'] == 0: - return 0 - return self._stats['evals-ms'] / self._stats['evals'] - - @property - def stats(self): - """Pretty Printed stats""" - return { - "Average time per move": f"{self._avg_time:.2f}s", - "Total Moves performed": self._stats['evals'], - "Loops Dodged": self._stats['loop-dodged'], - "Deep-Loop Dodged": self._stats['deeploop-dodged'] - } - -if __name__ == "__main__": - from helper import evaluate - from main import RandomPlayer - from pprint import pprint - games_for_evaluation = 10 - mcts_depth = 500 - show_progress = False - ### - mr = MCTSPlayer(mcts_depth, False, show_progress) - print("---\t---") - print(f"MCTS({mcts_depth}) Simulating with random moves") - evaluate(mr, RandomPlayer(), games_for_evaluation, True) - pprint(mr.stats, sort_dicts=False) - mh = MCTSPlayer(mcts_depth, True, show_progress) - print("---\t---") - print(f"MCTS({mcts_depth}) Simulating with heuristic") - evaluate(mh, RandomPlayer(), games_for_evaluation, True) - pprint(mh.stats, sort_dicts=False) -``` diff --git a/report.pdf b/report.pdf 
deleted file mode 100644 index 74ac465..0000000 Binary files a/report.pdf and /dev/null differ