forked from internetarchive/dweb-transports
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTransportYJS.js
374 lines (338 loc) · 16.8 KB
/
TransportYJS.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
//Note if now that its included by the consumer (nodejs or html) its unclear if this requires a new Y and then Y.extend("y-text",..." but YJS is no longer maintained
/*
This Transport layers builds on the YJS DB and uses IPFS as its transport.
Y Lists have listeners and generate events - see docs at ...
*/
const Url = require('url');
const debugyjs = require('debug')('dweb-transports:yjs');
const canonicaljson = require('@stratumn/canonicaljson');
//const Y = require('yjs/dist/y.js'); // Explicity require of dist/y.js to get around a webpack warning but causes different error in YJS
/* See TransportYJS.requires at bottom
const Y = require('yjs');
require('y-memory')(Y);
require('y-array')(Y);
require('y-text')(Y);
require('y-map')(Y);
require('y-ipfs-connector')(Y);
require('y-indexeddb')(Y);
//require('y-leveldb')(Y); //- can't be there for browser, node seems to find it ok without this, though not sure why..
*/
// Utility packages (ours) And one-liners
function delay(ms, val) { return new Promise(resolve => {setTimeout(() => { resolve(val); },ms)})}
// Other Dweb modules
const { CodingError, ToBeImplementedError } = require('./Errors'); // Standard Dweb Errors
const Transport = require('./Transport.js'); // Base class for TransportXyz
const Transports = require('./Transports'); // Manage all Transports that are loaded
const utils = require('./utils'); // Utility functions
let defaultoptions = {
db: {
name: 'indexeddb', // leveldb in node
},
connector: {
name: 'ipfs',
//ipfs: ipfs, // Need to link IPFS here once created
},
};
class TransportYJS extends Transport {
/*
YJS specific transport - over IPFS, but could probably use other YJS transports
Fields: TODO document this
*/
constructor(options) {
super(options);
this.options = options; // Dictionary of options
this.name = "YJS"; // For console log etc
this.supportURLs = ['yjs'];
this.supportFunctions = ['fetch', 'add', 'list', 'listmonitor', 'newlisturls',
'connection', 'get', 'set', 'getall', 'keys', 'newdatabase', 'newtable', 'monitor']; // Only does list functions, Does not support reverse,
this.supportFeatures = []; // Doesnt support noCache and is mutable
this.setStatus(Transport.STATUS_LOADED);
}
async p__y(url, opts) {
/*
Utility function to get Y for this URL with appropriate options and open a new connection if not already
url: URL string to find list of
opts: Options to add to defaults
resolves: Y
*/
if (!(typeof(url) === "string")) { url = url.href; } // Convert if its a parsed URL
console.assert(url.startsWith("yjs:/yjs/"));
try {
if (this.yarrays[url]) {
//debugyjs("Found Y for %s", url);
return this.yarrays[url];
} else {
let options = Transport.mergeoptions(this.options, {connector: {room: url}}, opts); // Copies options, ipfs will be set already
//debugyjs("Creating Y for %s", url);
return this.yarrays[url] = await Y(options);
}
} catch(err) {
console.error("Failed to initialize Y", err.message);
throw err;
}
}
async p__yarray(url) {
/*
Utility function to get Yarray for this URL and open a new connection if not already
url: URL string to find list of
resolves: Y
*/
return this.p__y(url, { share: {array: "Array"}}); // Copies options, ipfs will be set already
}
async p_connection(url) {
/*
Utility function to get Yarray for this URL and open a new connection if not already
url: URL string to find list of
resolves: Y - a connection to use for get's etc.
*/
return this.p__y(url, { share: {map: "Map"}}); // Copies options, ipfs will be set already
}
//TODO-SPLIT define load()
static setup0(options) {
/*
First part of setup, create obj, add to Transports but dont attempt to connect, typically called instead of p_setup if want to parallelize connections.
*/
let combinedoptions = Transport.mergeoptions(defaultoptions, options.yjs);
debugyjs("YJS options %o", combinedoptions);
let t = new TransportYJS(combinedoptions); // Note doesnt start IPFS or Y
Transports.addtransport(t);
return t;
}
async p_setup2(cb) {
/*
This sets up for Y connections, which are opened each time a resource is listed, added to, or listmonitored.
p_setup2 is defined because IPFS will have started during the p_setup1 phase.
Throws: Error("websocket error") if WiFi off, probably other errors if fails to connect
*/
try {
this.setStatus(Transport.STATUS_STARTING); // Should display, but probably not refreshed in most case
this.options.connector.ipfs = Transports.ipfs().ipfs; // Find an IPFS to use (IPFS's should be starting in p_setup1)
this.yarrays = {};
await this.p_status();
} catch(err) {
console.error(this.name,"failed to start",err);
this.setStatus(Transport.STATUS_FAILED);
}
return this;
}
async p_status() {
/*
Return a string for the status of a transport. No particular format, but keep it short as it will probably be in a small area of the screen.
For YJS, its online if IPFS is.
*/
this.setStatus((await this.options.connector.ipfs.isOnline()) ? Transport.STATUS_CONNECTED : Transport.STATUS_FAILED);
return super.p_status();
}
// ======= LISTS ========
async p_rawlist(url) {
/*
Fetch all the objects in a list, these are identified by the url of the public key used for signing.
(Note this is the 'signedby' parameter of the p_rawadd call, not the 'url' parameter
Returns a promise that resolves to the list.
Each item of the list is a dict: {"url": url, "date": date, "signature": signature, "signedby": signedby}
List items may have other data (e.g. reference ids of underlying transport)
:param string url: String with the url that identifies the list.
:resolve array: An array of objects as stored on the list.
*/
try {
let y = await this.p__yarray(url);
let res = y.share.array.toArray();
// .filter((obj) => (obj.signedby.includes(url))); Cant filter since url is the YJS URL, not the URL of the CL that signed it. (upper layers verify, which filters)
//Logged by Transports
//debugyjs("p_rawlist found %o", res);
return res;
} catch(err) {
//Logged by Transports
// console.log("TransportYJS.p_rawlist failed",err.message);
throw(err);
}
}
listmonitor(url, callback, {current=false}={}) {
/*
Setup a callback called whenever an item is added to a list, typically it would be called immediately after a p_rawlist to get any more items not returned by p_rawlist.
:param url: string Identifier of list (as used by p_rawlist and "signedby" parameter of p_rawadd
:param callback: function(obj) Callback for each new item added to the list
obj is same format as p_rawlist or p_rawreverse
*/
let y = this.yarrays[typeof url === "string" ? url : url.href];
console.assert(y,"Should always exist before calling listmonitor - async call p__yarray(url) to create");
if (current) {
y.share.array.toArray.map(callback);
}
y.share.array.observe((event) => {
if (event.type === 'insert') { // Currently ignoring deletions.
debugyjs('resources inserted %o', event.values);
//cant filter because url is YJS local, not signer, callback should filter
//event.values.filter((obj) => obj.signedby.includes(url)).map(callback);
event.values.map(callback);
}
})
}
rawreverse() {
/*
Similar to p_rawlist, but return the list item of all the places where the object url has been listed.
The url here corresponds to the "url" parameter of p_rawadd
Returns a promise that resolves to the list.
:param string url: String with the url that identifies the object put on a list.
:resolve array: An array of objects as stored on the list.
*/
//TODO-REVERSE this needs implementing once list structure on IPFS more certain
throw new ToBeImplementedError("Undefined function TransportYJS.rawreverse"); }
async p_rawadd(url, sig) {
/*
Store a new list item, it should be stored so that it can be retrieved either by "signedby" (using p_rawlist) or
by "url" (with p_rawreverse). The underlying transport does not need to guarantee the signature,
an invalid item on a list should be rejected on higher layers.
:param string url: String identifying list to post to
:param Signature sig: Signature object containing at least:
date - date of signing in ISO format,
urls - array of urls for the object being signed
signature - verifiable signature of date+urls
signedby - urls of public key used for the signature
:resolve undefined:
*/
// Logged by Transports
//debugyjs("TransportYJS.p_rawadd %o %o", url.href, sig);
console.assert(url && sig.urls.length && sig.signature && sig.signedby.length, "TransportYJS.p_rawadd args", url, sig);
let value = sig.preflight(Object.assign({}, sig));
let y = await this.p__yarray(url);
y.share.array.push([value]);
}
p_newlisturls(cl) {
let u = cl._publicurls.map(urlstr => Url.parse(urlstr))
.find(parsedurl =>
(parsedurl.protocol === "ipfs" && parsedurl.pathname.includes('/ipfs/'))
|| (parsedurl.protocol === "yjs:"));
if (!u) {
u = `yjs:/yjs/${ cl.keypair.verifyexportmultihashsha256_58() }`; // Pretty random, but means same test will generate same list
}
return [u,u];
}
// ======= KEY VALUE TABLES ========
async p_newdatabase(pubkey) {
//if (pubkey instanceof Dweb.PublicPrivate)
if (pubkey.hasOwnProperty("keypair"))
pubkey = pubkey.keypair.signingexport();
// By this point pubkey should be an export of a public key of form xyz:abc where xyz
// specifies the type of public key (NACL VERIFY being the only kind we expect currently)
let u = `yjs:/yjs/${encodeURIComponent(pubkey)}`;
return {"publicurl": u, "privateurl": u};
}
//TODO maybe change the listmonitor / monitor code for to use "on" and the structure of PP.events
//TODO but note https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy about Proxy which might be suitable, prob not as doesnt map well to lists
async p_newtable(pubkey, table) {
if (!pubkey) throw new CodingError("p_newtable currently requires a pubkey");
let database = await this.p_newdatabase(pubkey);
// If have use cases without a database, then call p_newdatabase first
return { privateurl: `${database.privateurl}/${table}`, publicurl: `${database.publicurl}/${table}`} // No action required to create it
}
async p_set(url, keyvalues, value) { // url = yjs:/yjs/database/table
/*
Set key values
keyvalues: string (key) in which case value should be set there OR
object in which case value is ignored
*/
let y = await this.p_connection(url);
if (typeof keyvalues === "string") {
y.share.map.set(keyvalues, canonicaljson.stringify(value));
} else {
Object.keys(keyvalues).map((key) => y.share.map.set(key, keyvalues[key]));
}
}
_p_get(y, keys) {
if (Array.isArray(keys)) {
return keys.reduce(function(previous, key) {
let val = y.share.map.get(key);
previous[key] = typeof val === "string" ? JSON.parse(val) : val; // Handle undefined
return previous;
}, {});
} else {
let val = y.share.map.get(keys);
return typeof val === "string" ? JSON.parse(val) : val; // Surprisingly this is sync, the p_connection should have synchronised
}
}
async p_get(url, keys) {
return this._p_get(await this.p_connection(url), keys);
}
async p_delete(url, keys) {
let y = await this.p_connection(url);
if (typeof keys === "string") {
y.share.map.delete(keys);
} else {
keys.map((key) => y.share.map.delete(key)); // Surprisingly this is sync, the p_connection should have synchronised
}
}
async p_keys(url) {
let y = await this.p_connection(url);
return y.share.map.keys(); // Surprisingly this is sync, the p_connection should have synchronised
}
async p_getall(url) {
let y = await this.p_connection(url);
let keys = y.share.map.keys(); // Surprisingly this is sync, the p_connection should have synchronised
return this._p_get(y, keys);
}
async p_rawfetch(url) {
return { // See identical structure in TransportHTTP
table: "keyvaluetable", //TODO-KEYVALUE its unclear if this is the best way, as maybe want to know the real type of table e.g. domain
_map: await this.p_getall(url)
}; // Data struc is ok as SmartDict.p_fetch will pass to KVT constructor
}
async monitor(url, callback, {current=false}={}) {
/*
Setup a callback called whenever an item is added to a list, typically it would be called immediately after a p_getall to get any more items not returned by p_getall.
Stack: KVT()|KVT.p_new => KVT.monitor => (a: Transports.monitor => YJS.monitor)(b: dispatchEvent)
:param url: string Identifier of list (as used by p_rawlist and "signedby" parameter of p_rawadd
:param callback: function({type, key, value}) Callback for each new item added to the list
:param current: boolean - true if want events for current items on table
*/
url = typeof url === "string" ? url : url.href;
let y = this.yarrays[url];
if (!y) {
throw new CodingError("Should always exist before calling monitor - async call p__yarray(url) to create");
}
if (current) {
// Iterate over existing items with callback
y.share.map.keys()
.forEach(k => {
let val = y.share.map.get[k];
callback({type: "set", key: k, value: (typeof val === "string" ? JSON.parse(val) : val)});
})
}
y.share.map.observe((event) => {
if (['add','update'].includes(event.type)) { // Currently ignoring deletions.
debugyjs("YJS monitor: %o %s %s %o", url, event.type, event.name, event.value);
// ignores event.path (only in observeDeep) and event.object
if (!(event.type === "update" && event.oldValue === event.value)) {
// Dont trigger on update as seeing some loops with p_set
let newevent = {
"type": {"add": "set", "update": "set", "delete": "delete"}[event.type],
"value": JSON.parse(event.value),
"key": event.name,
};
callback(newevent);
}
}
})
}
static async p_test(opts={}) {
{console.log("TransportHTTP.test")}
try {
let transport = await this.p_setup(opts);
console.log("HTTP connected");
let res = await transport.p_info();
console.log("TransportHTTP info=",res);
res = await transport.p_status();
console.assert(res === Transport.STATUS_CONNECTED);
await transport.p_test_kvt("NACL%20VERIFY");
} catch(err) {
console.log("Exception thrown in TransportHTTP.test:", err.message);
throw err;
}
}
}
//TransportYJS.Y = Y; // Allow node tests to find it
Transports._transportclasses["YJS"] = TransportYJS;
TransportYJS.requires = ["yjs", "y-array", "y-memory", "y-text", "y-map", "y-ipfs-connector", "y-indexeddb"]; //~130KB
// If start using YJS again, it needs something like Y = new yjs; Y.extend(y-array, y-memory ... - see yjs/README.md
TransportYJS.scripts = ["yjs/dist/y.js", "y-array/dist/y-array.js", "y-memory/dist/y-memory.js",
"y-text/dist/y-text.js", "y-map/dist/y-map.js", "y-ipfs-connector/dist/y-ipfs-connector", "y-indexeddb/dist/y-indexeddb"]; //~130KB