-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
124 lines (96 loc) · 3.61 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import asyncio
import os
from contextlib import asynccontextmanager
import fitz
import magic_pdf.model as model_config
import xoscar as xo
from fastapi import FastAPI, WebSocket
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from loguru import logger
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter
from starlette.websockets import WebSocketDisconnect
from settings import StorageType, settings
# NOTE(review): module-level switch on magic_pdf.model; presumably tells magic_pdf
# to run its bundled in-process models — confirm against the magic_pdf docs.
model_config.__use_inside_model__ = True
# xoscar references to the PDFOscarActor instances. `lifespan` appends one per
# pool address at startup; the `/ws` handler scans this list for an idle actor.
ACTORS = []
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the xoscar actor pool and the PDF actors for the application's lifetime.

    Startup:
      * Create an actor pool at ``settings.XOSCAR_ADDRESS`` with
        ``settings.XOSCAR_ACTORS`` sub-processes.
      * Create one ``PDFOscarActor`` on every external address the pool
        reports, with uid ``"0"``, ``"1"``, ...
      * Record each actor reference in the module-level ``ACTORS`` list.

    Shutdown:
      * Destroy the actors, empty ``ACTORS`` and stop the pool.
      * This cleanup also runs when startup or the application raises, so
        neither actors nor pool sub-processes are leaked.

    Args:
        app: The FastAPI application. It is unused, but required by the
            lifespan protocol.
    """
    pool = await xo.create_actor_pool(
        address=settings.XOSCAR_ADDRESS,
        n_process=settings.XOSCAR_ACTORS,
    )
    try:
        pool_config = await xo.get_pool_config(settings.XOSCAR_ADDRESS)
        # NOTE(review): presumably this includes the main pool's address as
        # well as every sub-pool's, i.e. n_process + 1 actors — confirm.
        pool_addresses = pool_config.get_external_addresses()
        for i, address in enumerate(pool_addresses):
            actor = await xo.create_actor(
                PDFOscarActor,
                address=address,
                uid=str(i),
            )
            ACTORS.append(actor)
        yield
    finally:
        try:
            for actor in ACTORS:
                await xo.destroy_actor(actor)
        finally:
            # Drop stale references and shut the pool (and its sub-processes)
            # down even if destroying an individual actor failed.
            ACTORS.clear()
            await pool.stop()
# The `lifespan` context manager sets up the xoscar actor pool and PDF actors on
# startup and destroys the actors on shutdown.
app = FastAPI(lifespan=lifespan)
class PDFOscarActor(xo.Actor):
    """xoscar actor that converts PDFs received over a WebSocket into Markdown.

    An actor serves one WebSocket session at a time. ``self._available``
    records whether it is idle; the ``/ws`` handler reads it via ``available()``.

    NOTE(review): ``serve`` receives a live ``WebSocket`` object. When the actor
    lives in a pool sub-process, that argument must cross a process boundary,
    which presumably cannot work for a socket — confirm. Also, if xoscar runs an
    actor's methods one at a time, ``available()`` may block while ``serve()``
    is running — confirm.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # True while this actor is not serving any WebSocket session.
        self._available = True

    @classmethod
    def get_image_writer(cls):
        """Return the writer used to store images extracted from PDFs.

        Local storage writes under ``settings.IMAGE_DIR``. Otherwise, an S3
        writer rooted at ``s3://<MINIO_BUCKET>/`` on the MinIO endpoint is used.
        """
        if settings.STORAGE_TYPE == StorageType.LOCAL:
            return DiskReaderWriter(settings.IMAGE_DIR)
        return S3ReaderWriter(
            settings.MINIO_ACCESS_KEY,
            settings.MINIO_SECRET_KEY,
            settings.minio_endpoint_url,
            parent_path=f"s3://{settings.MINIO_BUCKET}/",
        )

    @classmethod
    def _get_img_parent_path(cls):
        """Return the URL prefix for image links in the generated Markdown.

        Returns ``<SERVER_URL>/images`` for local storage, otherwise
        ``<minio endpoint>/<MINIO_BUCKET>``.
        """
        if settings.STORAGE_TYPE == StorageType.LOCAL:
            return f"{settings.SERVER_URL}/images"
        return f"{settings.minio_endpoint_url}/{settings.MINIO_BUCKET}"

    @classmethod
    async def convert_pdf_to_md(cls, pdf_bytes):
        """Run magic_pdf's UNIPipe on *pdf_bytes* and return the Markdown text.

        The pipeline runs these steps in order:
        classify -> analyze -> parse -> make Markdown.

        NOTE(review): every pipeline call is synchronous, so this coroutine does
        not yield to the event loop while converting.
        """
        # NOTE(review): an empty type and model list presumably make UNIPipe
        # detect the type and run its own model analysis — confirm.
        jso_useful_key = {"_pdf_type": "", "model_list": []}
        image_writer = cls.get_image_writer()
        pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
        pipe.pipe_classify()
        pipe.pipe_analyze()
        pipe.pipe_parse()
        img_parent_path = cls._get_img_parent_path()
        md_content: str = pipe.pipe_mk_markdown(img_parent_path, drop_mode="none")
        return md_content

    def available(self):
        """Return True if this actor is not currently serving a session."""
        return self._available

    async def serve(self, websocket: WebSocket):
        """Serve one WebSocket session until it ends.

        Each binary message is opened as a PDF with PyMuPDF, re-serialized, and
        converted to Markdown. The Markdown is sent back as a text message.

        The session ends on any of:
          * an empty message;
          * a client disconnect;
          * any other error (logged).

        The actor is marked available again however the session ends, including
        on task cancellation (``CancelledError`` is not an ``Exception``).
        """
        self._available = False
        try:
            while True:
                try:
                    page_data = await websocket.receive_bytes()
                    if not page_data:
                        break
                    # Close the PyMuPDF document as soon as its bytes are
                    # extracted instead of leaking one per message.
                    with fitz.open("pdf", page_data) as doc:
                        pdf_bytes = doc.tobytes()
                    text = await self.convert_pdf_to_md(pdf_bytes)
                    await websocket.send_text(text)
                except WebSocketDisconnect:
                    logger.info("Connection <{}:{}> closed", websocket.client.host, websocket.client.port)
                    break
                except Exception as e:
                    logger.exception("Unknown Error: {}", e)
                    break
        finally:
            self._available = True
@app.get("/")
async def get_index():
    """Return the web client's landing page (``web/index.html``) for ``GET /``."""
    index_page = "web/index.html"
    return FileResponse(path=index_page)
@app.websocket("/ws")
async def stage(websocket: WebSocket):
    """Accept a WebSocket and hand it to the first idle PDF actor.

    ``ACTORS`` is scanned in order. The first actor reporting itself available
    serves the whole session. If none is free, the handler waits one second
    and scans again, indefinitely.
    """
    await websocket.accept()
    while True:
        idle_actor = None
        for candidate in ACTORS:
            if await candidate.available():
                idle_actor = candidate
                break
        if idle_actor is not None:
            await idle_actor.serve(websocket)
            return
        await asyncio.sleep(1)
# Create the local image directory before StaticFiles is built over it, because
# StaticFiles checks the directory at construction by default.
# Then serve it under /images, the prefix _get_img_parent_path returns for local
# storage. NOTE(review): this mount is created even when STORAGE_TYPE is not
# LOCAL.
os.makedirs(settings.IMAGE_DIR, exist_ok=True)
app.mount("/images", StaticFiles(directory=settings.IMAGE_DIR), name="images")