Skip to content

Commit

Permalink
moved file conversion to process, added pdf url to output
Browse files Browse the repository at this point in the history
  • Loading branch information
ishaan99k committed Oct 4, 2024
1 parent 41a77f1 commit 1a0e690
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 117 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file should undo anything in `up.sql`
-- Drop the new trigger
-- (the up migration replaced the INSERT trigger with a guarded UPDATE trigger;
-- remove that UPDATE-time trigger here before restoring the original)
DROP TRIGGER IF EXISTS validate_usage_trigger ON TASKS;

-- Recreate the original trigger
-- Restores the pre-migration behavior: validate_usage() runs on every row
-- INSERT into TASKS, with no WHEN condition.
-- NOTE(review): CREATE OR REPLACE TRIGGER requires PostgreSQL 14+ — confirm
-- the deployment target; plain CREATE TRIGGER would suffice after the DROP above.
CREATE OR REPLACE TRIGGER validate_usage_trigger
BEFORE INSERT ON TASKS
FOR EACH ROW
EXECUTE FUNCTION validate_usage();
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Your SQL goes here
-- Drop the existing trigger
-- (previously fired BEFORE INSERT on every row; see down.sql)
DROP TRIGGER IF EXISTS validate_usage_trigger ON TASKS;

-- Create the new trigger
-- Moves usage validation from INSERT time to UPDATE time, and only fires
-- when the updated row has a positive page_count — presumably so validation
-- runs once the page count is known rather than at task creation; verify
-- against the application flow that sets page_count.
CREATE TRIGGER validate_usage_trigger
BEFORE UPDATE ON TASKS
FOR EACH ROW
WHEN (NEW.page_count > 0)
EXECUTE FUNCTION validate_usage();
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`
-- Remove the two columns added by the up migration.
-- NOTE: DROP COLUMN discards any data stored in these columns; this down
-- migration is not lossless once rows have been written.
ALTER TABLE tasks
DROP COLUMN pdf_location,
DROP COLUMN input_file_type;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Your SQL goes here
-- pdf_location: storage location of the converted PDF (nullable — presumably
--   unset until conversion completes; confirm against the task-processing code).
-- input_file_type: type/extension of the originally uploaded file (nullable).
-- Both columns are added without defaults, so existing rows get NULL.
ALTER TABLE tasks
ADD COLUMN pdf_location TEXT,
ADD COLUMN input_file_type TEXT;
1 change: 1 addition & 0 deletions chunkmydocs/src/models/server/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct TaskResponse {
pub configuration: Configuration,
pub file_name: Option<String>,
pub page_count: Option<i32>,
pub pdf_location: Option<String>,
}

#[derive(
Expand Down
89 changes: 9 additions & 80 deletions chunkmydocs/src/utils/server/create_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,17 @@ use crate::models::{
server::extract::{Configuration, ExtractionPayload, SegmentationModel},
server::task::{Status, TaskResponse},
};
use crate::task::pdf::convert_to_pdf;
use crate::utils::configs::extraction_config::Config;
use crate::utils::db::deadpool_postgres::{Client, Pool};
use crate::utils::rrq::service::produce;
use crate::utils::storage::services::{generate_presigned_url, upload_to_s3};
use actix_multipart::form::tempfile::TempFile;
use aws_sdk_s3::Client as S3Client;
use chrono::{DateTime, Utc};
use lopdf::Document;
use std::error::Error;
use std::path::Path;
use std::path::PathBuf;
use uuid::Uuid;


fn is_valid_file_type(
original_file_name: &str,
) -> Result<(bool, String), Box<dyn Error>> {
let extension = Path::new(original_file_name)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("");

let is_valid = match extension.to_lowercase().as_str() {
"pdf" | "docx" | "doc" | "pptx" | "ppt" | "xlsx" | "xls" => true,
_ => false,
};

Ok((is_valid, format!("application/{}", extension)))
}

async fn produce_extraction_payloads(
extraction_payload: ExtractionPayload,
) -> Result<(), Box<dyn Error>> {
Expand All @@ -59,7 +39,6 @@ async fn produce_extraction_payloads(

Ok(())
}
use tempfile::NamedTempFile;
pub async fn create_task(
pool: &Pool,
s3_client: &S3Client,
Expand All @@ -84,53 +63,9 @@ pub async fn create_task(
let base_url = config.base_url;
let task_url = format!("{}/api/v1/task/{}", base_url, task_id);

let file_name = file.file_name.as_deref().unwrap_or("unknown");
let original_path = PathBuf::from(file.file.path());
let (is_valid, detected_mime_type) = is_valid_file_type( file_name)?;
if !is_valid {
return Err(format!("Not a valid file type: {}", detected_mime_type).into());
}

let extension = file
.file_name
.as_deref()
.unwrap_or("")
.split('.')
.last()
.unwrap_or("tmp");


let mut final_output_path: PathBuf = original_path.clone();
let final_output_file = tempfile::NamedTempFile::new().unwrap();
let output_file = NamedTempFile::new().unwrap();
let output_path = output_file.path().to_path_buf();

if extension != "pdf" {
let new_path = original_path.with_extension(extension).clone();

std::fs::rename(&original_path, &new_path)?;

let input_path = new_path;


let result = convert_to_pdf(&input_path, &output_path).await;
final_output_path = final_output_file.path().to_path_buf();

match result {
Ok(_) => {
std::fs::copy(&output_path, &final_output_path).unwrap();
}
Err(e) => {
println!("PDF conversion failed: {:?}", e);
panic!("PDF conversion failed: {:?}", e);
}
}
}
let final_output_path: PathBuf = PathBuf::from(file.file.path());

let page_count = match Document::load(&final_output_path) {
Ok(doc) => doc.get_pages().len() as i32,
Err(e) => return Err(format!("Failed to get page count: {}", e).into()),
};
let page_count: i32 = 0;
let file_size = file.size;

let file_name = file
Expand All @@ -139,15 +74,6 @@ pub async fn create_task(
.unwrap_or("unknown.pdf")
.to_string();

let file_name = if file_name.ends_with(".pdf") {
file_name
} else {
format!(
"{}.pdf",
file_name.trim_end_matches(|c| c == '.' || char::is_alphanumeric(c))
)
};

let input_location = format!("s3://{}/{}/{}/{}", bucket_name, user_id, task_id, file_name);

let output_extension = model_internal.get_extension();
Expand All @@ -168,9 +94,9 @@ pub async fn create_task(
task_id, user_id, api_key, file_name, file_size,
page_count, segment_count, expires_at,
status, task_url, input_location, output_location, image_folder_location,
configuration, message
configuration, message, pdf_location, input_file_type
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17
) ON CONFLICT (task_id) DO NOTHING",
&[
&task_id,
Expand All @@ -188,11 +114,13 @@ pub async fn create_task(
&image_folder_location,
&configuration_json,
&message,
&None::<String>,
&None::<String>,
],
)
.await
{
Ok(_) => { }
Ok(_) => {}
Err(e) => {
println!("Error inserting task: {}", e);
if e.to_string().contains("usage limit exceeded") {
Expand Down Expand Up @@ -226,7 +154,7 @@ pub async fn create_task(
Ok(response) => {
println!("Presigned URL generated successfully");
Some(response)
},
}
Err(e) => {
println!("Error getting input file url: {}", e);
return Err("Error getting input file url".into());
Expand All @@ -246,6 +174,7 @@ pub async fn create_task(
configuration: configuration.clone(),
file_name: Some(file_name.to_string()),
page_count: Some(page_count),
pdf_location: None,
})
}
Err(e) => Err(e),
Expand Down
51 changes: 27 additions & 24 deletions chunkmydocs/src/utils/server/get_task.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use crate::models::server::extract::Configuration;
use crate::models::server::task::{ Status, TaskResponse };
use crate::models::server::segment::{ Chunk, SegmentType };
use crate::utils::db::deadpool_postgres::{ Client, Pool };
use crate::utils::storage::services::{ download_to_tempfile, generate_presigned_url };
use crate::models::server::segment::{Chunk, SegmentType};
use crate::models::server::task::{Status, TaskResponse};
use crate::utils::db::deadpool_postgres::{Client, Pool};
use crate::utils::storage::services::{download_to_tempfile, generate_presigned_url};
use aws_sdk_s3::Client as S3Client;
use chrono::{ DateTime, Utc };
use chrono::{DateTime, Utc};
use reqwest;
use serde_json;

pub async fn get_task(
pool: &Pool,
s3_client: &S3Client,
task_id: String,
user_id: String
user_id: String,
) -> Result<TaskResponse, Box<dyn std::error::Error>> {
let client: Client = pool.get().await?;
let task = client.query_one(
"SELECT * FROM TASKS WHERE task_id = $1 AND user_id = $2",
&[&task_id, &user_id]
).await?;
let task = client
.query_one(
"SELECT * FROM TASKS WHERE task_id = $1 AND user_id = $2",
&[&task_id, &user_id],
)
.await?;

let expires_at: Option<DateTime<Utc>> = task.get("expires_at");
if expires_at.is_some() && expires_at.unwrap() < Utc::now() {
Expand All @@ -30,7 +32,7 @@ pub async fn get_task(

pub async fn create_task_from_row(
row: &tokio_postgres::Row,
s3_client: &S3Client
s3_client: &S3Client,
) -> Result<TaskResponse, Box<dyn std::error::Error>> {
let task_id: String = row.get("task_id");
let status: Status = row
Expand All @@ -43,11 +45,11 @@ pub async fn create_task_from_row(
let message = row.get::<_, Option<String>>("message").unwrap_or_default();
let file_name = row.get::<_, Option<String>>("file_name");
let page_count = row.get::<_, Option<i32>>("page_count");

let pdf_location = row.get::<_, Option<String>>("pdf_location");
let input_location: String = row.get("input_location");
let input_file_url = generate_presigned_url(s3_client, &input_location, None).await.map_err(
|_| "Error getting input file url"
)?;
let input_file_url = generate_presigned_url(s3_client, &input_location, None)
.await
.map_err(|_| "Error getting input file url")?;

let output_location: String = row.get("output_location");
let output = if status == Status::Succeeded {
Expand Down Expand Up @@ -75,19 +77,16 @@ pub async fn create_task_from_row(
configuration,
file_name,
page_count,
pdf_location,
})
}

async fn process_output(
s3_client: &S3Client,
output_location: &str
output_location: &str,
) -> Result<Vec<Chunk>, Box<dyn std::error::Error>> {
let temp_file = download_to_tempfile(
s3_client,
&reqwest::Client::new(),
output_location,
None
).await?;
let temp_file =
download_to_tempfile(s3_client, &reqwest::Client::new(), output_location, None).await?;
let json_content: String = tokio::fs::read_to_string(temp_file.path()).await?;
let mut chunks: Vec<Chunk> = serde_json::from_str(&json_content)?;

Expand All @@ -97,8 +96,12 @@ async fn process_output(
let url = generate_presigned_url(s3_client, image, None).await.ok();
segment.image = url.clone();
if segment.segment_type == SegmentType::Picture {
segment.html = Some(format!("<img src=\"{}\" />", url.clone().unwrap_or_default()));
segment.markdown = Some(format!("![Image]({})", url.clone().unwrap_or_default()));
segment.html = Some(format!(
"<img src=\"{}\" />",
url.clone().unwrap_or_default()
));
segment.markdown =
Some(format!("![Image]({})", url.clone().unwrap_or_default()));
}
}
}
Expand Down
1 change: 0 additions & 1 deletion chunkmydocs/src/utils/server/validate_usage.rs

This file was deleted.

Loading

0 comments on commit 1a0e690

Please sign in to comment.