Skip to content

Commit

Permalink
Use detached attribution key (#13991)
Browse files Browse the repository at this point in the history
## Description

Uses detached attribution keys in `MergeTree`'s insert-only attribution
policy. This avoids serializing keys with seq: 0.

Note that we still optimize for size for seq-based attribution keys (by
eliding the object type and serializing it as only the `seq`). Though
it's possible people will create large chunks of content in a detached
state, the optimization to combine adjacent and equivalent attribution
keys will still avoid creating large summaries in this case.

The change mostly adds coverage to various testing layers for this.
  • Loading branch information
Abe27342 authored Feb 6, 2023
1 parent 184913b commit 9215d42
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 108 deletions.
10 changes: 8 additions & 2 deletions packages/dds/merge-tree/src/attributionCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from "@fluidframework/runtime-definitions";
import { AttributionPolicy } from "./mergeTree";
import { Client } from "./client";
import { UnassignedSequenceNumber } from "./constants";
import { UnassignedSequenceNumber, UniversalSequenceNumber } from "./constants";
import {
MergeTreeDeltaCallback,
MergeTreeMaintenanceCallback,
Expand Down Expand Up @@ -281,6 +281,8 @@ export class AttributionCollection implements IAttributionCollection<Attribution
/**
* @alpha
* @returns - An {@link AttributionPolicy} which tracks only insertion of content.
* Content is only attributed at ack time, unless the container is in a detached state.
* Detached content is attributed with a {@link @fluidframework/runtime-definitions#DetachedAttributionKey}.
*/
export function createInsertOnlyAttributionPolicy(): AttributionPolicy {
let unsubscribe: undefined | (() => void);
Expand All @@ -297,8 +299,12 @@ export function createInsertOnlyAttributionPolicy(): AttributionPolicy {

for (const { segment } of deltaSegments) {
if (segment.seq !== undefined && segment.seq !== UnassignedSequenceNumber) {
const key: AttributionKey =
segment.seq === UniversalSequenceNumber
? { type: "detached", id: 0 }
: { type: "op", seq: segment.seq };
segment.attribution ??= new AttributionCollection(
{ type: "op", seq: segment.seq },
key,
segment.cachedLength,
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ describe("AttributionCollection", () => {
},
segments: [seg(3), seg(4)],
},
{
name: "detached attribution keys",
blob: {
length: 7,
posBreakpoints: [0, 3],
seqs: [1, { type: "detached", id: 0 }],
},
segments: [seg(3), seg(4)],
},
];

for (const { name, blob, segments } of testCases) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { strict as assert } from "assert";
import { createInsertOnlyAttributionPolicy } from "../attributionCollection";
import { TestClient } from "./testClient";

const localUserLongId = "localUser";
describe("createInsertOnlyAttributionPolicy", () => {
let client: TestClient;
let seq = 0;
beforeEach(() => {
client = new TestClient({
attribution: {
track: true,
policyFactory: createInsertOnlyAttributionPolicy,
},
});
seq = 0;
});

it("Attributes content on insert", () => {
client.startOrUpdateCollaboration(localUserLongId);
client.applyMsg(client.makeOpMessage(client.insertTextLocal(0, "ABC"), ++seq, seq - 1));
assert.deepEqual(client.getAllAttributionSeqs(), [1, 1, 1]);
});

it("Attributes content inserted before starting collaboration with a detached key", () => {
client.insertTextLocal(0, "C");
client.startOrUpdateCollaboration(localUserLongId);
client.applyMsg(client.makeOpMessage(client.insertTextLocal(0, "AB"), ++seq, seq - 1));
assert.deepEqual(client.getAllAttributionSeqs(), [1, 1, { type: "detached", id: 0 }]);
});
});
169 changes: 91 additions & 78 deletions packages/dds/merge-tree/src/test/snapshot.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { SnapshotV1 } from "../snapshotV1";
import { IMergeTreeOptions } from "../mergeTree";
import { createInsertOnlyAttributionPolicy } from "../attributionCollection";
import { TestSerializer } from "./testSerializer";
import { createClientsAtInitialState } from "./testClientLogger";
import { ISegment, TestClient } from ".";

// Reconstitutes a MergeTree client from a summary
Expand Down Expand Up @@ -41,8 +42,12 @@ class TestString {
private seq = 0;
private minSeq = 0;

constructor(id: string, private readonly options?: IMergeTreeOptions) {
this.client = new TestClient(options);
constructor(
id: string,
private readonly options?: IMergeTreeOptions,
initialState: string = "",
) {
this.client = createClientsAtInitialState({ initialState, options }, id)[id];
this.client.startOrUpdateCollaboration(id);
}

Expand Down Expand Up @@ -148,100 +153,108 @@ class TestString {
}

function makeSnapshotSuite(options?: IMergeTreeOptions): void {
let str: TestString;

beforeEach(() => {
str = new TestString("fakeId", options);
});
describe("from an empty initial state", () => {
let str: TestString;
beforeEach(() => {
str = new TestString("fakeId", options);
});

afterEach(async () => {
// Paranoid check that ensures `str` roundtrips through snapshot/load. This helps to catch
// bugs that might be missed if the test case forgets to call/await `str.expect()`.
await str.checkSnapshot({
attribution: { policyFactory: createInsertOnlyAttributionPolicy },
afterEach(async () => {
// Paranoid check that ensures `str` roundtrips through snapshot/load. This helps to catch
// bugs that might be missed if the test case forgets to call/await `str.expect()`.
await str.checkSnapshot({
attribution: { policyFactory: createInsertOnlyAttributionPolicy },
});
});
});

it("excludes un-acked segments", async () => {
str.append("0", /* increaseMsn: */ false);
it("excludes un-acked segments", async () => {
str.append("0", /* increaseMsn: */ false);

// Invoke `load/getSnapshot()` directly instead of `str.expect()` to avoid ACKing the
// pending insert op.
const client2 = await loadSnapshot(str.getSummary(), options);
// Invoke `load/getSnapshot()` directly instead of `str.expect()` to avoid ACKing the
// pending insert op.
const client2 = await loadSnapshot(str.getSummary(), options);

// Original client has inserted text, but the one loaded from the snapshot should be empty.
// This is because un-ACKed ops are not included in snapshots. Instead, these ops are
// retransmitted and applied after the snapshot has loaded.
assert.equal(str.getText(), "0");
assert.equal(client2.getText(), "");
});
// Original client has inserted text, but the one loaded from the snapshot should be empty.
// This is because un-ACKed ops are not included in snapshots. Instead, these ops are
// retransmitted and applied after the snapshot has loaded.
assert.equal(str.getText(), "0");
assert.equal(client2.getText(), "");
});

it("includes segments below MSN", async () => {
str.append("0", /* increaseMsn: */ true);
await str.expect("0");
});
it("includes segments below MSN", async () => {
str.append("0", /* increaseMsn: */ true);
await str.expect("0");
});

it("includes ACKed segments above the MSN", async () => {
str.append("0", /* increaseMsn: */ false);
await str.expect("0");
});
it("includes ACKed segments above the MSN", async () => {
str.append("0", /* increaseMsn: */ false);
await str.expect("0");
});

it("includes removals of segments above the MSN", async () => {
str.append("0x", /* increaseMsn: */ false);
str.removeRange(1, 2, /* increaseMsn: */ false);
await str.expect("0");
});
it("includes removals of segments above the MSN", async () => {
str.append("0x", /* increaseMsn: */ false);
str.removeRange(1, 2, /* increaseMsn: */ false);
await str.expect("0");
});

it("includes removals above the MSN of segments below the MSN", async () => {
str.append("0x", /* increaseMsn: */ true);
str.removeRange(1, 2, /* increaseMsn: */ false);
await str.expect("0");
});
it("includes removals above the MSN of segments below the MSN", async () => {
str.append("0x", /* increaseMsn: */ true);
str.removeRange(1, 2, /* increaseMsn: */ false);
await str.expect("0");
});

it("can insert segments after loading removed segment", async () => {
str.append("0x", /* increaseMsn: */ true);
str.removeRange(1, 2, /* increaseMsn: */ false);
await str.expect("0");
str.append("1", /* increaseMsn: */ false);
await str.expect("01");
});
it("can insert segments after loading removed segment", async () => {
str.append("0x", /* increaseMsn: */ true);
str.removeRange(1, 2, /* increaseMsn: */ false);
await str.expect("0");
str.append("1", /* increaseMsn: */ false);
await str.expect("01");
});

it("can insert segments relative to removed segment", async () => {
str.append("0x", /* increaseMsn: */ false);
str.append("2", /* increaseMsn: */ false);
str.removeRange(1, 2, /* increaseMsn: */ false);
str.insert(1, "1", /* increaseMsn: */ false);
str.append("3", /* increaseMsn: */ false);
await str.expect("0123");
});
it("can insert segments relative to removed segment", async () => {
str.append("0x", /* increaseMsn: */ false);
str.append("2", /* increaseMsn: */ false);
str.removeRange(1, 2, /* increaseMsn: */ false);
str.insert(1, "1", /* increaseMsn: */ false);
str.append("3", /* increaseMsn: */ false);
await str.expect("0123");
});

it("can insert segments relative to removed segment loaded from snapshot", async () => {
str.append("0x", /* increaseMsn: */ false);
str.append("2", /* increaseMsn: */ false);
str.removeRange(1, 2, /* increaseMsn: */ false);
it("can insert segments relative to removed segment loaded from snapshot", async () => {
str.append("0x", /* increaseMsn: */ false);
str.append("2", /* increaseMsn: */ false);
str.removeRange(1, 2, /* increaseMsn: */ false);

// Note that calling str.expect() switches the underlying client to the one loaded from the snapshot.
await str.expect("02");
// Note that calling str.expect() switches the underlying client to the one loaded from the snapshot.
await str.expect("02");

str.insert(1, "1", /* increaseMsn: */ false);
str.append("3", /* increaseMsn: */ false);
await str.expect("0123");
});
str.insert(1, "1", /* increaseMsn: */ false);
str.append("3", /* increaseMsn: */ false);
await str.expect("0123");
});

it("includes ACKed segments below MSN in body", async () => {
for (let i = 0; i < SnapshotV1.chunkSize + 10; i++) {
str.append(`${i % 10}`, /* increaseMsn: */ true);
}
it("includes ACKed segments below MSN in body", async () => {
for (let i = 0; i < SnapshotV1.chunkSize + 10; i++) {
str.append(`${i % 10}`, /* increaseMsn: */ true);
}

await str.checkSnapshot();
});
await str.checkSnapshot();
});

it("includes ACKed segments above MSN in body", async () => {
for (let i = 0; i < SnapshotV1.chunkSize + 10; i++) {
str.append(`${i % 10}`, /* increaseMsn: */ false);
}
it("includes ACKed segments above MSN in body", async () => {
for (let i = 0; i < SnapshotV1.chunkSize + 10; i++) {
str.append(`${i % 10}`, /* increaseMsn: */ false);
}

await str.checkSnapshot();
await str.checkSnapshot();
});
});

describe("from a non-empty initial state", () => {
it("includes segments submitted while detached", async () => {
const str = new TestString("A", options, "starting text");
await str.expect("starting text");
});
});
}

Expand Down
Loading

0 comments on commit 9215d42

Please sign in to comment.