Skip to content

Commit

Permalink
Merge pull request #347 from nodestream-proj/0.13
Browse files Browse the repository at this point in the history
`0.13` Working Branch
  • Loading branch information
zprobst authored Aug 7, 2024
2 parents a40edcf + dfba1ed commit 4b992f5
Show file tree
Hide file tree
Showing 50 changed files with 8,898 additions and 7,994 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
run: poetry run pytest --cov nodestream --cov-report term --cov-report xml -m "not e2e"
- name: Run Lints
run: |
poetry run ruff nodestream tests
poetry run ruff check nodestream tests
poetry run black nodestream tests --check
poetry run isort nodestream tests --check-only
- name: Upload coverage reports to Codecov
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ format: venv
.PHONY: lint
lint: venv
poetry run black nodestream tests --check
poetry run ruff nodestream tests
poetry run ruff check nodestream tests

.PHONY: test-unit
test-unit: venv
Expand Down
2 changes: 2 additions & 0 deletions nodestream/cli/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .scaffold import Scaffold
from .show import Show
from .show_migrations import ShowMigrations
from .squash_migrations import SquashMigration

__all__ = (
"AuditCommand",
Expand All @@ -24,4 +25,5 @@
"Scaffold",
"ShowMigrations",
"Show",
"SquashMigration",
)
14 changes: 12 additions & 2 deletions nodestream/cli/commands/run_migrations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from cleo.helpers import option

from ..operations import ExecuteMigrations
from .nodestream_command import NodestreamCommand
from .shared_options import PROJECT_FILE_OPTION, TARGETS_OPTION
Expand All @@ -6,12 +8,20 @@
class RunMigrations(NodestreamCommand):
name = "migrations run"
description = "Execute pending migrations on the specified target."
options = [PROJECT_FILE_OPTION, TARGETS_OPTION]
options = [
PROJECT_FILE_OPTION,
TARGETS_OPTION,
option("all-targets", "a", "Run migrations on all targets", flag=True),
]

async def handle_async(self):
project = self.get_project()
migrations = self.get_migrations()
targets = self.option(TARGETS_OPTION.name)

if self.option("all-targets"):
targets = [target for target in project.targets_by_name]
else:
targets = self.option(TARGETS_OPTION.name)

if len(targets) == 0:
self.info("No targets specified, nothing to do.")
Expand Down
31 changes: 31 additions & 0 deletions nodestream/cli/commands/squash_migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from cleo.helpers import option

from ..operations import GenerateSquashedMigration
from .nodestream_command import NodestreamCommand
from .shared_options import PROJECT_FILE_OPTION


class SquashMigration(NodestreamCommand):
    """CLI command (`nodestream migrations squash`) that collapses a range of
    migrations into a single squashed migration.

    The actual squashing is delegated to the ``GenerateSquashedMigration``
    operation; this command only parses options and hands them over.
    """

    name = "migrations squash"
    # Fixed: the previous description ("Generate a migration for the current
    # project.") was copy-pasted from the generate command and did not
    # describe what this command does.
    description = "Squash a range of migrations into a single migration."
    options = [
        PROJECT_FILE_OPTION,
        option(
            "from",
            description="The name of the migration to squash from.",
            value_required=True,
            flag=False,
        ),
        option("to", description="The name of the migration to squash to.", flag=False),
    ]

    async def handle_async(self):
        # "--to" is optional; when omitted its value is None and is passed
        # through unchanged — see GenerateSquashedMigration for how a missing
        # end point is interpreted.
        from_migration_name = self.option("from")
        to_migration_name = self.option("to")
        migrations = self.get_migrations()
        operation = GenerateSquashedMigration(
            migrations,
            from_migration_name,
            to_migration_name,
        )
        await self.run_operation(operation)
2 changes: 2 additions & 0 deletions nodestream/cli/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .execute_migration import ExecuteMigrations
from .generate_migration import GenerateMigration
from .generate_pipeline_scaffold import GeneratePipelineScaffold
from .generate_squashed_migration import GenerateSquashedMigration
from .initialize_logger import InitializeLogger
from .initialize_project import InitializeProject
from .operation import Operation
Expand All @@ -20,6 +21,7 @@
"ExecuteMigrations",
"GenerateMigration",
"GeneratePipelineScaffold",
"GenerateSquashedMigration",
"InitializeLogger",
"InitializeProject",
"Operation",
Expand Down
33 changes: 33 additions & 0 deletions nodestream/cli/operations/generate_squashed_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from ...schema.migrations import ProjectMigrations
from .operation import NodestreamCommand, Operation


class GenerateSquashedMigration(Operation):
    """Operation that produces one squashed migration covering a span of
    existing migrations, then reports the result to the user.
    """

    def __init__(
        self,
        migrations: ProjectMigrations,
        from_migration_name: str,
        to_migration_name: str,
    ) -> None:
        self.migrations = migrations
        self.from_migration_name = from_migration_name
        self.to_migration_name = to_migration_name

    async def perform(self, command: NodestreamCommand):
        # Resolve the boundary migrations; the end of the range is optional
        # and resolves to None when no name was supplied.
        graph = self.migrations.graph
        start = graph.get_migration(self.from_migration_name)
        end = None
        if self.to_migration_name:
            end = graph.get_migration(self.to_migration_name)

        migration, path = self.migrations.create_squash_between(start, end)

        # Summarize what was generated and how to apply it.
        command.line(f"Generated squashed migration {migration.name}.")
        command.line(
            f"The migration contains {len(migration.operations)} schema changes."
        )
        for op in migration.operations:
            command.line(f" - {op.describe()}")
        command.line(f"Migration written to {path}")
        command.line("Run `nodestream migrations run` to apply the migration.")
20 changes: 20 additions & 0 deletions nodestream/cli/operations/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def create_progress_reporter(
callback=indicator.progress_callback,
on_start_callback=indicator.on_start,
on_finish_callback=indicator.on_finish,
on_fatal_error_callback=indicator.on_fatal_error,
)


Expand All @@ -139,8 +140,18 @@ def progress_callback(self, _, __):
def on_finish(self, context: PipelineContext):
pass

def on_fatal_error(self, exception: Exception):
self.command.line(
"<error>Encountered a fatal error while running pipeline</error>"
)
self.command.line(f"<error>{exception}</error>")


class SpinnerProgressIndicator(ProgressIndicator):
def __init__(self, command: NodestreamCommand, pipeline_name: str) -> None:
super().__init__(command, pipeline_name)
self.exception = None

def on_start(self):
self.progress = self.command.progress_indicator()
self.progress.start(f"Running pipeline: '{self.pipeline_name}'")
Expand All @@ -156,3 +167,12 @@ def on_finish(self, context: PipelineContext):
stats = ((k, str(v)) for k, v in context.stats.items())
table = self.command.table(STATS_TABLE_COLS, stats)
table.render()

if self.exception:
raise self.exception

def on_fatal_error(self, exception: Exception):
self.progress.set_message(
"<error>Encountered a fatal error while running pipeline</error>"
)
self.exception = exception
6 changes: 5 additions & 1 deletion nodestream/databases/null.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import AsyncGenerator, Iterable

from ..model import IngestionHook, Node, RelationshipWithNodes, TimeToLiveConfiguration
from ..pipeline.pipeline import empty_async_generator
from ..schema.migrations import Migrator
from ..schema.migrations.operations import Operation
from .copy import TypeRetriever
Expand All @@ -13,6 +12,11 @@
)


async def empty_async_generator():
    """An async generator that yields nothing and finishes immediately.

    The unreachable ``yield`` after ``return`` is what makes this function an
    async *generator* rather than a plain coroutine.
    """
    return
    yield  # pragma: no cover


class NullMigrator(Migrator):
async def execute_operation(self, _: Operation):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,24 +93,39 @@ def __init__(
additional_indexes: Optional[List[str]] = None,
additional_types: Optional[List[str]] = None,
normalization: Optional[Dict[str, Any]] = None,
properties_normalization: Optional[Dict[str, Any]] = None,
key_normalization: Optional[Dict[str, Any]] = None,
allow_create: bool = True,
):
if normalization and (properties_normalization or key_normalization):
raise ValueError(
"You cannot specify normalization both at the root and at the key/properties level."
)

properties_normalization = properties_normalization or normalization
key_normalization = key_normalization or normalization
self.key_normalization = {
**DEFAULT_NORMALIZATION_ARGUMENTS,
**(key_normalization or {}),
}
self.properties_normalization = properties_normalization or {}
self.node_type = ValueProvider.guarantee_value_provider(node_type)
self.key = PropertyMapping.from_file_data(key or {})
self.properties = PropertyMapping.from_file_data(properties or {})
self.additional_indexes = additional_indexes or []
self.additional_types = tuple(additional_types or [])
self.norm_args = {**DEFAULT_NORMALIZATION_ARGUMENTS, **(normalization or {})}
if allow_create:
self.creation_rule = NodeCreationRule.EAGER
else:
self.creation_rule = NodeCreationRule.MATCH_ONLY

def interpret(self, context: ProviderContext):
normalized_key: PropertySet = PropertySet()
self.key.apply_to(context, normalized_key, self.norm_args)
self.key.apply_to(context, normalized_key, self.key_normalization)
normalized_properties: PropertySet = PropertySet()
self.properties.apply_to(context, normalized_properties, self.norm_args)
self.properties.apply_to(
context, normalized_properties, self.properties_normalization
)

context.desired_ingest.add_source_node(
self.node_type.single_value(context),
Expand Down
Loading

0 comments on commit 4b992f5

Please sign in to comment.