Skip to content

Commit

Permalink
Subgraph composition: spec version
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Jan 31, 2025
1 parent e61452f commit c8d8103
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 27 deletions.
20 changes: 18 additions & 2 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,25 @@ impl EntityCache {
};

// Always test the cache consistency in debug mode. The test only
// makes sense when we were actually asked to read from the store
// makes sense when we were actually asked to read from the store.
// We need to remove the VID as the one from the DB might come from
// a legacy subgraph that has VID autoincremented while this trait
// always creates it in a new style.
debug_assert!(match scope {
GetScope::Store => entity == self.store.get(key).unwrap().map(Arc::new),
GetScope::Store => {
// Release build will never call this function and hence it's OK
// when that implementation is not correct.
fn remove_vid(entity: Option<Arc<Entity>>) -> Option<Entity> {
entity.map(|e| {
#[allow(unused_mut)]
let mut entity = (*e).clone();
#[cfg(debug_assertions)]
entity.remove("vid");
entity
})
}
remove_vid(entity.clone()) == remove_vid(self.store.get(key).unwrap().map(Arc::new))
}
GetScope::InBlock => true,
});

Expand Down
8 changes: 8 additions & 0 deletions graph/src/schema/entity_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ impl EntityType {
pub fn is_object_type(&self) -> bool {
self.schema.is_object_type(self.atom)
}

// Changes the way the VID field is generated. It used to be autoincrement. Now its
// based on block number and the order of the entities in a block. The latter
// represents the write order across all entity types in the subgraph.
pub fn strict_vid_order(&self) -> bool {
// Currently the agregations entities don't have VIDs in insertion order
self.schema.strict_vid_order() && self.is_object_type()
}
}

impl fmt::Display for EntityType {
Expand Down
7 changes: 7 additions & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::data::graphql::{DirectiveExt, DocumentExt, ObjectTypeExt, TypeExt, Va
use crate::data::store::{
self, EntityValidationError, IdType, IntoEntityIterator, TryIntoEntityIterator, ValueType, ID,
};
use crate::data::subgraph::SPEC_VERSION_1_3_0;
use crate::data::value::Word;
use crate::derive::CheapClone;
use crate::prelude::q::Value;
Expand Down Expand Up @@ -955,6 +956,7 @@ pub struct Inner {
pool: Arc<AtomPool>,
/// A list of all timeseries types by interval
agg_mappings: Box<[AggregationMapping]>,
spec_version: Version,
}

impl InputSchema {
Expand Down Expand Up @@ -1042,6 +1044,7 @@ impl InputSchema {
enum_map,
pool,
agg_mappings,
spec_version: spec_version.clone(),
}),
})
}
Expand Down Expand Up @@ -1585,6 +1588,10 @@ impl InputSchema {
}?;
Some(EntityType::new(self.cheap_clone(), obj_type.name))
}

pub fn strict_vid_order(&self) -> bool {
self.inner.spec_version >= SPEC_VERSION_1_3_0
}
}

/// Create a new pool that contains the names of all the types defined
Expand Down
11 changes: 7 additions & 4 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use diesel::{
sql_query,
sql_types::{Nullable, Text},
};
use graph::semver::Version;
use graph::{
blockchain::block_stream::FirehoseCursor,
data::subgraph::schema::SubgraphError,
Expand Down Expand Up @@ -305,11 +306,13 @@ pub fn debug_fork(

pub fn schema(conn: &mut PgConnection, site: &Site) -> Result<(InputSchema, bool), StoreError> {
use subgraph_manifest as sm;
let (s, use_bytea_prefix) = sm::table
.select((sm::schema, sm::use_bytea_prefix))
let (s, spec_ver, use_bytea_prefix) = sm::table
.select((sm::schema, sm::spec_version, sm::use_bytea_prefix))
.filter(sm::id.eq(site.id))
.first::<(String, bool)>(conn)?;
InputSchema::parse_latest(s.as_str(), site.deployment.clone())
.first::<(String, String, bool)>(conn)?;
let spec_version =
Version::parse(spec_ver.as_str()).map_err(|err| StoreError::Unknown(err.into()))?;
InputSchema::parse(&spec_version, s.as_str(), site.deployment.clone())
.map_err(StoreError::Unknown)
.map(|schema| (schema, use_bytea_prefix))
}
Expand Down
3 changes: 1 addition & 2 deletions store/postgres/src/relational/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ impl Table {
Ok(cols)
}

// Currently the agregations entities don't have VIDs in insertion order
let vid_type = if self.object.is_object_type() {
let vid_type = if self.object.strict_vid_order() {
"bigint"
} else {
"bigserial"
Expand Down
7 changes: 3 additions & 4 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ impl TablePair {

let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name);

let old_vid_form = !self.src.object.is_object_type();
let mut query = String::new();

// What we are about to do would get blocked by autovacuum on our
Expand All @@ -253,9 +252,9 @@ impl TablePair {
"src" => src_nsp.as_str(), "error" => e.to_string());
}

// Make sure the vid sequence
// continues from where it was
if old_vid_form {
// Make sure the vid sequence continues from where it was in case
// that we use autoincrementing order of the DB
if !self.src.object.strict_vid_order() {
writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
Expand Down
12 changes: 6 additions & 6 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2377,7 +2377,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
let out = &mut out;
out.unsafe_to_cache_prepared();

let new_vid_form = self.table.object.is_object_type();
let strict_vid_order = self.table.object.strict_vid_order();

// Construct a query
// insert into schema.table(column, ...)
Expand All @@ -2404,7 +2404,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(CAUSALITY_REGION_COLUMN);
};

if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}
out.push_sql(") values\n");
Expand All @@ -2424,7 +2424,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&row.causality_region)?;
};
if new_vid_form {
if strict_vid_order {
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
}
Expand Down Expand Up @@ -4827,7 +4827,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let new_vid_form = self.src.object.is_object_type();
let strict_vid_order = self.src.object.strict_vid_order();

// Construct a query
// insert into {dst}({columns})
Expand All @@ -4849,7 +4849,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(", ");
out.push_sql(CAUSALITY_REGION_COLUMN);
};
if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}

Expand Down Expand Up @@ -4917,7 +4917,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
));
}
}
if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}

Expand Down
4 changes: 2 additions & 2 deletions store/test-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub async fn create_subgraph(

let manifest = SubgraphManifest::<graph::blockchain::mock::MockBlockchain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: BTreeSet::new(),
description: Some(format!("manifest for {}", subgraph_id)),
repository: Some(format!("repo for {}", subgraph_id)),
Expand Down Expand Up @@ -227,7 +227,7 @@ pub async fn create_test_subgraph_with_features(

let manifest = SubgraphManifest::<graph::blockchain::mock::MockBlockchain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features,
description: Some(format!("manifest for {}", subgraph_id)),
repository: Some(format!("repo for {}", subgraph_id)),
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: LOAD_RELATED_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/postgres/graft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down
4 changes: 2 additions & 2 deletions store/test-store/tests/postgres/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down Expand Up @@ -1270,7 +1270,7 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
.expect("Failed to parse user schema");
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down
4 changes: 2 additions & 2 deletions store/test-store/tests/postgres/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ fn create_subgraph() {

let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id,
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down Expand Up @@ -547,7 +547,7 @@ fn subgraph_features() {
} = get_subgraph_features(id.to_string()).unwrap();

assert_eq!(NAME, subgraph_id.as_str());
assert_eq!("1.0.0", spec_version);
assert_eq!("1.3.0", spec_version);
assert_eq!("1.0.0", api_version.unwrap());
assert_eq!(NETWORK_NAME, network);
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/postgres/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ lazy_static! {
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down

0 comments on commit c8d8103

Please sign in to comment.