Skip to content

Commit

Permalink
Stop using Future.wait in build_impl.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmorgan committed Mar 4, 2025
1 parent a49bbef commit 4afc864
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 104 deletions.
178 changes: 83 additions & 95 deletions build_runner_core/lib/src/generate/build_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class _SingleBuild {
_buildPhases = buildImpl._buildPhases,
_buildPhasePool = List.generate(
buildImpl._buildPhases.length,
(_) => Pool(buildPhasePoolSize),
(_) => Pool(1),
growable: false,
),
_environment = buildImpl._environment,
Expand Down Expand Up @@ -335,7 +335,7 @@ class _SingleBuild {
},
(e, st) {
if (!done.isCompleted) {
_logger.severe('Unhandled build failure!', e, st);
_logger.severe('Unhandled build failure! $e $st', e, st);
done.complete(BuildResult(BuildStatus.failure, []));
}
},
Expand Down Expand Up @@ -398,34 +398,32 @@ class _SingleBuild {
var phase = _buildPhases[phaseNumber];
var packageNode = _packageGraph[package]!;

await Future.wait(
_assetGraph.outputsForPhase(package, phaseNumber).map((node) async {
if (!shouldBuildForDirs(
node.id,
buildDirs: _buildPaths(_buildDirs),
buildFilters: _buildFilters,
phase: phase,
targetGraph: _targetGraph,
)) {
return;
}
for (final node in _assetGraph.outputsForPhase(package, phaseNumber)) {
if (!shouldBuildForDirs(
node.id,
buildDirs: _buildPaths(_buildDirs),
buildFilters: _buildFilters,
phase: phase,
targetGraph: _targetGraph,
)) {
continue;
}

// Don't build for inputs that aren't visible. This can happen for
// placeholder nodes like `test/$test$` that are added to each package,
// since the test dir is not part of the build for non-root packages.
if (!_targetGraph.isVisibleInBuild(node.id, packageNode)) continue;

// Don't build for inputs that aren't visible. This can happen for
// placeholder nodes like `test/$test$` that are added to each package,
// since the test dir is not part of the build for non-root packages.
if (!_targetGraph.isVisibleInBuild(node.id, packageNode)) return;

var input = _assetGraph.get(node.primaryInput)!;
if (input is GeneratedAssetNode) {
if (input.state != NodeState.upToDate) {
await _runLazyPhaseForInput(input.phaseNumber, input.primaryInput);
}
if (!input.wasOutput) return;
if (input.isFailure) return;
var input = _assetGraph.get(node.primaryInput)!;
if (input is GeneratedAssetNode) {
if (input.state != NodeState.upToDate) {
await _runLazyPhaseForInput(input.phaseNumber, input.primaryInput);
}
ids.add(input.id);
}),
);
if (!input.wasOutput) continue;
if (input.isFailure) continue;
}
ids.add(input.id);
}
return ids;
}

Expand All @@ -439,13 +437,11 @@ class _SingleBuild {
InBuildPhase action,
Iterable<AssetId> primaryInputs,
) async {
var outputLists = await Future.wait(
primaryInputs.map((input) => _runForInput(phaseNumber, action, input)),
);
return outputLists.fold<List<AssetId>>(
<AssetId>[],
(combined, next) => combined..addAll(next),
);
final outputs = <AssetId>[];
for (final input in primaryInputs) {
outputs.addAll(await _runForInput(phaseNumber, action, input));
}
return outputs;
}

/// Lazily runs [phaseNumber] with [input]..
Expand Down Expand Up @@ -616,34 +612,27 @@ class _SingleBuild {
int actionNum,
PostBuildAction action,
) async {
var anchorNodes =
_assetGraph.packageNodes(action.package).where((node) {
if (node is PostProcessAnchorNode && node.actionNumber == actionNum) {
var inputNode = _assetGraph.get(node.primaryInput);
if (inputNode is SourceAssetNode) {
return true;
} else if (inputNode is GeneratedAssetNode) {
return inputNode.wasOutput &&
!inputNode.isFailure &&
inputNode.state == NodeState.upToDate;
}
}
return false;
}).cast<PostProcessAnchorNode>();
var outputLists = await Future.wait(
anchorNodes.map(
(anchorNode) => _runPostProcessBuilderForAnchor(
phaseNum,
actionNum,
action.builder,
anchorNode,
),
),
);
return outputLists.fold<List<AssetId>>(
<AssetId>[],
(combined, next) => combined..addAll(next),
);
final outputs = <AssetId>[];
for (final node in _assetGraph.packageNodes(action.package).toList()) {
if (node is! PostProcessAnchorNode) continue;
if (node.actionNumber != actionNum) continue;
final inputNode = _assetGraph.get(node.primaryInput);
if (inputNode is SourceAssetNode ||
inputNode is GeneratedAssetNode &&
inputNode.wasOutput &&
!inputNode.isFailure &&
inputNode.state == NodeState.upToDate) {
outputs.addAll(
await _runPostProcessBuilderForAnchor(
phaseNum,
actionNum,
action.builder,
node,
),
);
}
}
return outputs;
}

Future<Iterable<AssetId>> _runPostProcessBuilderForAnchor(
Expand Down Expand Up @@ -836,13 +825,14 @@ class _SingleBuild {
/// This should be called after deciding that an asset really needs to be
/// regenerated based on its inputs hash changing. All assets in [outputs]
/// must correspond to a [GeneratedAssetNode].
Future<void> _cleanUpStaleOutputs(Iterable<AssetId> outputs) => Future.wait(
outputs
.map(_assetGraph.get)
.cast<GeneratedAssetNode>()
.where((n) => n.wasOutput)
.map((n) => _delete(n.id)),
);
Future<void> _cleanUpStaleOutputs(Iterable<AssetId> outputs) async {
for (final output in outputs) {
final node = _assetGraph.get(output)!;
if (node is GeneratedAssetNode && node.wasOutput) {
await _delete(output);
}
}
}

Future<void> _buildGlobNode(GlobAssetNode globNode) async {
if (globNode.state == NodeState.upToDate) return;
Expand All @@ -860,9 +850,11 @@ class _SingleBuild {
.where((n) => globNode.glob.matches(n.id.path))
.toList();

await Future.wait(
potentialNodes.whereType<GeneratedAssetNode>().map(_buildAsset),
);
for (final node in potentialNodes) {
if (node is GeneratedAssetNode) {
await _buildAsset(node);
}
}

var actualMatches = <AssetId>[];
for (var node in potentialNodes) {
Expand Down Expand Up @@ -902,29 +894,25 @@ class _SingleBuild {
var builderOptionsNode = _assetGraph.get(builderOptionsId)!;
combine(builderOptionsNode.lastKnownDigest!.bytes as Uint8List);

// Limit the total number of digests we are computing at a time. Otherwise
// this can overload the event queue.
await Future.wait(
ids.map((id) async {
var node = _assetGraph.get(id)!;
if (node is GlobAssetNode) {
await _buildGlobNode(node);
} else if (!await reader.canRead(id)) {
// We want to add something here, a missing/unreadable input should be
// different from no input at all.
//
// This needs to be unique per input so we use the md5 hash of the id.
combine(md5.convert(id.toString().codeUnits).bytes as Uint8List);
return;
} else {
if (node.lastKnownDigest == null) {
await reader.cache.invalidate([id]);
node.lastKnownDigest = await reader.digest(id);
}
for (final id in ids) {
var node = _assetGraph.get(id)!;
if (node is GlobAssetNode) {
await _buildGlobNode(node);
} else if (!await reader.canRead(id)) {
// We want to add something here, a missing/unreadable input should be
// different from no input at all.
//
// This needs to be unique per input so we use the md5 hash of the id.
combine(md5.convert(id.toString().codeUnits).bytes as Uint8List);
continue;
} else {
if (node.lastKnownDigest == null) {
await reader.cache.invalidate([id]);
node.lastKnownDigest = await reader.digest(id);
}
combine(node.lastKnownDigest!.bytes as Uint8List);
}),
);
}
combine(node.lastKnownDigest!.bytes as Uint8List);
}

return Digest(combinedBytes);
}
Expand Down
3 changes: 0 additions & 3 deletions build_runner_core/lib/src/util/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,3 @@ final sdkBin = p.join(sdkPath, 'bin');

/// The path to the sdk on the current platform.
final sdkPath = p.dirname(p.dirname(Platform.resolvedExecutable));

/// The maximum number of concurrent actions to run per build phase.
const buildPhasePoolSize = 16;
11 changes: 5 additions & 6 deletions build_runner_core/test/generate/build_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import 'package:build_runner_core/src/asset_graph/graph.dart';
import 'package:build_runner_core/src/asset_graph/node.dart';
import 'package:build_runner_core/src/generate/options.dart'
show defaultNonRootVisibleAssets;
import 'package:build_runner_core/src/util/constants.dart';
import 'package:glob/glob.dart';
import 'package:test/test.dart';

Expand Down Expand Up @@ -157,9 +156,9 @@ void main() {
);
});

test('runs a max of 16 concurrent actions per phase', () async {
test('runs a max of one concurrent action per phase', () async {
var assets = <String, String>{};
for (var i = 0; i < buildPhasePoolSize * 2; i++) {
for (var i = 0; i < 2; i++) {
assets['a|web/$i.txt'] = '$i';
}
var concurrentCount = 0;
Expand All @@ -178,8 +177,7 @@ void main() {
concurrentCount,
maxConcurrentCount,
);
if (concurrentCount >= buildPhasePoolSize &&
!reachedMax.isCompleted) {
if (concurrentCount >= 1 && !reachedMax.isCompleted) {
await Future<void>.delayed(
const Duration(milliseconds: 100),
);
Expand All @@ -199,7 +197,7 @@ void main() {
assets,
outputs: {},
);
expect(maxConcurrentCount, buildPhasePoolSize);
expect(maxConcurrentCount, 1);
});

group('with root package inputs', () {
Expand Down Expand Up @@ -813,6 +811,7 @@ void main() {
{'b|lib/b.txt': 'b'},
packageGraph: packageGraph,
outputs: {r'$$b|lib/b.txt.copy': 'b', r'$$b|lib/b.txt.post': 'b'},
onLog: print,
);
});

Expand Down

0 comments on commit 4afc864

Please sign in to comment.