Skip to content

Commit

Permalink
Refactor JSON-RPC handling in Python host
Browse files Browse the repository at this point in the history
  • Loading branch information
qianlifeng committed Dec 7, 2024
1 parent 358ac01 commit 0ed7215
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 71 deletions.
4 changes: 3 additions & 1 deletion Wox.Plugin.Host.Nodejs/src/jsonrpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ async function action(ctx: Context, request: PluginJsonRpcRequest) {
return
}

return pluginAction({
pluginAction({
ContextData: request.Params.ContextData
})

return
}

async function refresh(ctx: Context, request: PluginJsonRpcRequest) {
Expand Down
37 changes: 24 additions & 13 deletions Wox.Plugin.Host.Python/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import uuid
from typing import Dict, Any
import traceback

import websockets
import logger
Expand All @@ -14,19 +15,23 @@

async def handle_message(ws: websockets.WebSocketServerProtocol, message: str):
"""Handle incoming WebSocket message"""

trace_id = str(uuid.uuid4())
try:
msg_data = json.loads(message)
trace_id = msg_data.get("TraceId", str(uuid.uuid4()))
if msg_data.get("TraceId"):
trace_id = msg_data.get("TraceId")

ctx = new_context_with_value("traceId", trace_id)

if PLUGIN_JSONRPC_TYPE_RESPONSE in message:
# Handle response from Wox
if msg_data.get("Id") in waiting_for_response:
deferred = waiting_for_response[msg_data["Id"]]
if msg_data.get("Error"):
deferred.reject(msg_data["Error"])
deferred.set_exception(Exception(msg_data["Error"]))
else:
deferred.resolve(msg_data.get("Result"))
deferred.set_result(msg_data.get("Result"))
del waiting_for_response[msg_data["Id"]]
elif PLUGIN_JSONRPC_TYPE_REQUEST in message:
# Handle request from Wox
Expand All @@ -39,33 +44,39 @@ async def handle_message(ws: websockets.WebSocketServerProtocol, message: str):
"Type": PLUGIN_JSONRPC_TYPE_RESPONSE,
"Result": result
}
await ws.send(json.dumps(response))
await ws.send(json.dumps(response, default=lambda o: '<not serializable>'))
except Exception as e:
error_stack = traceback.format_exc()
error_response = {
"TraceId": trace_id,
"Id": msg_data["Id"],
"Method": msg_data["Method"],
"Type": PLUGIN_JSONRPC_TYPE_RESPONSE,
"Error": str(e)
}
await logger.error(trace_id, f"handle request failed: {str(e)}")
await ws.send(json.dumps(error_response))
await logger.error(trace_id, f"handle request failed: {str(e)}\nStack trace:\n{error_stack}")
await ws.send(json.dumps(error_response, default=lambda o: '<not serializable>'))
else:
await logger.error(trace_id, f"unknown message type: {message}")
except Exception as e:
await logger.error(str(uuid.uuid4()), f"receive and handle msg error: {message}, err: {str(e)}")
error_stack = traceback.format_exc()
await logger.error(trace_id, f"receive and handle msg error: {message}, err: {str(e)}\nStack trace:\n{error_stack}")

async def handler(websocket: websockets.WebSocketServerProtocol):
"""WebSocket connection handler"""
logger.update_websocket(websocket)

try:
async for message in websocket:
await handle_message(websocket, message)
except websockets.exceptions.ConnectionClosed:
await logger.info(str(uuid.uuid4()), "connection closed")
except Exception as e:
await logger.error(str(uuid.uuid4()), f"connection error: {str(e)}")
while True:
try:
message = await websocket.recv()
asyncio.create_task(handle_message(websocket, message))
except websockets.exceptions.ConnectionClosed:
await logger.info(str(uuid.uuid4()), "connection closed")
break
except Exception as e:
error_stack = traceback.format_exc()
await logger.error(str(uuid.uuid4()), f"connection error: {str(e)}\nStack trace:\n{error_stack}")
finally:
logger.update_websocket(None)

Expand Down
72 changes: 54 additions & 18 deletions Wox.Plugin.Host.Python/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import zipimport
import websockets
import logger
import inspect
from wox_plugin import (
Context,
Plugin,
Expand All @@ -18,9 +19,10 @@
new_context_with_value,
PluginInitParams
)
from constants import PLUGIN_JSONRPC_TYPE_REQUEST, PLUGIN_JSONRPC_TYPE_RESPONSE
from plugin_manager import plugin_instances, waiting_for_response
from plugin_manager import plugin_instances
from plugin_api import PluginAPI
import traceback
import asyncio

async def handle_request_from_wox(ctx: Context, request: Dict[str, Any], ws: websockets.WebSocketServerProtocol) -> Any:
"""Handle incoming request from Wox"""
Expand Down Expand Up @@ -88,7 +90,8 @@ async def load_plugin(ctx: Context, request: Dict[str, Any]) -> None:

await logger.info(ctx["Values"]["traceId"], f"<{plugin_name}> load plugin successfully")
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin_name}> load plugin failed: {str(e)}")
error_stack = traceback.format_exc()
await logger.error(ctx["Values"]["traceId"], f"<{plugin_name}> load plugin failed: {str(e)}\nStack trace:\n{error_stack}")
raise e

async def init_plugin(ctx: Context, request: Dict[str, Any], ws: websockets.WebSocketServerProtocol) -> None:
Expand All @@ -102,6 +105,8 @@ async def init_plugin(ctx: Context, request: Dict[str, Any], ws: websockets.WebS
# Create plugin API instance
api = PluginAPI(ws, plugin_id, plugin["name"])
plugin["api"] = api
plugin["actions"] = {} # Add actions cache
plugin["refreshes"] = {} # Add refreshes cache

# Call plugin's init method if it exists
if hasattr(plugin["plugin"], "init"):
Expand All @@ -110,7 +115,8 @@ async def init_plugin(ctx: Context, request: Dict[str, Any], ws: websockets.WebS

await logger.info(ctx["Values"]["traceId"], f"<{plugin['name']}> init plugin successfully")
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> init plugin failed: {str(e)}")
error_stack = traceback.format_exc()
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> init plugin failed: {str(e)}\nStack trace:\n{error_stack}")
raise e

async def query(ctx: Context, request: Dict[str, Any]) -> list:
Expand All @@ -124,6 +130,10 @@ async def query(ctx: Context, request: Dict[str, Any]) -> list:
if not hasattr(plugin["plugin"], "query"):
return []

# Clear action and refresh caches before query
plugin["actions"].clear()
plugin["refreshes"].clear()

query_params = Query(
Type=QueryType(request["Params"]["Type"]),
RawQuery=request["Params"]["RawQuery"],
Expand All @@ -135,8 +145,8 @@ async def query(ctx: Context, request: Dict[str, Any]) -> list:
)

results = await plugin["plugin"].query(ctx, query_params)
# Ensure each result has an ID

# Ensure each result has an ID and cache actions and refreshes
if results:
for result in results:
if not result.Id:
Expand All @@ -145,10 +155,16 @@ async def query(ctx: Context, request: Dict[str, Any]) -> list:
for action in result.Actions:
if not action.Id:
action.Id = str(uuid.uuid4())
# Cache action
plugin["actions"][action.Id] = action.Action
# Cache refresh callback if exists
if hasattr(result, "RefreshInterval") and result.RefreshInterval is not None and result.RefreshInterval > 0 and hasattr(result, "OnRefresh"):
plugin["refreshes"][result.Id] = result.OnRefresh

return [result.__dict__ for result in results] if results else []
return [result.to_dict() for result in results]
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> query failed: {str(e)}")
error_stack = traceback.format_exc()
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> query failed: {str(e)}\nStack trace:\n{error_stack}")
raise e

async def action(ctx: Context, request: Dict[str, Any]) -> Any:
Expand All @@ -162,13 +178,16 @@ async def action(ctx: Context, request: Dict[str, Any]) -> Any:
action_id = request["Params"]["ActionId"]
context_data = request["Params"].get("ContextData")

# Find the action in the plugin's results
if hasattr(plugin["plugin"], "handle_action"):
return await plugin["plugin"].handle_action(action_id, context_data)
# Get action from cache
action_func = plugin["actions"].get(action_id)
if action_func:
# Don't await the action, let it run independently
asyncio.create_task(action_func({"ContextData": context_data}))

return None
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> action failed: {str(e)}")
error_stack = traceback.format_exc()
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> action failed: {str(e)}\nStack trace:\n{error_stack}")
raise e

async def refresh(ctx: Context, request: Dict[str, Any]) -> Any:
Expand All @@ -180,14 +199,30 @@ async def refresh(ctx: Context, request: Dict[str, Any]) -> Any:

try:
result_id = request["Params"]["ResultId"]

# Find the refresh callback in the plugin's results
if hasattr(plugin["plugin"], "handle_refresh"):
return await plugin["plugin"].handle_refresh(result_id)
refreshable_result = json.loads(request["Params"]["RefreshableResult"])

# Get refresh callback from cache
refresh_func = plugin["refreshes"].get(result_id)
if refresh_func:
refreshed_result = await refresh_func(refreshable_result)

# Cache any new actions from the refreshed result
if refreshed_result.Actions:
for action in refreshed_result.Actions:
if not action.Id:
action.Id = str(uuid.uuid4())
plugin["actions"][action.Id] = action.Action

# Cache refresh callback if exists
if hasattr(refreshed_result, "RefreshInterval") and refreshed_result.RefreshInterval is not None and refreshed_result.RefreshInterval > 0 and hasattr(refreshed_result, "OnRefresh"):
plugin["refreshes"][result_id] = refreshed_result.OnRefresh

return refreshed_result.to_dict()

return None
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> refresh failed: {str(e)}")
error_stack = traceback.format_exc()
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> refresh failed: {str(e)}\nStack trace:\n{error_stack}")
raise e

async def unload_plugin(ctx: Context, request: Dict[str, Any]) -> None:
Expand All @@ -211,5 +246,6 @@ async def unload_plugin(ctx: Context, request: Dict[str, Any]) -> None:

await logger.info(ctx["Values"]["traceId"], f"<{plugin['name']}> unload plugin successfully")
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> unload plugin failed: {str(e)}")
error_stack = traceback.format_exc()
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> unload plugin failed: {str(e)}\nStack trace:\n{error_stack}")
raise e
3 changes: 2 additions & 1 deletion Wox.Plugin.Host.Python/plugin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
Conversation,
ChatStreamFunc,
)
from jsonrpc import PLUGIN_JSONRPC_TYPE_REQUEST, waiting_for_response
from constants import PLUGIN_JSONRPC_TYPE_REQUEST
from plugin_manager import waiting_for_response

class PluginAPI(PublicAPI):
def __init__(self, ws: websockets.WebSocketServerProtocol, plugin_id: str, plugin_name: str):
Expand Down
2 changes: 1 addition & 1 deletion Wox.Plugin.Host.Python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "Python host for Wox plugins"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"wox-plugin>=0.1.0",
"loguru",
"websockets",
"wox-plugin==0.0.24",
]
3 changes: 0 additions & 3 deletions Wox.Plugin.Host.Python/requirements.txt

This file was deleted.

14 changes: 7 additions & 7 deletions Wox.Plugin.Host.Python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Wox.Plugin.Python/Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
publish:
release:
python publish.py patch
12 changes: 11 additions & 1 deletion Wox.Plugin.Python/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def run_command(command: str) -> int:
return subprocess.call(command, shell=True)

def update_version(version_type: str) -> str:
"""Update version number
"""Update version number in setup.py, pyproject.toml and __init__.py
version_type: major, minor, or patch
"""
# Read setup.py
Expand Down Expand Up @@ -47,6 +47,16 @@ def update_version(version_type: str) -> str:
)
setup_path.write_text(new_content)

# Update pyproject.toml
pyproject_path = Path("pyproject.toml")
pyproject_content = pyproject_path.read_text()
new_pyproject_content = re.sub(
r'version = "(\d+)\.(\d+)\.(\d+)"',
f'version = "{new_version}"',
pyproject_content
)
pyproject_path.write_text(new_pyproject_content)

# Update __init__.py
init_path = Path("wox_plugin/__init__.py")
init_content = init_path.read_text()
Expand Down
2 changes: 1 addition & 1 deletion Wox.Plugin.Python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "wox-plugin"
version = "0.1.0"
version = "0.0.24"
description = "Python types for Wox plugins"
readme = "README.md"
requires-python = ">=3.12"
Expand Down
3 changes: 2 additions & 1 deletion Wox.Plugin.Python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

setup(
name="wox-plugin",
version="0.0.18",
version="0.0.24",
description="All Python plugins for Wox should use types in this package",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
author="Wox-launcher",
author_email="",
url="https://github.com/Wox-launcher/Wox",
packages=find_packages(),
package_data={"wox_plugin": ["py.typed"]},
install_requires=[
"typing_extensions>=4.0.0; python_version < '3.8'"
],
Expand Down
4 changes: 2 additions & 2 deletions Wox.Plugin.Python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Wox.Plugin.Python/wox_plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@
PluginInitParams,
)

__version__ = "0.0.18"
__version__ = "0.0.24"
Empty file.
Loading

0 comments on commit 0ed7215

Please sign in to comment.