-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathagent.py
356 lines (315 loc) · 15 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
import piece as piece_mod #To avoid name clashes
from fractions import Fraction
import random
from copy import copy
from itertools import combinations
from debug import *
class Agent:
agent_counter = 0
def get_dominations(agents, pieces, residue):
for p in pieces:
if p.allocated != None:
#Make it easier to reference an agent's allocated piece
p.allocated.piece = p
for a1 in agents:
a1.dominations = set([])
for a2 in agents:
#Test if a1 dominates a2
dominates = a1.get_value(a1.piece, count=False) >= a1.get_value(a2.piece, count=False) + a1.get_value(residue, count=False)
if dominates:
a1.dominations.add(a2)
# TODO this is order 2^n. Is there a better way?
def get_dominating_set(agents, pieces, residue):
Agent.get_dominations(agents, pieces, residue)
for n in range(len(agents)-1, 0, -1):
possibilities = combinations(agents, n)
for possibility in possibilities:
dominators = possibility
dominated = [a for a in agents if not a in dominators]
good = True
for d1 in dominators:
for d2 in dominated:
if not d2 in d1.dominations:
good = False
break
if not good:
break
if good:
return (dominators, dominated)
return None
def myrandom(x, percent_zeros=None):
if percent_zeros != None and random.random() < percent_zeros:
assert percent_zeros < 1
return 0
else:
return random.random()
'''
Creates preference functions for valuing and trimming cake, which can appoximate any function of one argument 0 <= x <= 1.
division_count gives the number of homogeneous segments to divide the cake into. A larger number better approximates the
function. The default function to use is a wrapper of random.random() that takes x
'''
def set_adv_from_function(self, division_count, preference_function):
division_values = {}
#Generated evenly spaced values outputted from the function (random by default)
for i in range(1,division_count+1):
x = Fraction(i,division_count)
division_values[x] = Fraction(preference_function(x))
s = sum([ division_values[k] for k in division_values])
if s == 0:
return self.set_adv_from_function(division_count, preference_function)
factor = division_count / s
#Adjusted Division Values
self.adv = {k: division_values[k]*factor for k in division_values}
def value_up_to(self, x):
assert type(x) == Fraction
acc_value = Fraction(0)
keys = sorted(list(self.adv.keys()))
for i in range(len(keys)):
left = 0 if i==0 else keys[i-1]
right = x if x<keys[i] else keys[i]
acc_value += self.adv[keys[i]] * (right - left)
if right == x:
break
assert type(acc_value) == Fraction
return acc_value
'''
Given a slice and a value, the agent must be able to determine where to trim the slice so that it is worth the value. Returns the trim and does not add it to the piece
'''
def get_trim_of_value(self, piece, desired_value, count=True):
assert type(desired_value) == Fraction
assert sorted(piece.intervals) == piece.intervals
assert desired_value >= 0
assert len(piece.intervals) > 0
keys = list(self.adv.keys())
keys.sort()
if count:
self.trim_count += 1
if len(piece.trims) > 0:
return self.get_trim_of_value(piece.get_after_rightmost_trim(), desired_value, count=False)
acc_value = Fraction(0)
trim_at = Fraction(0)
# target_value is the amount to trim OFF of the piece after the rightmost trim
# We agreed that if we incremented trim_count, the get_value should not count
target_value = self.get_value(piece, count=False) - desired_value
if target_value < 0:
return None
for interval in piece.intervals:
value_of_interval = self.value_up_to(interval.right) - self.value_up_to(interval.left)
if acc_value + value_of_interval <= target_value:
acc_value += value_of_interval
trim_at = interval.right
else:
# acc_value + value_of_interval > target_value:
''' Start using preference divisions '''
trim_at = interval.left
for k in filter( lambda k: interval.left < k, keys ):
#TODO don't use value_up_to, use the adv values
if self.value_up_to(k) - self.value_up_to(trim_at) + acc_value <= target_value:
acc_value += self.value_up_to(k) - self.value_up_to(trim_at)
trim_at = k
assert interval.left <= trim_at <= interval.right
else:
trim_at += (target_value - acc_value) / self.adv[k]
assert interval.left <= trim_at <= interval.right
break
break
assert any( [i.left <= trim_at <= i.right for i in piece.intervals] )
''' Because this trim may not be added to the piece, hash the value of a copied piece '''
#TODO don't need a new piece?
new_piece = copy(piece)
new_piece.trims = [piece_mod.Trim(self, trim_at)]
#assert self.get_value(new_piece, count=False) == desired_value
self.cached_values[new_piece.hash_info()] = desired_value
return piece_mod.Trim(self, trim_at)
'''
When created, all an agent has is a function for valuing different slices of cake
'''
def __init__(self, division_count = 10, preference_function=myrandom):
self.set_adv_from_function(division_count, preference_function)
self.number = Agent.agent_counter
self.name = 'Agent '+str(self.number)
Agent.agent_counter += 1
self.trim_count = 0
self.value_count = 0
''' This dictionary stores the cached values of pieces, with hash of piece as keys, and value of piece as value '''
self.cached_values = {}
self.allocated_cake = piece_mod.Piece([])
self.allocated_cake.allocated = self
def __repr__(self):
return self.name
'''
Given a list of slices, the agent must be able to identify their favorite. Ties are broken very intentionally
'''
def choose_piece(self, pieces, current_ranking=None, count=True):
max_value = max([self.get_value(p, count=count) for p in pieces])
possibilities = [p for p in pieces if self.get_value(p) == max_value]
''' Sort primarily by allocated or not, and secondarily by the ranking in the subcore above this one '''
if current_ranking != None and self in current_ranking:
possibilities.sort(key=lambda p: current_ranking[self].index(p))
possibilities.sort(key=lambda p: p.allocated != None)
return possibilities[0]
'''
Generate a ranking of pieces for this agent. More valuable pieces are placed at the left of the list.
The above ranking or lexicography breaks ties.
'''
def get_ranking(self, pieces, above_ranking):
order = pieces[:]
if above_ranking:
''' Break ties by the above ranking '''
order.sort(key=lambda p: above_ranking[self].index(p))
else:
''' Break ties by lexcographic ordering of the pieces (leftmost point on the piece) '''
order.sort(key=lambda p: p.intervals[0].left)
''' Sort by the current value of the piece '''
order.sort(key=lambda p: self.get_value(p), reverse=True)
return order
'''
Given a slice, the agent must be able to assign consistent, proportional value to the slice
'''
def get_value(self, piece, count=True, whole_piece=False, use_cache=True):
if use_cache and piece.hash_info() in self.cached_values and not whole_piece:
return self.cached_values[piece.hash_info()]
if count:
self.value_count +=1
if whole_piece:
cut_piece = piece
else:
cut_piece = piece.get_after_rightmost_trim()
sum_value = 0
for interval in cut_piece.intervals:
sum_value += self.value_up_to(interval.right) - self.value_up_to(interval.left)
#Cache the computed value
if count:
self.cached_values[piece.hash_info()] = sum_value
return sum_value
'''
Recursively cut pieces off the right side
'''
def cut_into_n_pieces_of_equal_value(self, n, piece):
if n <= 1:
return [piece]
total_value = self.get_value(piece)
target_value = total_value / n
assert type(target_value) == Fraction
left_piece = copy(piece)
start_number = left_piece.number+1
pieces = []
for i in range(n-1):
t = self.get_trim_of_value(left_piece, target_value)
assert t.x != 0
left_piece.trims.append(t)
assert left_piece.get_rightmost_trim() == t
left_piece, right_piece = left_piece.split_at_rightmost_trim()
pieces.append(right_piece)
pieces.append(left_piece)
#Cache the left piece's value
self.cached_values[left_piece.hash_info()] = target_value
#Pieces were added in the wrong order, so reverse!
pieces.reverse()
#Assert that all pieces have indeed been hashed (which mostly happens inside the trim function)
for index, p in enumerate(pieces):
p.number = start_number + index
p.name = 'Piece '+str(p.number)
assert p.hash_info() in self.cached_values
return pieces
def fractalize_preferences(self, residue_intervals, subdivisions=2, subdivide_if_below=20, preference_function=myrandom):
#Place fixed points at previous preference sections
fixed_points = list(self.adv.keys())
fixed_points.append(Fraction(0))
# Place fixed points at all cached value intervals
for key in self.cached_values:
for interval in key:
fixed_points.append( interval.left )
fixed_points.append( interval.right )
# Place fixed points at all residue intervals
for interval in residue_intervals:
fixed_points.append( interval.left )
fixed_points.append( interval.right )
fixed_points = sorted(list(set(fixed_points)))
intervals = [piece_mod.Interval(fixed_points[i], fixed_points[i+1]) for i in range(0, len(fixed_points)-1)]
intervals = list(filter(lambda i: any([i.overlaps(r_i) for r_i in residue_intervals]), intervals))
#No two intervals overlap:
assert all([not intervals[i1].overlaps(intervals[i2]) for i1 in range(len(intervals)) for i2 in range(i1+1, len(intervals))])
assert self.value_up_to(Fraction(1)) == 1
#print("splitting", len(intervals), 'intervals into', len(intervals)*subdivisions, 'intervals')
if len(intervals) >= subdivide_if_below:
return
#print(intervals)
i = 0
for x in sorted(list(self.adv.keys()))[:]:
pref_height = self.adv[x]
while intervals[i].left < x:
#print(intervals[i])
# Reset the right side of the larger preference intervals to the left side of what we're fractalizing
if intervals[i].left > 0 and (i==0 or intervals[i-1].right < intervals[i].left) and not intervals[i].left in self.adv:
self.adv[intervals[i].left] = pref_height
pref_width = intervals[i].right - intervals[i].left
pref_area = pref_height * pref_width
new_pref_width = pref_width / Fraction(subdivisions)
accumulated_area = Fraction(0)
for i2 in range( 1, subdivisions+1):
x2 = intervals[i].left + i2 * new_pref_width
new_pref_height = Fraction(preference_function(x2))
accumulated_area += new_pref_height * new_pref_width
self.adv[x2] = new_pref_height
#Now scale the intervals
factor = pref_area / accumulated_area
accumulated_area = Fraction(0)
for i2 in range( 1, subdivisions+1):
x2 = intervals[i].left + i2 * new_pref_width
self.adv[x2] *= factor
accumulated_area += self.adv[x2] * new_pref_width
i += 1
if i >= len(intervals):
assert self.value_up_to(Fraction(1)) == 1
return
if i >= len(intervals):
return
'''
Output the agent's preference function into a string that can be imported as well
'''
def get_preference_string(self):
string = ""
for k in sorted(self.adv.keys()):
f = self.adv[k]
string += str(k.numerator) + ' ' + str(k.denominator) +': ' + str(f.numerator) + ' ' + str(f.denominator) + ', '
return string[:-2] #Remove last comma and space
'''
Import a preference string of the form of that which was outputted
'''
def set_preferences(self, preference_string):
if ':' not in preference_string:
#This is the old style of recording preferences, where they are evenly spaced
fraction_string_list = preference_string.split(',')
self.adv = {}
interval = Fraction(1, len(fraction_string_list))
x = Fraction(0)
for f_s in fraction_string_list:
x += interval
num = int(f_s.split()[0])
den = int(f_s.split()[1])
self.adv[x] = Fraction(num, den)
assert x == Fraction(1,1)
assert sum([self.adv[k] for k in self.adv]) == len(fraction_string_list)
else:
fraction_string_list = preference_string.split(',')
self.adv = {}
for f_s in fraction_string_list:
k,v = f_s.split(': ')
k_num, k_den = map(int, k.split())
v_num, v_den = map(int, v.split())
self.adv[Fraction(k_num, k_den)] = Fraction(v_num, v_den)
# Adjust the values to sum to 1
acc_value = Fraction(0)
keys = sorted(list(self.adv.keys()))
for i in range(len(keys)):
width = keys[i] if i == 0 else keys[i] - keys[i-1]
acc_value += width * self.adv[keys[i]]
for k in keys:
self.adv[k] /= acc_value
# Check that the values sum to 1
acc_value = Fraction(0)
for i in range(len(keys)):
width = keys[i] if i == 0 else keys[i] - keys[i-1]
acc_value += width * self.adv[keys[i]]
assert acc_value == 1