Skip to content

Commit

Permalink
Add possibiility to save flow to FS
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Feb 26, 2025
1 parent d02b457 commit 41d1df4
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""Add column fs_path to Flow
Revision ID: 93e2705fa8d6
Revises: dd9e0804ebd1
Create Date: 2025-02-25 13:08:11.263504
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
import sqlmodel
from sqlalchemy.engine.reflection import Inspector
from langflow.utils import migration


# revision identifiers, used by Alembic.
revision: str = '93e2705fa8d6'
down_revision: Union[str, None] = 'dd9e0804ebd1'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
column_names = [column["name"] for column in inspector.get_columns("flow")]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "fs_path" not in column_names:
batch_op.add_column(sa.Column("fs_path", sqlmodel.sql.sqltypes.AutoString(), nullable=True))

# ### end Alembic commands ###


def downgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
column_names = [column["name"] for column in inspector.get_columns("flow")]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "fs_path" in column_names:
batch_op.drop_column("fs_path")
13 changes: 6 additions & 7 deletions src/backend/base/langflow/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ def has_api_terms(word: str):

def remove_api_keys(flow: dict):
"""Remove api keys from flow data."""
if flow.get("data") and flow["data"].get("nodes"):
for node in flow["data"]["nodes"]:
node_data = node.get("data").get("node")
template = node_data.get("template")
for value in template.values():
if isinstance(value, dict) and has_api_terms(value["name"]) and value.get("password"):
value["value"] = None
for node in flow.get("data", {}).get("nodes", []):
node_data = node.get("data").get("node")
template = node_data.get("template")
for value in template.values():
if isinstance(value, dict) and has_api_terms(value["name"]) and value.get("password"):
value["value"] = None

return flow

Expand Down
29 changes: 29 additions & 0 deletions src/backend/base/langflow/api/v1/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from uuid import UUID

import orjson
from aiofile import async_open
from anyio import Path
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
Expand All @@ -20,6 +22,7 @@
from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, remove_api_keys, validate_is_component
from langflow.api.v1.schemas import FlowListCreate
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
from langflow.logging import logger
from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate
from langflow.services.database.models.flow.model import FlowHeader
from langflow.services.database.models.flow.utils import get_webhook_component_in_flow
Expand All @@ -32,13 +35,31 @@
router = APIRouter(prefix="/flows", tags=["Flows"])


async def _verify_fs_path(path: str):
if path:
path_ = Path(path)
if not await path_.exists():
await path_.touch()


async def _save_flow_to_fs(flow: Flow):
if flow.fs_path:
async with async_open(flow.fs_path, "w") as f:
try:
await f.write(flow.model_dump_json())
except OSError:
logger.exception("Failed to write flow %s to path %s", flow.name, flow.fs_path)


async def _new_flow(
*,
session: AsyncSession,
flow: FlowCreate,
user_id: UUID,
):
try:
await _verify_fs_path(flow.fs_path)

"""Create a new flow."""
if flow.user_id is None:
flow.user_id = user_id
Expand Down Expand Up @@ -124,6 +145,9 @@ async def create_flow(
db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id)
await session.commit()
await session.refresh(db_flow)

await _save_flow_to_fs(db_flow)

except Exception as e:
if "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
Expand Down Expand Up @@ -283,6 +307,8 @@ async def update_flow(
for key, value in update_data.items():
setattr(db_flow, key, value)

await _verify_fs_path(db_flow.fs_path)

webhook_component = get_webhook_component_in_flow(db_flow.data)
db_flow.webhook = webhook_component is not None
db_flow.updated_at = datetime.now(timezone.utc)
Expand All @@ -296,6 +322,8 @@ async def update_flow(
await session.commit()
await session.refresh(db_flow)

await _save_flow_to_fs(db_flow)

except Exception as e:
if "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
Expand Down Expand Up @@ -381,6 +409,7 @@ async def upload_file(
await session.commit()
for db_flow in response_list:
await session.refresh(db_flow)
await _save_flow_to_fs(db_flow)
except Exception as e:
if "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class Flow(FlowBase, table=True): # type: ignore[call-arg]
tags: list[str] | None = Field(sa_column=Column(JSON), default=[])
locked: bool | None = Field(default=False, nullable=True)
folder_id: UUID | None = Field(default=None, foreign_key="folder.id", nullable=True, index=True)
fs_path: str | None = Field(default=None, nullable=True)
folder: Optional["Folder"] = Relationship(back_populates="flows")
messages: list["MessageTable"] = Relationship(back_populates="flow")
transactions: list["TransactionTable"] = Relationship(back_populates="flow")
Expand All @@ -194,6 +195,7 @@ def to_data(self):
class FlowCreate(FlowBase):
user_id: UUID | None = None
folder_id: UUID | None = None
fs_path: str | None = None


class FlowRead(FlowBase):
Expand Down Expand Up @@ -231,6 +233,7 @@ class FlowUpdate(SQLModel):
folder_id: UUID | None = None
endpoint_name: str | None = None
locked: bool | None = None
fs_path: str | None = None

@field_validator("endpoint_name")
@classmethod
Expand Down

0 comments on commit 41d1df4

Please sign in to comment.