Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Commit

Permalink
handle possible error state
Browse files Browse the repository at this point in the history
  • Loading branch information
oppiliappan committed Jan 2, 2024
1 parent fe85a43 commit cf57724
Showing 1 changed file with 44 additions and 20 deletions.
64 changes: 44 additions & 20 deletions server/bleep/src/indexes/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ struct SyncHandle {
name: Option<String>,
favicon: Option<String>,
description: Option<String>,
join_handle: tokio::task::JoinHandle<()>,
status: &'static str,
join_handle: Option<tokio::task::JoinHandle<Result<(), Error>>>,
progress_stream: tokio::sync::watch::Receiver<Progress>,
}

static STATUS_DONE: &str = "done";
static STATUS_INDEXING: &str = "indexing";
static STATUS_ERROR: &str = "error";

#[derive(serde::Serialize)]
pub struct SqlRecord {
Expand All @@ -60,6 +62,7 @@ pub struct SqlRecord {
pub enum Progress {
Init(i64),
SetMetadata,
Err(String),
Update(Update),
Done(i64),
}
Expand Down Expand Up @@ -295,21 +298,21 @@ impl Doc {
let (tx, rx) = tokio::sync::watch::channel(Progress::Init(id));
let self_ = self.clone();
let url_ = url.clone();
let join_handle = tokio::task::spawn(async move {
self_.begin_sync_job(id, url_, tx, transaction).await; // TODO: handle err
()
});

let mut lock = self.index_queue.write().await;
lock.push(SyncHandle {
let sync_handle = SyncHandle {
id,
url: url.to_string(),
name: None,
favicon: None,
description: None,
join_handle,
status: STATUS_INDEXING,
join_handle: None,
progress_stream: rx,
});
};
lock.push(sync_handle);
lock.last_mut().unwrap().join_handle = Some(tokio::task::spawn(async move {
self_.begin_sync_job(id, url_, tx, transaction).await
}));

Ok(id)
}
Expand Down Expand Up @@ -352,9 +355,23 @@ impl Doc {
.fetch_optional(&mut transaction)
.await?
.ok_or(Error::InvalidDocId(id))?;
error!(doc_source = url.as_str(), "no docs found at url");
error!(doc_source = url.as_str(), %id, "no docs found at url");

let error = Error::EmptyDocs(url);

// send error down the stream
let _ = tx.send(Progress::Err(error.to_string()));

// send job status to error
self.index_queue
.write()
.await
.iter_mut()
.find(|job| job.id == id)
.map(|job| job.status = STATUS_ERROR);

// return error
Err(Error::EmptyDocs(url))?;
Err(error)?;
}

self.set_index_status(STATUS_DONE, id, &mut transaction)
Expand Down Expand Up @@ -393,7 +410,11 @@ impl Doc {
.iter()
.find(|handle| handle.id == id)
.ok_or(Error::InvalidDocId(id))?;
handle.join_handle.abort();
handle
.join_handle
.as_ref()
.ok_or(Error::InvalidDocId(id))?
.abort();
drop(lock);

// remove handle from queue
Expand All @@ -405,7 +426,7 @@ impl Doc {
// - if this is a sync job: this rolls back to before the sync started
// - if this is a resync job: this rolls back to before the old copy was deleted
match self.index_writer.lock().await.rollback() {
Ok(_) => info!(%id, "successfully cancelled sync job"),
Ok(_) => info!(%id, "successfully cancelled sync job, rolled back to old copy"),
Err(e) => error!(%id, %e, "tantivy rollback failed"),
};

Expand All @@ -429,6 +450,9 @@ impl Doc {
url::Url::parse(&record.url).map_err(|e| Error::UrlParse(record.url.clone(), e))?;

// delete old docs from tantivy
//
// create a checkpoint before deletion, so we can revert to here if the job is cancelled
self.index_writer.lock().await.commit();
self.index_writer
.lock()
.await
Expand All @@ -447,21 +471,21 @@ impl Doc {
let transaction = self.sql.begin().await?;
let self_ = self.clone();
let url_ = url.clone();
let join_handle = tokio::task::spawn(async move {
self_.begin_sync_job(id, url_, tx, transaction).await; // TODO: handle err
()
});

let mut lock = self.index_queue.write().await;
lock.push(SyncHandle {
id,
url: url.to_string(),
name: record.name.clone(),
favicon: record.favicon.clone(),
status: STATUS_INDEXING,
description: record.description.clone(),
join_handle,
join_handle: None,
progress_stream: rx,
});
lock.last_mut().unwrap().join_handle = Some(tokio::task::spawn(async move {
self_.begin_sync_job(id, url_, tx, transaction).await
}));

Ok(id)
}
Expand Down Expand Up @@ -513,7 +537,7 @@ impl Doc {
name: job.name.clone(),
favicon: job.favicon.clone(),
description: job.description.clone(),
index_status: STATUS_INDEXING.to_owned(),
index_status: job.status.to_string(),
modified_at: chrono::Utc::now().naive_local(),
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -543,7 +567,7 @@ impl Doc {
name: job.name.clone(),
favicon: job.favicon.clone(),
description: job.description.clone(),
index_status: STATUS_INDEXING.to_owned(),
index_status: job.status.to_string(),
modified_at: chrono::Utc::now().naive_local(),
});

Expand Down

0 comments on commit cf57724

Please sign in to comment.