-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharena.py
608 lines (536 loc) · 23.4 KB
/
arena.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
"""
Arena, A game of team survival. The last team
standing scores a point.
A map that uses arena needs to be modified to have a starting area for each
team. A starting area is enclosed and has a gate on it. Each block of a gate
must have the EXACT same color to work properly. Between rounds, the
gate is rebuilt. The gates are destroyed simultaneously at the start of each
round, releasing the players onto the map. Players are free to switch weapons
between rounds.
Spawn locations and gate locations MUST be present in the map metadata (map
txt file) for arena to work properly.
The spawn location/s for the green team are set by using the data from the
``arena_green_spawns`` tuple in the extensions dictionary. Likewise, the blue
spawn/s is set with the ``arena_blue_spawns`` key. ``arena_green_spawns`` and
``arena_blue_spawns`` are tuples which contain tuples of spawn coordinates.
Spawn locations are chosen randomly.
.. note::
    The script retains backwards compatibility with the old
    ``arena_green_spawn`` and ``arena_blue_spawn`` keys.
The ``arena_max_spawn_distance`` can be used to set ``MAX_SPAWN_DISTANCE`` on a
map by map basis. See the comment by ``MAX_SPAWN_DISTANCE`` for more information.
The locations of gates are also determined in the map metadata. ``arena_gates``
is a tuple of coordinates in the extension dictionary. Each gate needs only one
block to be specified (since each gate is made of a uniform color)
Sample extensions dictionary of an arena map with two gates: In this example
there is one spawn location for blue and two spawn locations for green::

    extensions = {
        'arena': True,
        'arena_blue_spawns': ((128, 256, 60),),
        'arena_green_spawns': ((384, 256, 60), (123, 423, 51)),
        'arena_gates': ((192, 236, 59), (320, 245, 60)),
    }

.. codeauthor:: Yourself
"""
import random
import math
from pyspades.contained import BlockAction, SetColor, BlockLine, ExistingPlayer, IntelCapture
from pyspades import world
from pyspades.constants import DESTROY_BLOCK, TEAM_CHANGE_KILL, CTF_MODE
from twisted.internet import reactor
from piqueserver.commands import command, admin
# Packet instances created once at import time and reused for every broadcast:
# existing_player is filled in by send_normal_kill_count, intel_capture by
# ArenaConnection.capture_flag.
existing_player, intel_capture = ExistingPlayer(), IntelCapture()
# When True, arena is active on every map. When False, a map opts in by
# setting 'arena': True in the 'extensions' dictionary of its metadata.
ALWAYS_ENABLED = True

# Seconds players are held in the spawn zone between rounds.
SPAWN_ZONE_TIME = 13.0

# Seconds the winning team's color is shown as the fog color after a round.
# A value of 0 disables the effect.
TEAM_COLOR_TIME = 4.0

# Longest a round may last, in seconds. A value of 0 disables the limit.
MAX_ROUND_TIME = 180

# Seconds by which pending countdown timers are pushed back on a map change.
MAP_CHANGE_DELAY = 35.0

# Where the tent and the intel are placed so that they stay out of the way.
HIDE_COORD = (0, 0, 63)

# Largest distance a player may be from every spawn of their team while the
# gates are closed. Anyone farther away (e.g. after glitching through the map)
# is teleported back to a spawn.
MAX_SPAWN_DISTANCE = 15.0

# Value assigned to the protocol's ``building`` flag when a round starts.
BUILDING_ENABLED = False

# Human readable form of MAX_ROUND_TIME used in the round-start announcement.
MAX_ROUND_TIME_TEXT = (
    '%.2f minutes' % (float(MAX_ROUND_TIME) / 60.0)
    if MAX_ROUND_TIME >= 60
    else str(MAX_ROUND_TIME) + ' seconds'
)
@command(admin_only=True)
def coord(connection):
    """Arm coordinate reporting for the calling admin.

    Sets ``get_coord`` on the connection; the next block the player destroys
    is then reported back through chat instead of being removed (see
    ``ArenaConnection.on_block_destroy``).
    """
    reply = 'Spade a block to get its coordinate.'
    connection.get_coord = True
    return reply
@command(admin_only=True)
def forcestart(connection):
    """Admin command that calls ``begin_arena`` on the protocol right away."""
    arena_protocol = connection.protocol
    arena_protocol.begin_arena()
    return 'Countdown forced'
def make_color(r, g, b, a=255):
    """Pack color components into a single integer.

    Blue occupies the lowest byte, green the next and red the third. The
    alpha value is rescaled from the 0..255 range to 0..128 and placed above
    bit 24.
    """
    red, green, blue = int(r), int(g), int(b)
    alpha = int((float(a) / 255.0) * 128.0)
    return blue | (green << 8) | (red << 16) | (alpha << 24)
# Reduces the number of packets needed to draw a gate by merging its blocks
# into block lines. It probably does not find the optimal answer for shapes
# that are not rectangular prisms, but it is better than sending single blocks.
# d  = index of the coordinate that varies along a line
# c1 = index of the first coordinate held constant
# c2 = index of the second coordinate held constant
def partition(points, d, c1, c2):
    """Split ``points`` into straight runs along axis ``d``.

    Points are first grouped by their ``(point[c1], point[c2])`` values, in
    order of first appearance, and every group is sorted by ``point[d]``.
    Each group is then cut into runs of consecutive ``d`` values. Returns a
    list of ``[first_point, last_point]`` pairs, one per run.
    """
    buckets = {}
    bucket_order = []
    for pt in points:
        key = (pt[c1], pt[c2])
        bucket = buckets.get(key)
        if bucket is None:
            bucket = buckets[key] = []
            bucket_order.append(bucket)
        bucket.append(pt)

    segments = []
    for bucket in bucket_order:
        run = sorted(bucket, key=lambda p: p[d])
        last_index = len(run) - 1
        first = None
        length = 0
        for idx, current in enumerate(run):
            length += 1
            if first is None:
                first = current
            # A run ends at the end of the group or where the next point is
            # not the direct neighbour along d.
            run_ends = idx == last_index or current[d] + 1 != run[idx + 1][d]
            # Current AoS version seems to have an upper limit of 65 blocks
            # for a block line, so longer runs are cut at that length.
            if length == 65 or run_ends:
                segments.append([first, current])
                first = None
                length = 0
    return segments
def minimize_block_line(points):
    """Return the block-line decomposition of ``points`` with the fewest lines.

    The points are partitioned along the x, y and z axes in turn; the shortest
    result wins, with ties resolved in favour of x, then y, then z.
    """
    candidates = (
        partition(points, 0, 1, 2),
        partition(points, 1, 0, 2),
        partition(points, 2, 0, 1),
    )
    # min() returns the first of several equally short candidates, which
    # gives the x, y, z preference order.
    return min(candidates, key=len)
def get_team_alive_count(team):
    """Count the players of ``team`` that have a world object and are not dead."""
    return sum(
        1
        for member in team.get_players()
        if member.world_object is not None and not member.world_object.dead
    )
def get_team_dead(team):
    """Return True when no player of ``team`` is alive.

    Players without a world object do not count as alive, so a team with no
    players is reported as dead.
    """
    return all(
        member.world_object is None or member.world_object.dead
        for member in team.get_players()
    )
class ArenaException(Exception):
    """Raised when the map metadata needed by arena is missing or unusable.

    Used for a gate coordinate that is not solid and for maps that define no
    spawn for one of the teams.
    """
class Gate:
    """A gate made of connected, identically colored blocks.

    The gate is discovered by flood-filling from one seed block at
    construction time. Afterwards ``blocks`` holds the gate as a list of
    ``[start, end]`` block lines and ``support_blocks`` holds the gate blocks
    that touch a solid block of a different color.
    """

    def __init__(self, x, y, z, protocol_obj):
        """Record the gate containing the block at ``(x, y, z)``.

        Raises:
            ArenaException: if the seed coordinate is not a solid block.
        """
        self.support_blocks = []
        self.blocks = []
        self.protocol_obj = protocol_obj
        solid, self.color = self.protocol_obj.map.get_point(x, y, z)
        if not solid:
            raise ArenaException(
                'The gate coordinate (%i, %i, %i) is not solid.' % (x, y, z))
        self.record_gate(x, y, z)
        # Replace the flat list of coordinates by [start, end] block lines.
        self.blocks = minimize_block_line(self.blocks)

    def build_gate(self):
        """Rebuild the gate on the server map and broadcast it to clients.

        A SetColor packet with the gate color is sent first, followed by one
        BlockLine packet per recorded line. Both use player_id 32.
        """
        map_ = self.protocol_obj.map
        block_line = BlockLine()
        set_color = SetColor()
        set_color.value = make_color(*self.color)
        set_color.player_id = block_line.player_id = 32
        self.protocol_obj.broadcast_contained(set_color, save=True)
        for start_block, end_block in self.blocks:
            points = world.cube_line(*(start_block + end_block))
            if not points:
                continue
            for x, y, z in points:
                if not map_.get_solid(x, y, z):
                    map_.set_point(x, y, z, self.color)
            block_line.x1, block_line.y1, block_line.z1 = start_block
            block_line.x2, block_line.y2, block_line.z2 = end_block
            self.protocol_obj.broadcast_contained(block_line, save=True)

    def destroy_gate(self):
        """Remove the gate from the server map and tell clients to drop it.

        Only the support blocks are broadcast as destroyed (to limit wire
        traffic); every gate block is then removed from the server map so
        that server and clients do not get out of sync.
        """
        map_ = self.protocol_obj.map
        block_action = BlockAction()
        block_action.player_id = 32
        block_action.value = DESTROY_BLOCK
        for block in self.support_blocks:  # optimize wire traffic
            if map_.get_solid(*block):
                map_.remove_point(*block)
                block_action.x, block_action.y, block_action.z = block
                self.protocol_obj.broadcast_contained(block_action, save=True)
        for start_block, end_block in self.blocks:  # avoid desyncs
            points = world.cube_line(*(start_block + end_block))
            if not points:
                continue
            for x, y, z in points:
                if map_.get_solid(x, y, z):
                    map_.remove_point(x, y, z)

    def record_gate(self, x, y, z):
        """Flood-fill the gate starting at ``(x, y, z)``.

        Every solid block reachable through the six face neighbours that has
        the gate color (first three color components) is appended to
        ``self.blocks``. A gate block with at least one differently colored
        solid neighbour is also appended to ``self.support_blocks``.

        Returns:
            True if the block at the coordinate is solid but has a different
            color than the gate; False in every other case (out of bounds,
            not solid, already recorded, or newly recorded).
        """
        # Stay inside the map. The original check tested x twice and never
        # rejected y > 511, so the fill could query the map out of range.
        if x < 0 or x > 511 or y < 0 or y > 511 or z < 0 or z > 63:
            return False
        solid, color = self.protocol_obj.map.get_point(x, y, z)
        if solid:
            coordinate = (x, y, z)
            if any(color[i] != self.color[i] for i in range(3)):
                return True
            if coordinate in self.blocks:
                return False
            self.blocks.append(coordinate)
            returns = (self.record_gate(x + 1, y, z),
                       self.record_gate(x - 1, y, z),
                       self.record_gate(x, y + 1, z),
                       self.record_gate(x, y - 1, z),
                       self.record_gate(x, y, z + 1),
                       self.record_gate(x, y, z - 1))
            if True in returns:
                self.support_blocks.append(coordinate)
        return False
def send_normal_kill_count(self) -> None:
    """Broadcast an ExistingPlayer packet carrying the player's current state.

    ``self`` is a player connection. Nothing is sent when the connection has
    already disconnected. ``capture_flag`` schedules this shortly after a
    capture so that scoreboards show the server-side kill count again.
    """
    if not self.disconnected:
        packet = existing_player
        packet.player_id = self.player_id
        packet.team = self.team.id
        packet.weapon = self.weapon
        packet.tool = self.tool or 0
        packet.kills = self.kills
        packet.color = make_color(*self.color)
        packet.name = self.name
        self.protocol.broadcast_contained(packet)
def apply_script(protocol, connection, config):
    """Build the arena protocol and connection classes.

    Args:
        protocol: protocol class to extend.
        connection: connection class to extend.
        config: not used by this script.

    Returns:
        The tuple ``(ArenaProtocol, ArenaConnection)``.
    """

    class ArenaConnection(connection):
        # Set by the ``coord`` command; the next destroyed block is reported
        # in chat instead of being removed.
        get_coord = False

        def capture_flag(self):
            """Score a round win for the player's team via an intel capture.

            Does nothing unless this player is the one holding the other
            team's flag. No kill bonus is added server side; the kill count
            is re-sent shortly afterwards through send_normal_kill_count.
            """
            other_team = self.team.other
            flag = other_team.flag
            player = flag.player
            if player is not self:
                return
            # self.add_score(10) is deliberately not called: no 10 kill
            # bonus for the last person standing.
            self.team.score += 1
            self.on_flag_capture()
            # Overwrite the client side +10 kills on the scoreboard.
            reactor.callLater(0.01, send_normal_kill_count, self)
            if (self.protocol.max_score not in (0, None) and
                    self.team.score >= self.protocol.max_score):
                self.protocol.reset_game(self)
                self.protocol.on_game_end()
            else:
                intel_capture.player_id = self.player_id
                intel_capture.winning = False
                self.protocol.broadcast_contained(intel_capture, save=True)
                flag = other_team.set_flag()
                flag.update()

        def on_block_destroy(self, x, y, z, mode):
            """Report the block coordinate instead of destroying it when armed."""
            returned = connection.on_block_destroy(self, x, y, z, mode)
            if self.get_coord:
                self.get_coord = False
                self.send_chat('Coordinate: %i, %i, %i' % (x, y, z))
                return False
            return returned

        def on_disconnect(self):
            """Treat a living player who leaves mid-round as dead."""
            if self.protocol.arena_running:
                if self.world_object is not None and not self.world_object.dead:
                    self.world_object.dead = True
                    self.protocol.check_round_end()
            return connection.on_disconnect(self)

        def on_kill(self, killer, kill_type, grenade):
            """Mark the player dead and check whether the round is over."""
            if self.protocol.arena_running and kill_type != TEAM_CHANGE_KILL:
                if self.world_object is not None and not self.world_object.dead:
                    self.world_object.dead = True
                    self.protocol.check_round_end(killer)
            return connection.on_kill(self, killer, kill_type, grenade)

        def on_team_join(self, team):
            """Treat a living player who changes team mid-round as dead."""
            returned = connection.on_team_join(self, team)
            if returned is False:
                return False
            if self.protocol.arena_running:
                if self.world_object is not None and not self.world_object.dead:
                    self.world_object.dead = True
                    self.protocol.check_round_end()
            return returned

        def on_position_update(self):
            """Teleport players who stray from their spawns between rounds.

            While no round is running, a player farther than
            ``arena_max_spawn_distance`` from every spawn of their team is
            moved to a random team spawn and refilled.
            """
            arena = self.protocol
            # Only police positions on arena maps, between rounds, for
            # players that actually have a world object.
            if (arena.arena_enabled and not arena.arena_running
                    and self.world_object is not None):
                pos = self.world_object.position
                spawns = self.team.arena_spawns
                # With no spawns there is nothing to measure against and
                # nowhere to teleport to.
                if spawns:
                    min_distance = min(
                        math.sqrt((spawn[0] - pos.x) ** 2 +
                                  (spawn[1] - pos.y) ** 2 +
                                  (spawn[2] - pos.z) ** 2)
                        for spawn in spawns)
                    if min_distance > arena.arena_max_spawn_distance:
                        self.set_location(random.choice(spawns))
                        self.refill()
            return connection.on_position_update(self)

        def get_respawn_time(self):
            """Return -1 during a round and 0 between rounds on arena maps."""
            if self.protocol.arena_enabled:
                if self.protocol.arena_running:
                    return -1
                return 0
            return connection.get_respawn_time(self)

        def respawn(self):
            """Refuse respawns while a round is running."""
            if self.protocol.arena_running:
                return False
            return connection.respawn(self)

        def on_spawn(self, pos):
            """Kill players who spawn while a round is running."""
            returned = connection.on_spawn(self, pos)
            if self.protocol.arena_running:
                self.kill()
            return returned

        def on_spawn_location(self, pos):
            """Spawn at a random arena spawn of the player's team."""
            if self.protocol.arena_enabled:
                return random.choice(self.team.arena_spawns)
            return connection.on_spawn_location(self, pos)

        def on_flag_take(self):
            """Allow taking the flag only when arena_win has enabled it."""
            if self.protocol.arena_take_flag:
                self.protocol.arena_take_flag = False
                return connection.on_flag_take(self)
            return False

        def on_refill(self):
            """Deny refills while a round is running."""
            returned = connection.on_refill(self)
            if self.protocol.arena_running:
                return False
            return returned

    class ArenaProtocol(protocol):
        game_mode = CTF_MODE
        # Server settings saved on entering arena, restored on leaving it.
        old_respawn_time = None
        old_building = None
        old_killing = None
        # True while the current map runs arena.
        arena_enabled = False
        # True between "Go!" and the end of the round.
        arena_running = False
        # True while the pre-round countdown is pending.
        arena_counting_down = False
        # One-shot permission for the winner to take the flag.
        arena_take_flag = False
        # Pending delayed calls of the countdown (begin_arena and the numbers).
        arena_countdown_timers = None
        # Pending delayed call enforcing MAX_ROUND_TIME, if any.
        arena_limit_timer = None
        # Fog color to restore after showing the winning team's color.
        arena_old_fog_color = None
        arena_max_spawn_distance = MAX_SPAWN_DISTANCE

        def check_round_end(self, killer=None, message=True):
            """End the round if a team is wiped out, else announce the count."""
            if not self.arena_running:
                return
            for team in (self.green_team, self.blue_team):
                if get_team_dead(team):
                    self.arena_win(team.other, killer)
                    return
            if message:
                self.arena_remaining_message()

        def arena_time_limit(self):
            """Resolve a round that hit the time limit by comparing survivors."""
            self.arena_limit_timer = None
            green_team = self.green_team
            blue_team = self.blue_team
            green_count = get_team_alive_count(green_team)
            blue_count = get_team_alive_count(blue_team)
            if green_count > blue_count:
                self.arena_win(green_team)
            elif green_count < blue_count:
                self.arena_win(blue_team)
            else:
                self.broadcast_chat('Round ends in a tie.')
                self.begin_arena_countdown()

        def arena_win(self, team, killer=None):
            """Award the round to ``team`` and start the next countdown.

            The capture is credited to ``killer`` when they belong to the
            winning team, otherwise to the first living player of that team.
            """
            if not self.arena_running:
                return
            if self.arena_old_fog_color is None and TEAM_COLOR_TIME > 0:
                self.arena_old_fog_color = self.fog_color
                self.set_fog_color(team.color)
                reactor.callLater(TEAM_COLOR_TIME, self.arena_reset_fog_color)
            if killer is None or killer.team is not team:
                for player in team.get_players():
                    if (player.world_object is not None
                            and not player.world_object.dead):
                        killer = player
                        break
            if killer is not None:
                self.arena_take_flag = True
                killer.take_flag()
                killer.capture_flag()
            self.broadcast_chat(team.name + ' team wins the round!')
            self.begin_arena_countdown()

        def arena_reset_fog_color(self):
            """Restore the fog color saved by arena_win."""
            if self.arena_old_fog_color is not None:
                # Workaround for the disco effect on game end.
                self.old_fog_color = self.arena_old_fog_color
                self.set_fog_color(self.arena_old_fog_color)
                self.arena_old_fog_color = None

        def arena_remaining_message(self):
            """Broadcast how many players are still alive on each team."""
            if not self.arena_running:
                return
            green_team = self.green_team
            blue_team = self.blue_team
            for team in (green_team, blue_team):
                num = get_team_alive_count(team)
                team.arena_message = '%i player' % num
                if num != 1:
                    team.arena_message += 's'
                team.arena_message += ' on ' + team.name
            self.broadcast_chat('%s and %s remain.' %
                                (green_team.arena_message,
                                 blue_team.arena_message))

        def on_map_change(self, map_):
            """Configure arena for the new map or clean up after it.

            Raises:
                ArenaException: if arena is enabled and the map metadata has
                    no spawns for one of the teams, or a gate coordinate is
                    not solid.
            """
            for player in self.players.values():
                player.kills = 0
            extensions = self.map_info.extensions
            if ALWAYS_ENABLED:
                self.arena_enabled = True
            else:
                if 'arena' in extensions:
                    self.arena_enabled = extensions['arena']
                else:
                    self.arena_enabled = False
            self.arena_max_spawn_distance = MAX_SPAWN_DISTANCE
            if self.arena_enabled:
                # Save the pre-arena settings only once. On consecutive arena
                # maps the current values are already the arena ones and
                # must not replace what will be restored later.
                if self.old_respawn_time is None:
                    self.old_respawn_time = self.respawn_time
                self.respawn_time = 0
                if self.old_building is None:
                    self.old_building = self.building
                if self.old_killing is None:
                    self.old_killing = self.killing
                self.gates = []
                if 'arena_gates' in extensions:
                    for gate in extensions['arena_gates']:
                        self.gates.append(Gate(*gate, protocol_obj=self))
                if 'arena_green_spawns' in extensions:
                    self.green_team.arena_spawns = extensions[
                        'arena_green_spawns']
                elif 'arena_green_spawn' in extensions:
                    self.green_team.arena_spawns = (
                        extensions['arena_green_spawn'],)
                else:
                    raise ArenaException(
                        'No arena_green_spawns given in map metadata.')
                if 'arena_blue_spawns' in extensions:
                    self.blue_team.arena_spawns = extensions[
                        'arena_blue_spawns']
                elif 'arena_blue_spawn' in extensions:
                    self.blue_team.arena_spawns = (
                        extensions['arena_blue_spawn'],)
                else:
                    raise ArenaException(
                        'No arena_blue_spawns given in map metadata.')
                if 'arena_max_spawn_distance' in extensions:
                    self.arena_max_spawn_distance = extensions[
                        'arena_max_spawn_distance']
                # Push back a countdown that is already pending, then start
                # one if none is.
                self.delay_arena_countdown(MAP_CHANGE_DELAY)
                self.begin_arena_countdown()
            else:
                # Cleanup after a map change
                if self.old_respawn_time is not None:
                    self.respawn_time = self.old_respawn_time
                if self.old_building is not None:
                    self.building = self.old_building
                if self.old_killing is not None:
                    self.killing = self.old_killing
                self.arena_enabled = False
                self.arena_running = False
                self.arena_counting_down = False
                self.arena_limit_timer = None
                self.arena_old_fog_color = None
                self.old_respawn_time = None
                self.old_building = None
                self.old_killing = None
            return protocol.on_map_change(self, map_)

        def build_gates(self):
            """Rebuild every gate of the current map."""
            for gate in self.gates:
                gate.build_gate()

        def destroy_gates(self):
            """Destroy every gate of the current map."""
            for gate in self.gates:
                gate.destroy_gate()

        def arena_spawn(self):
            """Put every non-spectator at a random spawn of their team.

            Dead players are respawned there; everyone else is teleported
            and refilled.
            """
            for player in list(self.players.values()):
                if player.team.spectator:
                    continue
                if player.world_object is not None and player.world_object.dead:
                    player.spawn(random.choice(player.team.arena_spawns))
                else:
                    player.set_location(
                        random.choice(player.team.arena_spawns))
                    player.refill()

        def refill_all(self):
            """Refill every non-spectator player."""
            for player in self.players.values():
                if player.team.spectator:
                    continue
                player.refill()

        def begin_arena_countdown(self):
            """Close the gates, gather the players and schedule the next round.

            Cancels a pending round time limit. Does nothing more when a
            countdown is already in progress.
            """
            if self.arena_limit_timer is not None:
                if (self.arena_limit_timer.cancelled == 0
                        and self.arena_limit_timer.called == 0):
                    self.arena_limit_timer.cancel()
                    self.arena_limit_timer = None
            if self.arena_counting_down:
                return
            self.arena_running = False
            self.arena_limit_timer = None
            self.arena_counting_down = True
            self.killing = False
            self.building = False
            self.build_gates()
            self.arena_spawn()
            self.broadcast_chat('The round will begin in %i seconds.' %
                                SPAWN_ZONE_TIME)
            self.arena_countdown_timers = [
                reactor.callLater(SPAWN_ZONE_TIME, self.begin_arena)]
            # Announce the last five seconds one by one.
            for seconds_left in range(1, 6):
                self.arena_countdown_timers.append(reactor.callLater(
                    SPAWN_ZONE_TIME - seconds_left, self.broadcast_chat,
                    str(seconds_left)))

        def delay_arena_countdown(self, amount):
            """Postpone all still-pending countdown timers by ``amount`` seconds."""
            if self.arena_counting_down:
                for timer in self.arena_countdown_timers:
                    if timer.cancelled == 0 and timer.called == 0:
                        timer.delay(amount)

        def begin_arena(self):
            """Start a round, or restart the countdown if a team is empty."""
            self.arena_counting_down = False
            for team in (self.green_team, self.blue_team):
                if team.count() == 0:
                    self.broadcast_chat(
                        'Not enough players on the %s team to begin.' %
                        team.name)
                    self.begin_arena_countdown()
                    return
            self.arena_running = True
            self.killing = True
            self.building = BUILDING_ENABLED
            self.refill_all()
            self.destroy_gates()
            self.broadcast_chat('Go!')
            if MAX_ROUND_TIME > 0:
                self.broadcast_chat(
                    'There is a time limit of %s for this round.' %
                    MAX_ROUND_TIME_TEXT)
                self.arena_limit_timer = reactor.callLater(
                    MAX_ROUND_TIME, self.arena_time_limit)

        def on_base_spawn(self, x, y, z, base, entity_id):
            """Place the base at HIDE_COORD on arena maps."""
            if not self.arena_enabled:
                return protocol.on_base_spawn(self, x, y, z, base, entity_id)
            return HIDE_COORD

        def on_flag_spawn(self, x, y, z, flag, entity_id):
            """Place the flag at HIDE_COORD on arena maps."""
            if not self.arena_enabled:
                # Delegate to the flag hook; the original called
                # on_base_spawn here by mistake.
                return protocol.on_flag_spawn(self, x, y, z, flag, entity_id)
            return HIDE_COORD

    return ArenaProtocol, ArenaConnection