-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathclient.da
471 lines (427 loc) · 23.9 KB
/
client.da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
import sys, time
from master import Master
from random import randint
from math import ceil
from ProcessMonitor import ProcessMonitor
config(channel is fifo, clock is lamport)
from functools import reduce
'''
GFS client that interacts with the external user and also communicates with
master and chunkserver
'''
class GFSClient(process):
def setup(master:Master, pmon:ProcessMonitor):
output('Setting up client', master)
self.master = master
self.pmon = pmon
self.chunkSize = 10
def getNumChunks(size):
'''
Helper method which returns number of chunks given the size of data
'''
return ceil(float(size)/self.chunkSize)
def exists(fileName):
'''
Helper method to check if a file exisits or not.
'''
res = None
c = logical_clock()
send(('FILE_EXISTS_MASTER', fileName, c), to= master)
await(some(received(('FILE_EXISTS_MASTER_DONE', res, fileName1, c2)),
has= c2 > c and fileName == fileName1))
return res
def receive(msg=('CREATE_MSG', fileName)):
'''
Create API: It takes in a fileName and creates and allocates a storage for this file
on the chunkServers and creates the fileStore mapping for this file. It send error
messages for the following:
1. User is trying to create file which has already been created.
2. In case of any other errors it reports back the error and free up the mapping and
space, if allocated before failure.
'''
try:
send(('START_PROCESS', 'Create'), to= pmon)
if(exists(fileName)):
raise Exception("FAIL1")
output('**************Create File**************** ',fileName)
numChunks = 1
chunkuuids = []
c = logical_clock()
send(('ALLOC_MASTER', True, fileName, numChunks, c), to= master)
await(some(received(('ALLOC_DONE', fileName1, chunkuuids, c2,
chunkServerStore, chunkStore)),has= c2 > c and fileName == fileName1 ))
self.writeChunks(chunkuuids,'',chunkServerStore,chunkStore,fileName)
send(('FINISH_PROCESS', 'Create'), to= pmon)
send(('CREATED',fileName), to=nodeof(self))
send(('CREATE_SUCCESS','File Create Success'), to=nodeof(self))
except Exception as e:
send(('FINISH_PROCESS_FAIL', 'Create'), to= pmon)
if str(e) == "FAIL1":
output("*************File error, file exists already:****************" + fileName)
message = 'File error, file exists already'
send(('ERROR_CREATE',message),to=nodeof(self))
send('File error, file exists already:' + fileName,to=nodeof(self))
else:
output('Error while creating file', str(e))
message = 'Error while creating file'
send(('ERROR_CREATE',message),to=nodeof(self))
send(('ERROR_CREATE',fileName), to = master)
def receive(msg= ('WRITE_MSG', fileName, dataToWrite)):
'''
WRITE API: This API takes in a fileName and dataToWrite and writes the data
to the various chunks as allocated by the Master. It send error messages for the following:
1. User is trying to write to a file which has not been created yet.
2. User is trying to write again to a file to which he has already written.
In this case he should call a write append.
3. In case of any other errors, we delete the whole file by removing the mappings
which had been created and deallocating the space that had been allocated for it.
'''
try:
send(('START_PROCESS', 'Write'), to= pmon)
c = logical_clock()
if not self.exists(fileName):
raise Exception("FAIL1")
send(('ADD_TO_LEASE_Q', fileName, c, 'WRITE'), to= master)
if await(some(received(('CLIENT_TURN_IN_LEASE_Q_WRITE', fileName1, c1)),
has= c1 > c and fileName == fileName1)):
pass
elif timeout(100):
raise Exception("Write taking too long for file:", fileName)
output("Got lease for write")
c = logical_clock()
send(('LATEST_CHUNK_DATA', fileName, c), to= master)
await(some(received(('LATEST_CHUNK_DATA_DONE',currentChunkUuid,currChunkLen,chunkuuids,fileName1,c3)),
has= c3 > c and fileName == fileName1))
if(currChunkLen == 0 and len(chunkuuids) == 1):
numChunks = self.getNumChunks(len(dataToWrite))-1
chunkuuids = [currentChunkUuid]
c = logical_clock()
output('Num chunks', numChunks)
send(('ALLOC_MASTER', False, fileName, numChunks, c), to= master) # send and await message from master server
if await(some(received(('ALLOC_DONE', fileName1, chunkuuids, c2, chunkServerStore, chunkStore)),
has= c2 > c and fileName == fileName1 )):
self.writeChunks(chunkuuids, dataToWrite, chunkServerStore, chunkStore, fileName)
elif timeout(100):
raise Exception("Error while writing file:"+ fileName)
else:
raise Exception("FAIL2")
send(('FINISH_PROCESS', 'Write'), to= pmon)
send(('WRITTEN',fileName), to=nodeof(self))
send(('WRITE_SUCCESS','File Write Success'), to=nodeof(self))
except Exception as e:
send(('FINISH_PROCESS_FAIL', 'Write'), to= pmon)
if str(e) == "FAIL1":
output("**************File error, file doesnt exists:******************" + fileName)
message = 'File error, file doesnt exists'
send(('ERROR_WRITE',message),to=nodeof(self))
elif str(2)== "FAIL2":
output("**************File is already writen, use record append:***************" + fileName)
message = 'File is already writen, use record append'
send(('ERROR_WRITE',message),to=nodeof(self))
else:
output('***********Error while writing file****************', str(e))
message = 'Error writing file'
send(('ERROR_WRITE',message),to=nodeof(self))
send(('ERROR_WRITE',fileName), to = master)
finally:
send(('PROCESS_NEXT_IN_LEASE_Q', fileName), to= master)
def writeChunks(chunkuuids, dataToWrite, chunkServerStore, chunkStore, fileName):
'''
This is a helper method which takes in the allocated chunkuuids, dataToWrite
chunkServerStore, chunkStore, fileName and writes the data to physical servers
by communicating with them. At the end of processing it updates the last chunk
for the file which is utilized in record append and write process.
'''
output('inside write chunks', chunkStore)
chunks = [ dataToWrite[x: x+self.chunkSize] for x in range(0, len(dataToWrite), self.chunkSize) ]
if len(chunks) ==0:
chunks= [""]
for i in range(0, len(chunkuuids)):
chunkloc = chunkStore[chunkuuids[i]]
output(chunkloc, chunkServerStore[chunkloc[0]])
c= logical_clock()
chunkServerLocal = []
chunkLocLength = len(chunkloc)
for j in range(1, chunkLocLength):
chunkServerLocal.append(chunkServerStore[chunkloc[j]])
'''
As per section 3.1 point 3, The client pushes the data to all the replicas.
A client can do so in any order
'''
send(('STORE_DATA', chunkloc[1:], chunkuuids[i], chunks[i],chunkServerLocal, c),
to= chunkServerStore[chunkloc[0]])
'''
As per section 3.1 point 4, Once all the replicas have acknowledged receiving the
data, the client sends a write request to the primary. The request identifies the
data pushed earlier to all of the replicas.
'''
if await(some(received(('STORE_DATA_DONE', c2, chunkuuid2)),
has= c2 > c and chunkuuids[i] == chunkuuid2 )):
send(('WRITE_CHUNK', chunkuuids[i],chunkloc[1:], chunkServerLocal,
c, False), to= chunkServerStore[chunkloc[0]])
elif timeout(10):
raise Exception("Error while writing file:"+ fileName)
if await(some(received(('WRITE_CHUNK_DONE',success, chunkuuid3 , c3)),
has= c3 > c and chunkuuids[i] == chunkuuid3)):
pass
elif timeout(10):
raise Exception("Error while writing file:"+ fileName)
send(('UPDATE_LAST_CHUNK_DATA', fileName, chunkuuids[-1], len(chunks[-1])), to= master)
def receive(msg= ('RECORD_APPEND_MSG', fileName, dataToWrite)):
'''
RECORD APPEND API: This API takes in the fileName, dataToWrite and appends
the data to the end of the file. It send error messages for the following:
1. User is trying to write to a file which has not been created yet.
2. In case of any other errors, we delete the newly allocated chunks by removing the
mappings which had been created and deallocating the space that had been allocated for it.
'''
try:
send(('START_PROCESS', 'RecordAppend'), to= pmon)
if not self.exists(fileName):
raise Exception("FAIL1")
c = logical_clock()
send(('ADD_TO_LEASE_Q', fileName, c, 'APPEND'), to= master)
if await(some(received(('CLIENT_TURN_IN_LEASE_Q_APPEND', fileName1, c3)),
has= c3 > c and fileName == fileName1)):
pass
elif timeout(10):
raise Exception("Record append taking too long:", fileName)
numChunks = self.getNumChunks(len(dataToWrite))
if((numChunks == 1) and (len(dataToWrite) != self.chunkSize)):
output('In append if')
c = logical_clock()
send(('GET_LAST_CHUNK_DATA', fileName, c), to= master)
await(some(received(('GET_LAST_CHUNK_DATA_DONE', fileName1, chunkuuid,
currChunkLen, chunkLocList, chunkServerLocal, c1)),
has= c1 > c and fileName == fileName1))
output('After await', dataToWrite, self.chunkSize, currChunkLen)
'''
As per section 3.3,The primary checks to see if appending the record to the current chunk
would cause the chunkto exceed the maximum size
'''
if(len(dataToWrite) <= (self.chunkSize - currChunkLen)):
addToExistingChunkFile(fileName1, chunkuuid, currChunkLen,
chunkLocList, chunkServerLocal, dataToWrite)
else:
'''
As per section 3.3, If so, it pads the chunkto the maximum size, tells secondaries
to do the same, and replies to the client indicating that the operation should be
retried on the next chunk.
'''
chunkuuids = []
c = logical_clock()
output('IN append else')
send(('ALLOC_APPEND_MASTER', fileName, numChunks, c), to= master)
await(some(received(('ALLOC_APPEND_DONE', fileName1, chunkuuids,
c2, chunkServerStore, chunkStore)),
has= c2 > c and fileName == fileName1 ))
output('After await')
self.writeChunks(chunkuuids, dataToWrite, chunkServerStore, chunkStore, fileName)
else:
'''
As per section 3.3, If so, it pads the chunkto the maximum size, tells secondaries
to do the same, and replies to the client indicating that the operation should be
retried on the next chunk.
'''
chunkuuids = []
c = logical_clock()
output('IN append else')
send(('ALLOC_APPEND_MASTER', fileName, numChunks, c), to= master)
await(some(received(('ALLOC_APPEND_DONE', fileName1, chunkuuids, c2, chunkServerStore, chunkStore)),
has= c2 > c and fileName == fileName1 ))
output('After await')
self.writeChunks(chunkuuids, dataToWrite, chunkServerStore, chunkStore, fileName)
output('DONE WITH RECORD_APPEND ', self)
send(('RECORD_APPEND_DONE',), to=nodeof(self))
send(('FINISH_PROCESS', 'RecordAppend'), to= pmon)
except Exception as e:
send(('FINISH_PROCESS_FAIL', 'RecordAppend'), to= pmon)
if str(e) == "FAIL1":
output("******************File error, file doesnt exist to do record append ************** " + fileName)
message = 'File Doesnt Exist to do record append'
send(('ERROR_APPEND',message),to=nodeof(self))
output("******************File error, file doesnt exist to do record append ************** " + fileName)
else:
output("Error while record append:"+str(e) )
#Cleanup to remove the half written files
send(('ERROR_RECORD_APPEND',fileName), to = master)
finally:
send(('PROCESS_NEXT_IN_LEASE_Q', fileName), to= master)
def addToExistingChunkFile(fileName, chunkuuid, currChunkLen, chunkLocList, chunkServerLocal, dataToWrite):
'''
This is a helper method which is used to append data in case the file system
doesn't require extra chunks to be allocated and it needs to append it to the last chunk
'''
output('inside addToExistingChunkFile', chunkuuid)
'''
As per section 3.3, If the record fits within the maximum size, which is the common case,
the primary appends the data to its replica, tells the secondaries to write the data at
the exact offset where it has, and finally replies success to the client
'''
c = logical_clock()
send(('STORE_DATA', chunkLocList[1:], chunkuuid, dataToWrite, chunkServerLocal[1:], c), to= chunkServerLocal[0])
if await(some(received(('STORE_DATA_DONE', c2, chunkuuid2)),
has= c2 > c and chunkuuid == chunkuuid2 )):
send(('WRITE_CHUNK', chunkuuid,chunkLocList[1:], chunkServerLocal[1:], c, True), to= chunkServerLocal[0])
elif timeout(10):
raise Exception("Record append taking too long to write data:", fileName)
if await(some(received(('WRITE_CHUNK_DONE',success, chunkuuid3 , c3)),
has= c3 > c and chunkuuid == chunkuuid3)):
send(('UPDATE_LAST_CHUNK_DATA', fileName, chunkuuid, len(dataToWrite)+currChunkLen), to= master)
elif timeout(10):
raise Exception("Record append taking too long to write data:", fileName)
def receive(msg= ('READ_MSG',clock, fileName)):
'''
READ API: This API takes in fileName and return the data corresponding to
the current file. It send error messages for the following:
1. User is trying to read a file which does not exist.
2. In case of any other errors it returns back an empty string.
'''
try:
send(('START_PROCESS', 'Read'), to= pmon)
if not self.exists(fileName):
raise Exception("FAIL1")
chunks = []
c = logical_clock()
'''
As per section 2.4, the client typically asks for multiple chunks in the same request(in our case the whole file)
and the master can also include the information for chunks immediately following those requested.
'''
send(('READ_MASTER', fileName,c), to= master)
await(some(received(('READ_MASTER_DONE', fileName1, c2, chunkServerStore,chunkuuids,chunkStore)),
has= c2 > c and fileName == fileName1))
output(chunkuuids)
for chunkuuid in chunkuuids:
c = logical_clock()
chunkloc = chunkStore[chunkuuid]
'''
As per section 2.4, The client then sends a request to one of the replicas,
most likely the closest one(we send to all and await reply from one).
'''
for chunklocation in chunkloc:
send(('READ_CHUNK',chunkuuid,c), to= chunkServerStore[chunklocation])
if await(some(received(('READ_CHUNK_DONE',chunk,c2)),
has= c2 > c )):
pass
elif timeout(10):
raise Exception()
chunks.append(chunk)
dataRead = reduce(lambda x, y: x+y, chunks)
send(('FINISH_PROCESS', 'Read'), to= pmon)
send(('READ_FILE_DONE',dataRead,fileName, clock+1), to=nodeof(self))
except Exception as e:
#send(('FINISH_PROCESS_FAIL', 'Read'), to= pmon)
if str(e) == "FAIL1":
message = 'File Doesnt Exist to read'
output("*****************File error, file does not exists to read ******************" + fileName)
send(('ERROR_READ',message),to=nodeof(self))
output("*****************File error, file does not exists to read ******************" + fileName)
else:
message = 'File read error'
send(('ERROR_READ',message),to=nodeof(self))
output("*********************Error while reading file *************************" + str(e))
send(('READ_FILE_DONE',"",fileName, clock+1), to=nodeof(self))
def receive(msg= ('DELETE', fileName)):
'''
DELETE API: This API takes in a fileName and deletes the file corresponding
the current file. It send error messages for the following:
1. User is trying to delete a file which does not exist.
2. In case of any other errors it logs the error.
'''
try:
send(('START_PROCESS', 'Delete'), to= pmon)
if not self.exists(fileName):
raise Exception("FAIL1")
c = logical_clock()
send(('DELETE_FILE_MASTER', fileName, c), to= master) # send and await message from master server
if await(some(received(('DELETE_FILE_MASTER_DONE', fileName1, c2)),
has= c2 > c and fileName == fileName1)):
output('File ', fileName, ' deleted successfully')
elif timeout(10):
raise Exception("FAIL2")
send(('FINISH_PROCESS', 'Delete'), to= pmon)
send(('DELETE_DONE',fileName,'File Delete Success'), to=nodeof(self))
except Exception as e:
send(('FINISH_PROCESS_FAIL', 'Delete'), to= pmon)
if str(e) == "FAIL1":
message = 'File Doesnt Exist to Delete'
send(('ERROR_DELETE',message),to=nodeof(self))
output("****************************File error, file does not exists to delete************************* " + fileName)
else:
output("Error while deleting file:" + fileName)
send(('ERROR_DELETE',fileName), to=nodeof(self))
def receive(msg= ('CREATE_SNAPSHOT', fileName)):
'''
SNAPSHOT API: This API takes in a fileName and creates a snapshot of it. It updates
the metadata once the snapshot creation has completed succesfully
It send error messages for the following:
1. User is trying to create a snapshot of a file which has not been created yet.
3. In case of any other errors, we log the error.
'''
try:
send(('START_PROCESS', 'CreateSnapshot'), to= pmon)
if not self.exists(fileName):
raise Exception("FAIL1")
c = logical_clock()
send(('ADD_TO_LEASE_Q', fileName, c, 'SNAPSHOT'), to= master)
if await(some(received(('CLIENT_TURN_IN_LEASE_Q_SNAPSHOT', fileName1, c3)),
has= c3 > c and fileName == fileName1)):
pass
elif timeout(10):
raise Exception("Snapshot creation taking too long for file:", fileName)
output("Got lease for Snapshot")
c = logical_clock()
send(('READ_MASTER', fileName,c), to= master)
await(some(received(('READ_MASTER_DONE', fileName1, c2, chunkServerStore,chunkuuids,chunkStore)),
has= c2 > c and fileName == fileName1))
res = True
'''
As per section 3.4, After the leases have been revoked or have expired, the master logs
the operation to disk. It then applies this log record to its in-memory state by duplicating
the metadata for the source file or directory tree. The newly created snapshot files point
to the same chunks as the source files
'''
for chunkuuid in chunkuuids:
chunkloc = chunkStore[chunkuuid]
for chunklocation in chunkloc:
c = logical_clock()
send(('WRITE_CHUNK_SNAPSHOT', chunkuuid,c), to=chunkServerStore[chunklocation])
if await(some(received(('WRITE_CHUNK_SNAPSHOT_DONE',chunkuuid,c2, res)),has= c2 > c )):
pass
elif timeout(10):
raise Exception("SNAPSHOT creation failed for file:", fileName)
if not res:
raise Exception("SNAPSHOT creation failed for file:", fileName)
c = logical_clock()
send(('UPDATE_SNAPSHOT', fileName, c ), to=master)
if await(some(received(('UPDATE_SNAPSHOT_DONE',fileName, c2)),has= c2 > c )):
pass
elif timeout(10):
raise Exception("SNAPSHOT creation failed for file:", fileName)
send(('FINISH_PROCESS', 'CreateSnapshot'), to= pmon)
send(('SNAPSHOT_SUCCESS','Snapshot Create Success'), to=nodeof(self))
except Exception as e:
send(('FINISH_PROCESS_FAIL', 'CreateSnapshot'), to= pmon)
if str(e) == "FAIL1":
message = 'File Doesnt Exist to create Snapshot'
send(('ERROR_SNAPSHOT',message),to=nodeof(self))
output("**********************File error, file does not exists to create Snapshot************************* " + fileName)
else:
output('Error', str(e))
send(('ERROR_SNAPSHOT',str(e)),to=nodeof(self))
finally:
send(('PROCESS_NEXT_IN_LEASE_Q', fileName), to= master)
def receive(msg= ('KILL_CHUNKSERVER',c)):
'''
KILL CHUNKSERVER: This API was developed for testing. It randomly selects
one of the chunkServers, deletes it and returns back.
The main deletes the chunkServer
'''
send(('KILL_CHUNKSERVER', c), to= master)
await(some(received(('KILL_CHUNK', chunkServer, c3)),
has= c3 > c))
send(('DONE',), to =chunkServer)
output("Chunk Server killed", chunkServer)
send(('KILL', chunkServer, c3+1), to = nodeof(self))
def run():
await(received(('DONE',)))