Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
feat(workflow): support workflow executor pool
Browse files Browse the repository at this point in the history
  • Loading branch information
fu050409 committed Mar 4, 2024
1 parent ae3278a commit 68d16a7
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 35 deletions.
55 changes: 20 additions & 35 deletions src/dicergirl/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
from nonebot.adapters import Event
from nonebot.matcher import Matcher
from infini.input import Input
from infini.loader import Loader
from infini.injector import Injector
from diceutils.utils import format_msg
from diceutils.parser import CommandParser, Commands, Optional, Bool
from diceutils.status import StatusPool

from .utils import hmr, get_core
from .workflow import put, workflows
import json
import importlib
import sys


class Interceptor:
Expand All @@ -36,38 +37,11 @@ async def __call__(self) -> bool:
return True


injector = Injector()
interceptor = on_message(Rule(Interceptor()), priority=1, block=True)
ipm = on_startswith(".ipm", priority=0, block=True)

packages = ["dicergirl"]

with Loader() as loader:
for package in packages:
loader.load(package)
core = loader.into_core()


def hmr():
global core
importlib.invalidate_caches()

for package in packages:
for name in [name for name in sys.modules.keys() if name.startswith(package)]:
sys.modules[name] = (
importlib.reload(sys.modules[name])
if name in sys.modules
else importlib.import_module(name)
)
sys.modules[package] = (
importlib.reload(sys.modules[package])
if package in sys.modules
else importlib.import_module(package)
)

with Loader() as loader:
for package in packages:
loader.load(package)
core = loader.into_core()
hmr()


@ipm.handle()
Expand All @@ -85,18 +59,24 @@ async def ipm_handler(event: Event, matcher: Matcher):
auto=True,
).results

status = StatusPool.get("dicergirl")

if commands["hmr"]:
hmr()
return await matcher.send("Infini 热重载完毕")

if commands["add"]:
packages = status.get("bot", "packages") or []
packages.append(commands["add"])
status.set("bot", "packages", packages)
hmr()
return await matcher.send(f"规则包[{commands['add']}]挂载完成")

if commands["remove"]:
packages = status.get("bot", "packages") or []
if commands["remove"] in packages:
packages.remove(commands["remove"])
status.set("bot", "packages", packages)
return await matcher.send(f"规则包[{commands['remove']}]卸载完成")
return await matcher.send(f"规则包[{commands['remove']}]未挂载")

Expand All @@ -122,7 +102,7 @@ async def handler(event: Event, matcher: Matcher):
session_id = event.get_session_id()

plain_text = event.get_plaintext()
message = event.get_message()
message = [{"type": msg.type, "data": msg.data} for msg in event.get_message()]

input = Input(
plain_text,
Expand All @@ -140,5 +120,10 @@ async def handler(event: Event, matcher: Matcher):
},
)

for output in core.input(input):
await matcher.send(output)
for output in get_core().input(input):
if isinstance(output, str):
await matcher.send(output)
else:
parameters = {"output": output}
parameters.update(output.variables)
put(injector.inject(workflows.get(output.name), parameters=parameters))
42 changes: 42 additions & 0 deletions src/dicergirl/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from diceutils.status import StatusPool
from infini.core import Core
from infini.loader import Loader
from infini.output import Output

import importlib
import sys

status = StatusPool.register("dicergirl")
core: Core


def get_core():
return core


def hmr(output: Output = None):
global core
importlib.invalidate_caches()

packages = status.get("bot", "packages") or []

for package in packages:
for name in [name for name in sys.modules.keys() if name.startswith(package)]:
sys.modules[name] = (
importlib.reload(sys.modules[name])
if name in sys.modules
else importlib.import_module(name)
)
sys.modules[package] = (
importlib.reload(sys.modules[package])
if package in sys.modules
else importlib.import_module(package)
)

with Loader() as loader:
for package in packages:
loader.load(package)
core = loader.into_core()

if output:
output.status = 0
10 changes: 10 additions & 0 deletions src/dicergirl/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Callable
from .utils import hmr

pool = ThreadPoolExecutor(20)
workflows = {"echo.hmr": hmr}


def put(func: Callable):
pool.submit(func)

0 comments on commit 68d16a7

Please sign in to comment.