Skip to content

Commit

Permalink
Bump major version
Browse files Browse the repository at this point in the history
- Added functionality to run commands and set environment variables
- Using client.disconnect removes the client from the server
- Introduced `CILLOW_DISABLE_AUTO_INSTALL` environment variable to
  disable auto installation of import packages
- Resolved process cleanup issue in docker containers
- Added tests for server utilities
- Updated documentation
  • Loading branch information
synacktraa committed Jan 14, 2025
1 parent 083035f commit cc29351
Show file tree
Hide file tree
Showing 19 changed files with 669 additions and 149 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ Cillow is an open-source library that enables you to execute AI-generated code i

It offers key features such as:

- **Environment Switching**: Easily switch between multiple python environments.
- **Automated Package Management**: Automatic installation of imported packages through `uv` or `pip`.
- **Functionality Patches**: Apply patches to limit the scope of AI-generated code, capture outputs like `stdout`, `stderr`, images, plots, etc., and more.
- **Environment Switching**: Effortlessly switch between multiple Python environments.
- **Automated Package Installation**: Automatically install imported packages using `uv` or `pip`.
- **Functionality Patches**: Apply patches to restrict the scope of AI-generated code, capture outputs such as `stdout`, `stderr`, images, plots, and more.

### Check Documentation

Expand Down Expand Up @@ -78,6 +78,6 @@ img.show()

---

At the moment, Cillow only supports Python since it doesn't use Jupyter Kernel.
At the moment, Cillow only supports Python, as it does not rely on Jupyter Kernel/Lab.

This project began as an exploration of [E2B](https://e2b.dev/)'s code interpreter. I implemented the Python interpreter from scratch using ZeroMQ, taking a different approach by adding features like environment switching and functionality patching. Seeing the potential in this project, I evolved it into a client-server architecture using threading and multiprocessing.
This project began as an exploration of [E2B](https://e2b.dev/)'s code interpreter. I implemented the Python interpreter from scratch, taking a different approach by adding features like environment switching and functionality patching. Recognizing the potential of the project, I expanded it into a client-server architecture using ZeroMQ, threading, and multiprocessing.
130 changes: 99 additions & 31 deletions cillow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
Disconnect,
ExceptionInfo,
Execution,
GetEnvrionment,
GetPythonEnvironment,
InstallRequirements,
ModifyInterpreter,
PythonEnvironment,
Result,
RunCode,
RunCommand,
SetEnvironmentVariables,
Stream,
)

Expand Down Expand Up @@ -52,9 +54,16 @@ class Client:
...
... img.show()
... \"\"\")
>>> client.switch_interpreter("/path/to/python/env") # Switch to interpreter process with given environment
>>> client.delete_interpreter("/path/to/python/env") # Stop interpreter process running in given environment
>>> client.install_requirements(["pkg-name1", "pkg-name2"]) # Install requirements in the current interpreter process
>>> # Switch to interpreter process with given environment
>>> client.switch_interpreter("/path/to/python/env")
>>> # Stop interpreter process running in given environment
>>> client.delete_interpreter("/path/to/python/env")
>>> # Install requirements in the current selected environment
>>> client.install_requirements("pkg-name1", "pkg-name2")
>>> # Run commands
>>> client.run_command("echo", "Hello World")
>>> # Set environment variables
>>> client.set_environment_variables({"VAR1": "value1", "VAR2": "value2"})
"""

def __init__(
Expand Down Expand Up @@ -82,13 +91,19 @@ def __init__(
self.__id = id
self.__timeout: int | None = None
self.__current_environment: PythonEnvironment | None = None
self.__default_environment: PythonEnvironment | None = None

self.switch_interpreter(environment)

# fmt: off
@classmethod
def new(
cls, host: str | None = None, port: int | None = None, environment: PythonEnvironment | str = "$system"
cls,
host: str | None = None,
port: int | None = None,
environment: PythonEnvironment | str = "$system"
) -> Client:
# fmt: on
"""
Connect to the server as a new client.
Expand All @@ -104,29 +119,36 @@ def __enter__(self) -> Client:

@property
def id(self) -> str:
"""Identifier of the client."""
"""Client's identifier."""
return self.__id

@property
def timeout(self) -> int | None:
def request_timeout(self) -> int | None:
"""Timeout for request in milliseconds."""
return self.__timeout

@timeout.setter
def timeout(self, value: int) -> None:
@request_timeout.setter
def request_timeout(self, value: int) -> None:
self.__timeout = value

@property
def default_environment(self) -> PythonEnvironment:
"""Default Python environment."""
if self.__default_environment is None:
self.__default_environment = self._get_return_value(GetPythonEnvironment(type="default"))
return self.__default_environment

@property
def current_environment(self) -> PythonEnvironment:
"""Current Python environment"""
"""Current interpreter's python environment."""
if self.__current_environment is None:
self.__current_environment = self._get_return_value(GetEnvrionment(environment_type="current"))
self.__current_environment = self._get_return_value(GetPythonEnvironment(type="current"))
return self.__current_environment

@property
def all_environments(self) -> list[PythonEnvironment]:
"""All running Python environments"""
return self._get_return_value(GetEnvrionment(environment_type="all")) # type: ignore[no-any-return]
"""All running interpreter's python environments."""
return self._get_return_value(GetPythonEnvironment(type="all")) # type: ignore[no-any-return]

def _send_request(self, request_dataclass: Any) -> Generator[tuple[bytes, bytes], None, bytes]:
"""
Expand Down Expand Up @@ -167,40 +189,82 @@ def _get_return_value(self, request_dataclass: Any) -> Any:

def switch_interpreter(self, environment: PythonEnvironment | str) -> None:
"""
Switch to interpreter associated with the given Python environment.
Switch to specified python environment's interpreter process.
Creates a new interpreter process if it doesn't exists.
Creates a new interpreter process if it is not already running.
Args:
environment: The Python environment to use
"""
self.__current_environment = self._get_return_value(ModifyInterpreter(environment=environment, mode="switch"))
self.__current_environment = self._get_return_value(ModifyInterpreter(environment, mode="switch"))

def delete_interpreter(self, environment: PythonEnvironment | str) -> None:
"""
Delete the interpreter associated with the given Python environment.
After deletion, the current environment is set to `$system`.
Stop the specified python environment's interpreter process.
Switches to default python environment's interpreter process.
Args:
environment: The Python environment to use
"""
self.__current_environment = self._get_return_value(ModifyInterpreter(environment=environment, mode="delete"))
self.__current_environment = self._get_return_value(ModifyInterpreter(environment, mode="delete"))

def install_requirements(self, requirements: list[str], on_stream: Callable[[Stream], None] | None = None) -> None:
def set_environment_variables(self, environment_variables: dict[str, str]) -> None:
"""
Set environment variables for the current interpreter.
Args:
environment_variables: The environment variables to set
"""
for _ in self._send_request(SetEnvironmentVariables(environment_variables)):
...

def run_command(self, *cmd: str, on_stream: Callable[[Stream], None] | None = None) -> None:
"""
Run the given command.
⚠️ WARNING: This class allows execution of system commands and should be used with EXTREME CAUTION.
- Never run commands with user-supplied or untrusted input
- Always validate and sanitize any command arguments
- Be aware of potential security risks, especially with privilege escalation
Args:
cmd: The command to run
on_stream: The callback to capture streaming output.
"""
on_stream = on_stream or default_stream_processor
for msg_type, body in self._send_request(RunCommand(cmd=cmd)):
if msg_type != b"interpreter":
continue

on_stream(pickle.loads(body))

# fmt: off
def install_requirements(
self, *requirements: str, on_stream: Callable[[Stream], None] | None = None
) -> None:
# fmt: on
"""
Install the given requirements in the current Python environment.
Args:
requirements: The requirements to install
"""
on_stream = on_stream or default_stream_processor
for msg_type, body in self._send_request(InstallRequirements(requirements=requirements)):
for msg_type, body in self._send_request(InstallRequirements(requirements)):
if msg_type != b"interpreter":
continue

on_stream(pickle.loads(body))

def run_code(self, code: str, on_stream: Callable[[Stream | ByteStream], None] | None = None) -> Execution:
# fmt: off
def run_code(
self,
code: str,
on_stream: Callable[[Stream | ByteStream], None] | None = None
) -> Execution:
# fmt: on
"""
Run the code in the current selected interpreter.
Expand All @@ -212,19 +276,18 @@ def run_code(self, code: str, on_stream: Callable[[Stream | ByteStream], None] |
The execution result containing the result, streams, byte streams and exception.
"""
on_stream = on_stream or default_stream_processor
streams, byte_streams = [], [] # type: ignore[var-annotated]
result, streams, byte_streams, exception = Result(value=None), [], [], None
for msg_type, body in self._send_request(RunCode(code=code)):
if msg_type != b"interpreter":
continue

response = pickle.loads(body)
if isinstance(response, Result):
return Execution(result=response, streams=streams, byte_streams=byte_streams)

result = response
continue
elif isinstance(response, ExceptionInfo):
return Execution(
result=Result(value=None), streams=streams, byte_streams=byte_streams, exception=response
)
exception = response
continue

if isinstance(response, Stream):
streams.append(response)
Expand All @@ -233,14 +296,19 @@ def run_code(self, code: str, on_stream: Callable[[Stream | ByteStream], None] |

on_stream(response)

return Execution( # This should never happen
result=Result(value=None), streams=streams, byte_streams=byte_streams, exception=None
return Execution(
result=result, streams=streams, byte_streams=byte_streams, exception=exception
)

def disconnect(self) -> None:
"""Close the connection to the server and clean up all the resources being used by the client."""
"""
Close the connection to the server and remove the client.
Don't use this if you want to reconnect to the server later.
"""
for _ in self._send_request(Disconnect()):
...

self._socket.close()
self._socket.context.term()

Expand Down
48 changes: 39 additions & 9 deletions cillow/interpreter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import sys
from shutil import which as find_executable
from tempfile import NamedTemporaryFile
Expand All @@ -16,7 +17,7 @@
__all__ = ("Interpreter",)


PIP_INSTALL_CMD = ["uv", "pip", "install"] if find_executable("uv") else ["pip", "install"]
PIP_INSTALL_CMD = ("uv", "pip", "install") if find_executable("uv") else ("pip", "install")


class Interpreter:
Expand Down Expand Up @@ -74,7 +75,33 @@ def environment(self) -> PythonEnvironment:
"""The current Python environment"""
return getattr(self._import_hook, "environment", "$system")

def install_requirements(self, requirements: list[str], on_stream: Callable[[Stream], None] | None = None) -> None:
# fmt: off
def run_command(
self, *cmd: str, on_stream: Callable[[Stream], Any] | None = None
) -> None:
# fmt: on
"""
Run the given command.
⚠️ WARNING: This class allows execution of system commands and should be used with EXTREME CAUTION.
- Never run commands with user-supplied or untrusted input
- Always validate and sanitize any command arguments
- Be aware of potential security risks, especially with privilege escalation
Args:
cmd: The command to run
on_stream: The callback to capture streaming output.
"""
on_stream = on_stream or default_stream_processor
for line in shell.stream(*cmd):
on_stream(Stream(type="cmd_exec", data=line))

# fmt: off
def install_requirements(
self, *requirements: str, on_stream: Callable[[Stream], None] | None = None
) -> None:
# fmt: on
"""
Install the given requirements.
Expand All @@ -93,9 +120,7 @@ def install_requirements(self, requirements: list[str], on_stream: Callable[[Str
handler.flush()
install_args.extend(["-r", handler.name])

on_stream = on_stream or default_stream_processor
for line in shell.stream(*PIP_INSTALL_CMD, *install_args):
on_stream(Stream(type="cmd_exec", data=line))
self.run_command(*PIP_INSTALL_CMD, *install_args, on_stream=on_stream)

def run_code(
self, code: str, on_stream: Callable[[Stream | ByteStream], None] | None = None
Expand All @@ -115,12 +140,12 @@ def run_code(
except Exception as exc:
return ExceptionInfo(type=exc.__class__.__name__, message=str(exc))

on_stream = on_stream or default_stream_processor # TODO: create a function that can process byte stream
if module_names := code_meta.module_names:
on_stream = on_stream or default_stream_processor
if not is_auto_install_disabled() and (module_names := code_meta.module_names):
to_install = (module_names - sys.stdlib_module_names) - get_installed_modules()
if to_install:
packages = [MODULE_TO_PACKAGE_MAP.get(name, name) for name in to_install]
self.install_requirements(packages, on_stream=on_stream)
self.install_requirements(*packages, on_stream=on_stream)

try:
with patch.load_patches(on_stream=on_stream):
Expand Down Expand Up @@ -149,6 +174,11 @@ def __del__(self) -> None:
sys.path.pop(0)


def is_auto_install_disabled() -> bool:
"""Check if auto-install is disabled."""
return os.environ.get("CILLOW_DISABLE_AUTO_INSTALL", "").lower() in ("1", "true", "yes")


def is_running_in_jupyter() -> bool:
"""Check if the interpreter is running in a Jupyter notebook"""
try:
Expand All @@ -159,7 +189,7 @@ def is_running_in_jupyter() -> bool:


def default_stream_processor(stream: Stream | ByteStream) -> None:
"""Default stream processor for the interpreter"""
"""Interpreter's default stream processor."""
if isinstance(stream, Stream):
if stream.type == "stdout":
original = patch.prebuilt.stdout_write_switchable.original
Expand Down
6 changes: 3 additions & 3 deletions cillow/patch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ class PatchProtocol(Protocol):
def __call__(self) -> ContextManager[None]: ...


class PatchWithStreamCaptureProtcol(Protocol):
class StreamCapturePatchProtcol(Protocol):
"""Patch callable protocol with stream capture callback"""

def __call__(self, on_stream: Callable[[Stream | ByteStream], Any]) -> ContextManager[None]: ...


_patches_with_callback: list[PatchWithStreamCaptureProtcol] = []
_patches_with_callback: list[StreamCapturePatchProtcol] = []
_patches_without_callback: list[PatchProtocol] = []


def add_patches(*patches: PatchProtocol | PatchWithStreamCaptureProtcol) -> None:
def add_patches(*patches: PatchProtocol | StreamCapturePatchProtcol) -> None:
"""
Add new patches to be used by all Interpreter instances.
Expand Down
2 changes: 1 addition & 1 deletion cillow/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(
max_queue_size: Maximum queue size (defaults to `max_clients * interpreters_per_client * 2`)
"""
self.socket = zmq.Context().socket(zmq.ROUTER)
self._url = f"tcp://127.0.0.1:{port}"
self._url = f"tcp://0.0.0.0:{port}"
self.socket.bind(self._url)

self._client_manager = ClientManager(max_interpreters, interpreters_per_client)
Expand Down
Loading

0 comments on commit cc29351

Please sign in to comment.