Commit f3cec23
Merge pull request #493 from VikParuchuri/hotfix-2
Hotfix scripts
VikParuchuri authored Jan 19, 2025
2 parents d154d8d + bf749b9 commit f3cec23
Showing 14 changed files with 534 additions and 531 deletions.
22 changes: 2 additions & 20 deletions chunk_convert.py
@@ -1,22 +1,4 @@
-import argparse
-import subprocess
-import pkg_resources
-
-
-def main():
-    parser = argparse.ArgumentParser(description="Convert a folder of PDFs to a folder of markdown files in chunks.")
-    parser.add_argument("in_folder", help="Input folder with pdfs.")
-    parser.add_argument("out_folder", help="Output folder")
-    args = parser.parse_args()
-
-    script_path = pkg_resources.resource_filename(__name__, 'chunk_convert.sh')
-
-    # Construct the command
-    cmd = f"{script_path} {args.in_folder} {args.out_folder}"
-
-    # Execute the shell script
-    subprocess.run(cmd, shell=True, check=True)
-
+from marker.scripts import chunk_convert_cli
 
 if __name__ == "__main__":
-    main()
+    chunk_convert_cli()
114 changes: 1 addition & 113 deletions convert.py
@@ -1,116 +1,4 @@
-import os
-
-os.environ["GRPC_VERBOSITY"] = "ERROR"
-os.environ["GLOG_minloglevel"] = "2"
-os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"  # Transformers uses .isin for a simple op, which is not supported on MPS
-os.environ["IN_STREAMLIT"] = "true"  # Avoid multiprocessing inside surya
-
-import math
-import traceback
-
-import click
-import torch.multiprocessing as mp
-from tqdm import tqdm
-
-from marker.config.parser import ConfigParser
-from marker.converters.pdf import PdfConverter
-from marker.logger import configure_logging
-from marker.models import create_model_dict
-from marker.output import output_exists, save_output
-from marker.settings import settings
-
-configure_logging()
-
-
-def worker_init(model_dict):
-    if model_dict is None:
-        model_dict = create_model_dict()
-
-    global model_refs
-    model_refs = model_dict
-
-
-def worker_exit():
-    global model_refs
-    del model_refs
-
-
-def process_single_pdf(args):
-    fpath, cli_options = args
-    config_parser = ConfigParser(cli_options)
-
-    out_folder = config_parser.get_output_folder(fpath)
-    base_name = config_parser.get_base_filename(fpath)
-    if cli_options.get('skip_existing') and output_exists(out_folder, base_name):
-        return
-
-    try:
-        converter = PdfConverter(
-            config=config_parser.generate_config_dict(),
-            artifact_dict=model_refs,
-            processor_list=config_parser.get_processors(),
-            renderer=config_parser.get_renderer()
-        )
-        rendered = converter(fpath)
-        out_folder = config_parser.get_output_folder(fpath)
-        save_output(rendered, out_folder, base_name)
-    except Exception as e:
-        print(f"Error converting {fpath}: {e}")
-        print(traceback.format_exc())
-
-
-@click.command()
-@click.argument("in_folder", type=str)
-@ConfigParser.common_options
-@click.option("--chunk_idx", type=int, default=0, help="Chunk index to convert")
-@click.option("--num_chunks", type=int, default=1, help="Number of chunks being processed in parallel")
-@click.option("--max_files", type=int, default=None, help="Maximum number of pdfs to convert")
-@click.option("--workers", type=int, default=5, help="Number of worker processes to use.")
-@click.option("--skip_existing", is_flag=True, default=False, help="Skip existing converted files.")
-def main(in_folder: str, **kwargs):
-    in_folder = os.path.abspath(in_folder)
-    files = [os.path.join(in_folder, f) for f in os.listdir(in_folder)]
-    files = [f for f in files if os.path.isfile(f)]
-
-    # Handle chunks if we're processing in parallel
-    # Ensure we get all files into a chunk
-    chunk_size = math.ceil(len(files) / kwargs["num_chunks"])
-    start_idx = kwargs["chunk_idx"] * chunk_size
-    end_idx = start_idx + chunk_size
-    files_to_convert = files[start_idx:end_idx]
-
-    # Limit files converted if needed
-    if kwargs["max_files"]:
-        files_to_convert = files_to_convert[:kwargs["max_files"]]
-
-    # Disable nested multiprocessing
-    kwargs["disable_multiprocessing"] = True
-
-    total_processes = min(len(files_to_convert), kwargs["workers"])
-
-    try:
-        mp.set_start_method('spawn')  # Required for CUDA, forkserver doesn't work
-    except RuntimeError:
-        raise RuntimeError("Set start method to spawn twice. This may be a temporary issue with the script. Please try running it again.")
-
-    if settings.TORCH_DEVICE == "mps" or settings.TORCH_DEVICE_MODEL == "mps":
-        model_dict = None
-    else:
-        model_dict = create_model_dict()
-        for k, v in model_dict.items():
-            v.share_memory()
-
-    print(f"Converting {len(files_to_convert)} pdfs in chunk {kwargs['chunk_idx'] + 1}/{kwargs['num_chunks']} with {total_processes} processes and saving to {kwargs['output_dir']}")
-    task_args = [(f, kwargs) for f in files_to_convert]
-
-    with mp.Pool(processes=total_processes, initializer=worker_init, initargs=(model_dict,)) as pool:
-        list(tqdm(pool.imap(process_single_pdf, task_args), total=len(task_args), desc="Processing PDFs", unit="pdf"))
-
-        pool._worker_handler.terminate = worker_exit
-
-    # Delete all CUDA tensors
-    del model_dict
-
+from marker.scripts import convert_cli
 
 if __name__ == "__main__":
     main()
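As an aside, the chunking logic above hands each parallel invocation a contiguous slice of the file list. A quick standalone sketch (not part of this commit; file names are invented) of how the slicing behaves:

import math

# 10 files split across 3 parallel chunks: chunk_size = ceil(10 / 3) = 4
files = [f"doc_{i}.pdf" for i in range(10)]  # hypothetical file list
num_chunks, chunk_idx = 3, 2
chunk_size = math.ceil(len(files) / num_chunks)
start_idx = chunk_idx * chunk_size
files_to_convert = files[start_idx:start_idx + chunk_size]
print(files_to_convert)  # ['doc_8.pdf', 'doc_9.pdf'] -- the last chunk may be short

Because the slice end is clamped by Python slicing, every file lands in exactly one chunk and the final chunk simply comes up short.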
43 changes: 2 additions & 41 deletions convert_single.py
@@ -1,43 +1,4 @@
-import os
-
-os.environ["GRPC_VERBOSITY"] = "ERROR"
-os.environ["GLOG_minloglevel"] = "2"
-os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"  # Transformers uses .isin for a simple op, which is not supported on MPS
-
-import time
-import click
-
-from marker.config.parser import ConfigParser
-from marker.config.printer import CustomClickPrinter
-from marker.converters.pdf import PdfConverter
-from marker.logger import configure_logging
-from marker.models import create_model_dict
-from marker.output import save_output
-
-configure_logging()
-
-
-@click.command(cls=CustomClickPrinter, help="Convert a single PDF to markdown.")
-@click.argument("fpath", type=str)
-@ConfigParser.common_options
-def main(fpath: str, **kwargs):
-    models = create_model_dict()
-    start = time.time()
-    config_parser = ConfigParser(kwargs)
-
-    converter = PdfConverter(
-        config=config_parser.generate_config_dict(),
-        artifact_dict=models,
-        processor_list=config_parser.get_processors(),
-        renderer=config_parser.get_renderer()
-    )
-    rendered = converter(fpath)
-    out_folder = config_parser.get_output_folder(fpath)
-    save_output(rendered, out_folder, config_parser.get_base_filename(fpath))
-
-    print(f"Saved markdown to {out_folder}")
-    print(f"Total time: {time.time() - start}")
-
+from marker.scripts import convert_single_cli
 
 if __name__ == "__main__":
-    main()
+    convert_single_cli()
5 changes: 5 additions & 0 deletions marker/scripts/__init__.py
@@ -0,0 +1,5 @@
+from marker.scripts.convert_single import convert_single_cli
+from marker.scripts.convert import convert_cli
+from marker.scripts.server import server_cli
+from marker.scripts.run_streamlit_app import streamlit_app_cli
+from marker.scripts.chunk_convert import chunk_convert_cli
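Collecting the CLI functions under marker/scripts gives packaging a stable import path for console commands. A hedged sketch of what that wiring could look like with setuptools (the command names below are illustrative, not taken from this commit):

# Hypothetical setup.py fragment -- not part of this commit.
from setuptools import setup

setup(
    name="marker-pdf",
    entry_points={
        "console_scripts": [
            # command = "module.path:function"
            "marker=marker.scripts.convert:convert_cli",
            "marker_single=marker.scripts.convert_single:convert_single_cli",
            "marker_chunk_convert=marker.scripts.chunk_convert:chunk_convert_cli",
        ]
    },
)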
20 changes: 20 additions & 0 deletions marker/scripts/chunk_convert.py
@@ -0,0 +1,20 @@
+import argparse
+import os
+import subprocess
+import pkg_resources
+
+
+def chunk_convert_cli():
+    parser = argparse.ArgumentParser(description="Convert a folder of PDFs to a folder of markdown files in chunks.")
+    parser.add_argument("in_folder", help="Input folder with pdfs.")
+    parser.add_argument("out_folder", help="Output folder")
+    args = parser.parse_args()
+
+    cur_dir = os.path.dirname(os.path.abspath(__file__))
+    script_path = os.path.join(cur_dir, "chunk_convert.sh")
+
+    # Construct the command
+    cmd = f"{script_path} {args.in_folder} {args.out_folder}"
+
+    # Execute the shell script
+    subprocess.run(cmd, shell=True, check=True)
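One caveat with the call above: interpolating paths into a single shell string means folders containing spaces or shell metacharacters get split or expanded by the shell. A possible hardening (an editor's sketch, not in this commit) passes an argument list and skips the shell entirely:

import subprocess

# Equivalent invocation without shell=True; each argument is passed verbatim,
# so paths with spaces survive intact. script_path/args as defined above.
subprocess.run([script_path, args.in_folder, args.out_folder], check=True)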
File renamed without changes.
114 changes: 114 additions & 0 deletions marker/scripts/convert.py
@@ -0,0 +1,114 @@
+import os
+
+os.environ["GRPC_VERBOSITY"] = "ERROR"
+os.environ["GLOG_minloglevel"] = "2"
+os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"  # Transformers uses .isin for a simple op, which is not supported on MPS
+os.environ["IN_STREAMLIT"] = "true"  # Avoid multiprocessing inside surya
+
+import math
+import traceback
+
+import click
+import torch.multiprocessing as mp
+from tqdm import tqdm
+
+from marker.config.parser import ConfigParser
+from marker.config.printer import CustomClickPrinter
+from marker.logger import configure_logging
+from marker.models import create_model_dict
+from marker.output import output_exists, save_output
+from marker.settings import settings
+
+configure_logging()
+
+
+def worker_init(model_dict):
+    if model_dict is None:
+        model_dict = create_model_dict()
+
+    global model_refs
+    model_refs = model_dict
+
+
+def worker_exit():
+    global model_refs
+    del model_refs
+
+
+def process_single_pdf(args):
+    fpath, cli_options = args
+    config_parser = ConfigParser(cli_options)
+
+    out_folder = config_parser.get_output_folder(fpath)
+    base_name = config_parser.get_base_filename(fpath)
+    if cli_options.get('skip_existing') and output_exists(out_folder, base_name):
+        return
+
+    converter_cls = config_parser.get_converter_cls()
+
+    try:
+        converter = converter_cls(
+            config=config_parser.generate_config_dict(),
+            artifact_dict=model_refs,
+            processor_list=config_parser.get_processors(),
+            renderer=config_parser.get_renderer()
+        )
+        rendered = converter(fpath)
+        out_folder = config_parser.get_output_folder(fpath)
+        save_output(rendered, out_folder, base_name)
+    except Exception as e:
+        print(f"Error converting {fpath}: {e}")
+        print(traceback.format_exc())
+
+
+@click.command(cls=CustomClickPrinter)
+@click.argument("in_folder", type=str)
+@ConfigParser.common_options
+@click.option("--chunk_idx", type=int, default=0, help="Chunk index to convert")
+@click.option("--num_chunks", type=int, default=1, help="Number of chunks being processed in parallel")
+@click.option("--max_files", type=int, default=None, help="Maximum number of pdfs to convert")
+@click.option("--workers", type=int, default=5, help="Number of worker processes to use.")
+@click.option("--skip_existing", is_flag=True, default=False, help="Skip existing converted files.")
+def convert_cli(in_folder: str, **kwargs):
+    in_folder = os.path.abspath(in_folder)
+    files = [os.path.join(in_folder, f) for f in os.listdir(in_folder)]
+    files = [f for f in files if os.path.isfile(f)]
+
+    # Handle chunks if we're processing in parallel
+    # Ensure we get all files into a chunk
+    chunk_size = math.ceil(len(files) / kwargs["num_chunks"])
+    start_idx = kwargs["chunk_idx"] * chunk_size
+    end_idx = start_idx + chunk_size
+    files_to_convert = files[start_idx:end_idx]
+
+    # Limit files converted if needed
+    if kwargs["max_files"]:
+        files_to_convert = files_to_convert[:kwargs["max_files"]]
+
+    # Disable nested multiprocessing
+    kwargs["disable_multiprocessing"] = True
+
+    total_processes = min(len(files_to_convert), kwargs["workers"])
+
+    try:
+        mp.set_start_method('spawn')  # Required for CUDA, forkserver doesn't work
+    except RuntimeError:
+        raise RuntimeError("Set start method to spawn twice. This may be a temporary issue with the script. Please try running it again.")
+
+    if settings.TORCH_DEVICE == "mps" or settings.TORCH_DEVICE_MODEL == "mps":
+        model_dict = None
+    else:
+        model_dict = create_model_dict()
+        for k, v in model_dict.items():
+            v.share_memory()
+
+    print(f"Converting {len(files_to_convert)} pdfs in chunk {kwargs['chunk_idx'] + 1}/{kwargs['num_chunks']} with {total_processes} processes and saving to {kwargs['output_dir']}")
+    task_args = [(f, kwargs) for f in files_to_convert]
+
+    with mp.Pool(processes=total_processes, initializer=worker_init, initargs=(model_dict,)) as pool:
+        list(tqdm(pool.imap(process_single_pdf, task_args), total=len(task_args), desc="Processing PDFs", unit="pdf"))
+
+        pool._worker_handler.terminate = worker_exit
+
+    # Delete all CUDA tensors
+    del model_dict
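The worker_init/model_refs pattern above loads (or receives) the models once per worker process and stashes them in a module-level global, so every task dispatched through imap can reuse them instead of reloading models per file. A minimal self-contained sketch of the same pattern (illustrative names only, no marker imports):

import multiprocessing as mp

def worker_init(shared_models):
    # Runs once per worker; tasks read the global instead of reloading models.
    global model_refs
    model_refs = shared_models if shared_models is not None else {"ocr": "loaded per worker"}

def process(task):
    return f"{task} handled with {sorted(model_refs)}"

if __name__ == "__main__":
    mp.set_start_method("spawn")  # same requirement as the script when CUDA is involved
    with mp.Pool(2, initializer=worker_init, initargs=({"layout": 1, "ocr": 2},)) as pool:
        print(pool.map(process, ["a.pdf", "b.pdf"]))

Passing None as the initarg (the MPS branch above) makes each worker load its own copy, since MPS tensors cannot be placed in shared memory the way the CUDA/CPU path's share_memory() call allows.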
39 changes: 39 additions & 0 deletions marker/scripts/convert_single.py
@@ -0,0 +1,39 @@
+import os
+
+os.environ["GRPC_VERBOSITY"] = "ERROR"
+os.environ["GLOG_minloglevel"] = "2"
+os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"  # Transformers uses .isin for a simple op, which is not supported on MPS
+
+import time
+import click
+
+from marker.config.parser import ConfigParser
+from marker.config.printer import CustomClickPrinter
+from marker.logger import configure_logging
+from marker.models import create_model_dict
+from marker.output import save_output
+
+configure_logging()
+
+
+@click.command(cls=CustomClickPrinter, help="Convert a single PDF to markdown.")
+@click.argument("fpath", type=str)
+@ConfigParser.common_options
+def convert_single_cli(fpath: str, **kwargs):
+    models = create_model_dict()
+    start = time.time()
+    config_parser = ConfigParser(kwargs)
+
+    converter_cls = config_parser.get_converter_cls()
+    converter = converter_cls(
+        config=config_parser.generate_config_dict(),
+        artifact_dict=models,
+        processor_list=config_parser.get_processors(),
+        renderer=config_parser.get_renderer()
+    )
+    rendered = converter(fpath)
+    out_folder = config_parser.get_output_folder(fpath)
+    save_output(rendered, out_folder, config_parser.get_base_filename(fpath))
+
+    print(f"Saved markdown to {out_folder}")
+    print(f"Total time: {time.time() - start}")
8 changes: 2 additions & 6 deletions run_marker_app.py → marker/scripts/run_streamlit_app.py
@@ -2,12 +2,8 @@
 import os
 
 
-def run():
+def streamlit_app_cli():
     cur_dir = os.path.dirname(os.path.abspath(__file__))
-    app_path = os.path.join(cur_dir, "marker_app.py")
+    app_path = os.path.join(cur_dir, "streamlit_app.py")
     cmd = ["streamlit", "run", app_path]
     subprocess.run(cmd, env={**os.environ, "IN_STREAMLIT": "true"})
-
-
-if __name__ == "__main__":
-    run()
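The IN_STREAMLIT variable is set here (and in convert.py above, where the comment says it avoids multiprocessing inside surya) but only consumed downstream; the diff does not show the reader. A hypothetical guard of the kind a library might use:

import os

def max_workers(default: int = 4) -> int:
    # Hypothetical consumer of IN_STREAMLIT -- the real check lives in surya/marker.
    if os.environ.get("IN_STREAMLIT") == "true":
        return 1  # avoid nested multiprocessing inside the Streamlit app
    return default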