Skip to content

Commit

Permalink
🔧 refactor(interpreters): modularize interpreter classes
Browse files Browse the repository at this point in the history
- Move BaseInterpreter and MermaidInterpreter to interpreters.py
- Introduce Interpreters list for managing interpreter instances
- Adjust imports and references in related files to use new module

🗑️ chore(types): extract content types to separate module

- Extract ContentTypes and related classes to type.py
- Update imports and references to use new type module
  • Loading branch information
sudoskys committed Dec 13, 2024
1 parent 8ab5404 commit 12d09bf
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 223 deletions.
2 changes: 1 addition & 1 deletion playground/inspect_telegramify.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from telebot import TeleBot

import telegramify_markdown
from telegramify_markdown import ContentTypes
from telegramify_markdown.interpreters import ContentTypes
from telegramify_markdown.customize import markdown_symbol

tips = """
Expand Down
2 changes: 1 addition & 1 deletion playground/telegramify_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from telebot import TeleBot

import telegramify_markdown
from telegramify_markdown import ContentTypes
from telegramify_markdown.interpreters import ContentTypes
from telegramify_markdown.customize import markdown_symbol

tips = """
Expand Down
230 changes: 9 additions & 221 deletions src/telegramify_markdown/__init__.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
import dataclasses
import re
from abc import ABCMeta
from enum import Enum
from typing import Union, List, Tuple, Any, Callable
from typing import Union, List, Tuple, Any

import mistletoe
from mistletoe.block_token import BlockToken, ThematicBreak # noqa
from mistletoe.markdown_renderer import LinkReferenceDefinition, BlankLine
from mistletoe.span_token import SpanToken # noqa

from . import customize
from .interpreters import Text, File, Photo, BaseInterpreter, MermaidInterpreter, Interpreters
from .latex_escape.const import LATEX_SYMBOLS, NOT_MAP, LATEX_STYLES
from .latex_escape.helper import LatexToUnicodeHelper
from .logger import logger
from .mermaid import render_mermaid
from .mime import get_filename
from .render import TelegramMarkdownRenderer, escape_markdown
from .type import Text, File, Photo

__all__ = [
"escape_markdown",
"customize",
"markdownify",
"telegramify",
"ContentTypes",
"BaseInterpreter",
"Interpreters",
]
TaskType = Tuple[str, List[Tuple[Any, Any]]]
SentType = List[Union["Text", "File", "Photo"]]
latex_escape_helper = LatexToUnicodeHelper()


Expand Down Expand Up @@ -97,44 +95,6 @@ def _update_block(token: BlockToken):
_update_text(token)


class ContentTypes(Enum):
TEXT = "text"
FILE = "file"
PHOTO = "photo"


class RenderedContent(object, metaclass=ABCMeta):
"""
The rendered content.
- content: str
- content_type: ContentTypes
"""
content_type: ContentTypes


@dataclasses.dataclass
class Text(RenderedContent):
content: str
content_type: ContentTypes = ContentTypes.TEXT


@dataclasses.dataclass
class File(RenderedContent):
file_name: str
file_data: bytes
caption: str = ""
content_type: ContentTypes = ContentTypes.FILE


@dataclasses.dataclass
class Photo(RenderedContent):
file_name: str
file_data: bytes
caption: str = ""
content_type: ContentTypes = ContentTypes.PHOTO


class PackHelper(object):
@staticmethod
def process_long_pack(__token1_l: list, __token2_l: list, render_func: callable):
Expand Down Expand Up @@ -186,178 +146,6 @@ def process_short_pack(__token1_l, __token2_l, render_func):
return _processed


class BaseInterpreter(object):
name = "base"

def merge(self, tasks: List[TaskType]) -> List[TaskType]:
"""
Merge the tasks.
:param tasks: [(base, [(token1,token2),(token1,token2)]), (base, [(token1,token2),(token1,token2)])]
:return:
"""
return tasks

def split(self, task: TaskType) -> List[TaskType]:
"""
Split the task.
:param task: (base, [(token1,token2),(token1,token2)])
:return: [(base, [(token1,token2),(token1,token2)]),....newTask]
"""
return [task]

def render_task(self,
task: TaskType,
render_block_func: Callable[[List[Any]], str],
render_lines_func: Callable[[str], str],
max_word_count: int = 4090
) -> SentType:
"""
Render the task.
:param render_block_func: The render block function
:param render_lines_func: The render lines function
:param task: (base, [(token1,token2),(token1,token2)])
:param max_word_count: The maximum number of words in a single message.
:return: SentType
"""
task_type, token_pairs = task
if task_type != "base":
logger.warn("Invalid task type for BaseInterpreter.")
token1_l = list(__token1 for __token1, __token2 in token_pairs)
token2_l = list(__token2 for __token1, __token2 in token_pairs)
# 处理超过最大字数限制的情况
if len(render_block_func(token1_l)) > max_word_count:
# 如果超过最大字数限制
if all(isinstance(_per_token1, mistletoe.block_token.CodeFence) for _per_token1 in token1_l) and len(
token1_l) == 1 and len(token2_l) == 1:
# 如果这个 pack 是完全的 code block,那么采用文件形式发送。否则采用文本形式发送。
_escaped_code = token1_l[0]
_unescaped_code_child = list(token1_l[0].children)
file_content = render_block_func(token2_l)
if _unescaped_code_child:
_code_text = _unescaped_code_child[0]
if isinstance(_code_text, mistletoe.span_token.RawText):
file_content = _code_text.content
lang = "txt"
if isinstance(_escaped_code, mistletoe.block_token.CodeFence):
lang = _escaped_code.language
"""
if lang.lower() == "mermaid":
try:
image_io, caption = render_mermaid(file_content.replace("```mermaid", "").replace("```", ""))
return [Photo(file_name="mermaid.png", file_data=image_io.getvalue(), caption=caption)]
except Exception as e:
pass
"""
file_name = get_filename(line=render_block_func(token1_l), language=lang)
return [File(file_name=file_name, file_data=file_content.encode(), caption="")]
# 如果超过最大字数限制
return [File(file_name="letter.txt", file_data=render_block_func(token2_l).encode(), caption="")]
# 没有超过最大字数限制
return [Text(content=render_block_func(token1_l))]


class MermaidInterpreter(BaseInterpreter):
name = "mermaid"

def merge(self, tasks: List[TaskType]) -> List[TaskType]:
"""
Merge the tasks.
:param tasks: [(base, [(token1,token2),(token1,token2)]), (base, [(token1,token2),(token1,token2)])]
:return:
"""
return tasks

def split(self, task: TaskType) -> List[TaskType]:
"""
Split the task.
:param task: (base, [(token1,token2),(token1,token2)])
:return: [(mermaid, [(token1,token2),(token1,token2)]),....newTask]
"""
task_type, token_pairs = task
# 只处理 base 块
if task_type != "base":
return [task]
# 用于存放生成的新任务
tasks = []
# 临时缓存非 Mermaid 块
current_base_tokens = []
for token_pair in token_pairs:
token1, _ = token_pair
# 检查是否为 Mermaid 块
if isinstance(token1, mistletoe.block_token.CodeFence) and token1.language.lower() == "mermaid":
if current_base_tokens:
# 将缓存的非 Mermaid 块生成新的 base 任务
tasks.append(("base", current_base_tokens))
current_base_tokens = []
# 单独添加 Mermaid 块任务
tasks.append(("mermaid", [token_pair]))
else:
# 累积 base 块
current_base_tokens.append(token_pair)
# 处理剩余的 base 块
if current_base_tokens:
tasks.append(("base", current_base_tokens))
return tasks

def render_task(self,
task: TaskType,
render_block_func: Callable[[List[Any]], str],
render_lines_func: Callable[[str], str],
max_word_count: int = 4090
) -> SentType:
"""
Render the task.#
:param task: (base, [(token1,token2),(token1,token2)]) of [(base, [(token1,token2),(token1,token2)]), (base, [(token1,token2),(token1,token2)])]
:param render_block_func: The render block function
:param render_lines_func: The render lines function
:param max_word_count: The maximum number of words in a single message.
:return: SentType
"""
task_type, token_pairs = task
if task_type != "mermaid":
raise ValueError("Invalid task type for MermaidInterpreter.")
# 仅处理 Mermaid 块
if len(token_pairs) != 1:
raise ValueError("Invalid token length for MermaidInterpreter.")
token1_l = list(__token1 for __token1, __token2 in token_pairs)
token2_l = list(__token2 for __token1, __token2 in token_pairs)
if not all(isinstance(_per_token, mistletoe.block_token.CodeFence) for _per_token in token1_l):
raise ValueError("Invalid token type for MermaidInterpreter.")
_escaped_code = token2_l[0]
if (isinstance(
_escaped_code,
mistletoe.block_token.CodeFence
) and _escaped_code.language.lower() == "mermaid"):
file_content = render_block_func(token1_l)
_unescaped_code_child = list(_escaped_code.children)
if _unescaped_code_child:
_raw_text = _unescaped_code_child[0]
if isinstance(_raw_text, mistletoe.span_token.RawText):
file_content = _raw_text.content
try:
img_io, url = render_mermaid(file_content.replace("```mermaid", "").replace("```", ""))
message = f"[edit in mermaid.live]({url})"
except Exception as e:
return [
File(
file_name="mermaid_code.txt",
file_data=render_block_func(token2_l).encode(),
caption=""
)
]
else:
return [
Photo(
file_name="mermaid.png",
file_data=img_io.getvalue(),
caption=render_lines_func(message)
)
]
return [
File(file_name="mermaid_code.txt", file_data=render_block_func(token2_l).encode(), caption="")
]


def telegramify(
content: str,
max_line_length: int = None,
Expand Down Expand Up @@ -428,11 +216,11 @@ def render_lines(lines: str):
_packed.append(_stack)
_task = [("base", cell) for cell in _packed]
# [(base, [(token1,token2),(token1,token2)]), (base, [(token1,token2),(token1,token2)])]
interpreters = [BaseInterpreter(), MermaidInterpreter()]
interpreters_map = {interpreter.name: interpreter for interpreter in interpreters}
for interpreter in interpreters:

interpreters_map = {interpreter.name: interpreter for interpreter in Interpreters}
for interpreter in Interpreters:
_task = interpreter.merge(_task)
for interpreter in interpreters:
for interpreter in Interpreters:
_new_task = []
for _per_task in _task:
_new_task.extend(interpreter.split(_per_task))
Expand Down
Loading

0 comments on commit 12d09bf

Please sign in to comment.