-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcoerce_controller.py
264 lines (219 loc) · 9.5 KB
/
coerce_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import datetime
import math
import os
import random
import re
import time
from sqlalchemy.sql import exists
import ircbot.storage as db
from assassins.coerce_models import CoercePlayer, CoercePlayerGame, CoercePartialPoint, CoerceGame
MIN_PLAYERS = 4
BASE_DIR = os.path.dirname(__file__)
##################
# Special Exceptions
##################
class CoerceException(Exception):
def __init__(self, *args, **kwargs):
Exception.__init__(self, *args, **kwargs)
class MissingChannelException(CoerceException):
def __init__(self, *args, **kwargs):
CoerceException.__init__(self, "Could not find a channel to use to load or create a game.")
class PlayerInGameException(CoerceException):
def __init__(self, *args, **kwargs):
CoerceException.__init__(self, "Player is already in the game.")
class PlayerNotInGameException(CoerceException):
def __init__(self, *args, **kwargs):
CoerceException.__init__(self, "Player is not in the game.")
class GameInProgressException(CoerceException):
def __init__(self, *args, **kwargs):
CoerceException.__init__(self, "Game is in progress.")
class NoSuchPlayerException(CoerceException):
def __init__(self, name, *args, **kwargs):
CoerceException.__init__(self, "No player named {}.".format(name))
class NotEnoughPlayersException(CoerceException):
def __init__(self, *args, **kwargs):
CoerceException.__init__(self, "Game requires at least {} players.".format(MIN_PLAYERS))
##################
# Decorators
##################
def load_game(func, argnum=0):
@db.needs_session
def load_game_wrapper(*args, s=None, **kwargs):
if not isinstance(args[argnum], CoerceGame):
args = list(args)
args[argnum] = get_game(args[argnum], s=s)
args = tuple(args)
return func(*args, s=s, **kwargs)
return load_game_wrapper
##################
# Start of main functions
##################
@db.needs_session
def get_game(channel, s=None):
game = s.query(CoerceGame).filter_by(chan = channel).one_or_none()
if not game:
game = CoerceGame(chan = channel)
s.add(game)
return game
@db.needs_session
def get_player(name, s=None):
player = s.query(CoercePlayer).filter(CoercePlayer.name == name).one_or_none()
return player
@db.atomic
@load_game
def handle_join(game, name, print_func=print, s=None):
player = get_player(name, s=s)
if not player:
player = CoercePlayer(name=name)
s.add(player)
if s.query(CoercePlayerGame).filter_by(game = game, player = player).first():
print_func(game.chan, "{}: You are already in the game.".format(name))
raise PlayerInGameException()
s.add(CoercePlayerGame(player=player, game=game))
print_func(game.chan, "{} has joined the game.".format(name))
@db.atomic
@load_game
def handle_quit(game, name, print_func=print, s=None):
player = get_player(name, s=s)
error = None
if not player:
error = NoSuchPlayerException(name)
elif game.state == 'pregame':
if s.query(CoercePlayerGame).filter_by(game = game, player = player).delete() > 0:
print_func(game.chan, "{} has left the game.".format(name))
else:
error = PlayerNotInGameException()
else:
error = GameInProgressException()
if error:
print_func(game.chan, "{}: Cannot quit: {}".format(name, error))
raise PlayerNotInGameException()
@db.atomic
@load_game
def handle_message(game, speaker, msg, s=None):
speaker = get_player(speaker, s=s)
msg = msg.lower()
if game.state == 'running' and speaker:
if s.query(CoercePlayerGame).filter_by(game = game, player = speaker).one_or_none():
for cpg in s.query(CoercePlayerGame).filter(CoercePlayerGame.game == game, CoercePlayerGame.player != speaker, CoercePlayerGame.word != None):
if re.search(cpg.word, msg):
if cpg.target == speaker:
cpg.wins = True
else:
s.add(CoercePartialPoint(coerce_player_game=cpg, player=speaker))
@db.atomic
@load_game
def reset_game(game, s=None):
game.state = 'pregame'
game.start_time = None
for cpg in game.coerce_player_games:
s.query(CoercePartialPoint).filter_by(coerce_player_game=cpg).delete()
cpg.target = None
cpg.word = None
cpg.wins = False
@db.atomic
@load_game
def start_game(game, print_func=print, s=None):
if game.state == 'running':
print_func(game.chan, "Game is already running.")
raise GameInProgressException()
if len(game.coerce_player_games) < MIN_PLAYERS:
print_func(game.chan, "Not enough players. Please wait until at least {} players have joined.".format(MIN_PLAYERS))
raise NotEnoughPlayersException()
print_func(game.chan, "Starting game!")
cpg_list = list(game.coerce_player_games)
random.shuffle(cpg_list)
with open(os.path.join(BASE_DIR, 'word_data/base.txt'), 'r') as base:
word_list = base.read().strip().split('\n')
random.shuffle(word_list)
while cpg_list[0].target == None:
cpg_list[0].target = cpg_list[1].player
cpg_list[0].word = word_list.pop()
cpg_list = cpg_list[1:] + cpg_list[:1]
game.state = 'running'
game.start_time = time.time()
for cpg in game.coerce_player_games:
print_func(cpg.player.name, "Your target is {}. Get them to say {}.".format(cpg.target.name, cpg.word))
print_func(game.chan, "The game has started!")
@load_game
def print_status(game, user=None, print_func=print, s=None):
if game.state == 'pregame':
print_func(game.chan, "The game is not in progress.")
players = s.query(CoercePlayerGame).filter_by(game = game)
player_count = players.count()
players = players.join(CoercePlayerGame.player).order_by(CoercePlayer.name).values(CoercePlayer.name)
players = map(lambda p: p[0], players)
print_func(game.chan, "{} players are signed up for the next game: {}".format(player_count, ", ".join(players)))
elif game.state == 'running':
print_func(game.chan, "The game has been in progress for {}.".format(game_duration(game, s=s)))
players = s.query(CoercePlayerGame).filter_by(game = game).filter(CoercePlayerGame.target != None)
player_count = players.count()
players = players.join(CoercePlayerGame.player).order_by(CoercePlayer.name).values(CoercePlayer.name)
players = map(lambda p: p[0], players)
print_func(game.chan, "{} players are playing: {}".format(player_count, ", ".join(players)))
players = s.query(CoercePlayerGame).filter_by(game = game, target = None)
player_count = players.count()
if player_count > 0:
players = players.join(CoercePlayerGame.player).order_by(CoercePlayer.name).values(CoercePlayer.name)
players = map(lambda p: p[0], players)
print_func(game.chan, "{} players are signed up for the next game: {}".format(player_count, ", ".join(players)))
if user:
player = get_player(user, s=s)
cpg = s.query(CoercePlayerGame).filter_by(player = player, game = game).one_or_none()
if cpg and cpg.target:
target = s.query(CoercePlayer).filter_by(id = cpg.target_id).one()
print_func(player.name, "Your target is {}. Get them to say {}.".format(target.name, cpg.word))
else:
print_func(game.chan, "I have no clue what is going on. Why is the game state {}?".format(game.state))
@db.needs_session
def print_score(channel, user, target, print_func=print, s=None):
player = get_player(target or user, s=s)
if player:
print_func(channel, "{}: {} has {} points.".format(user, player.name, player.score))
else:
print_func(channel, "{}: {} has never registered for Coerce.".format(user, target or user))
@db.needs_session
def print_top(channel, user, number, print_func=print, s=None):
try:
number = int(number or 5)
except ValueError as e:
print_func(channel, "{}: Please give a valid integer.".format(user))
return
query = s.query(CoercePlayer)
if number < 0:
query = query.order_by(CoercePlayer.score.asc()).limit(-number)
else:
query = query.order_by(CoercePlayer.score.desc()).limit(number)
for player in query.all():
print_func(channel, "{} has {} points.".format(player.name, player.score))
@load_game
def check_game_over(game, s=None):
return bool(s.query(CoercePlayerGame).filter_by(wins = True, game = game).first())
@db.atomic
@load_game
def finish_game(game, print_func=print, s=None):
scores = calculate_scores(game, s=s)
for cpg in s.query(CoercePlayerGame).filter_by(wins = True, game = game):
print_func(game.chan, "{} has won by getting {} to say {} and received {} points!".format(cpg.player.name, cpg.target.name, cpg.word, scores[None]))
cpg.player.score += scores[None]
if s.query(CoercePlayerGame).filter_by(game = game, wins = True).count() == 0:
print_func(game.chan, "The game ended for some reason. No one won.")
for cpg in game.coerce_player_games:
cpg.player.score += scores[cpg.player]
print_func(game.chan, "{} received {} for coercing non-targets into saying {}.".format(cpg.player.name, scores[cpg.player], cpg.word))
#TODO: Tell people exactly who said their word.
reset_game(game, s=s)
@load_game
def calculate_scores(game, s=None):
total_partials = s.query(CoercePartialPoint).join(CoercePartialPoint.coerce_player_game).filter(CoercePlayerGame.game == game).count()
total_partials = max(total_partials, 1)
prize_pool = math.ceil(math.sqrt(total_partials))
winner_score = max(prize_pool, 1)
scores = { None: winner_score }
for cpg in game.coerce_player_games:
count = s.query(CoercePartialPoint).filter_by(coerce_player_game = cpg).count()
scores[cpg.player] = min(prize_pool * (count / total_partials), 0.5 * prize_pool)
return scores
@load_game
def game_duration(game, s=None):
return str(datetime.timedelta(seconds=int(time.time() - (game.start_time or 0))))