Reimplement mapping scripts (step-01, step-02) in Rust.
step-01 performs W3C Direct Mapping from MariaDB to Turtle using sophia. step-02 applies SPARQL UPDATE queries using oxigraph.
This commit is contained in:
parent
cd85a66c46
commit
34754cc68a
5 changed files with 2268 additions and 1 deletions
1902
Cargo.lock
generated
Normal file
1902
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
18
Cargo.toml
Normal file
18
Cargo.toml
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
[package]
name = "mapping"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "step-01"
path = "src/map/step_01.rs"

[[bin]]
name = "step-02"
path = "src/map/step_02.rs"

[dependencies]
sophia = "0.9"
# Pinned to major versions rather than wildcards ("*"): wildcard requirements
# make builds non-reproducible and are rejected by crates.io on publish.
# oxigraph 0.4 provides the `load_from_reader`/`dump_graph_to_writer` API
# used by step-02.
oxigraph = "0.4"
mysql = "25"
urlencoding = "2"
||||
32
README.md
32
README.md
|
|
@ -2,6 +2,36 @@
|
|||
|
||||
To generate a knowledge graph about migrants in the theatre in Europe.
|
||||
|
||||
## Running the scripts
|
||||
|
||||
The mapping scripts have been reimplemented in Rust for faster execution. Both
|
||||
scripts must be run from this directory (`mapping/`).
|
||||
|
||||
**Prerequisites:** Start the MariaDB container before running step 1:
|
||||
|
||||
```sh
|
||||
docker compose up -d
|
||||
```
|
||||
|
||||
**Step 1** — Direct Mapping from MariaDB to RDF (`data/graph-01.ttl`):
|
||||
|
||||
```sh
|
||||
cargo run --release --bin step-01
|
||||
```
|
||||
|
||||
**Step 2** — Apply SPARQL UPDATE queries (`data/graph-02.ttl`):
|
||||
|
||||
```sh
|
||||
cargo run --release --bin step-02
|
||||
```
|
||||
|
||||
Alternatively, after installing with `cargo install --path .`:
|
||||
|
||||
```sh
|
||||
step-01
|
||||
step-02
|
||||
```
|
||||
|
||||
## Generating the ontology
|
||||
|
||||
Next, there is a set of steps describing how to generate the migrants RDF graph.
|
||||
|
|
@ -110,4 +140,4 @@ To run:
|
|||
|
||||
```sh
|
||||
bundle exec ruby map/step-02.rb
|
||||
```
|
||||
```
|
||||
|
|
|
|||
233
src/map/step_01.rs
Normal file
233
src/map/step_01.rs
Normal file
|
|
@ -0,0 +1,233 @@
|
|||
/// Step 1: Direct Mapping from relational database to RDF.
|
||||
///
|
||||
/// Connects to MariaDB and produces `data/graph-01.ttl` following the
|
||||
/// W3C Direct Mapping specification.
|
||||
///
|
||||
/// Usage: Run from the mapping project directory:
|
||||
/// cargo run --release --bin step-01
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::io::BufWriter;
|
||||
|
||||
use mysql::prelude::*;
|
||||
use mysql::*;
|
||||
use sophia::api::ns::{rdf, xsd};
|
||||
use sophia::api::prelude::*;
|
||||
use sophia::api::prefix::{Prefix, PrefixMapPair};
|
||||
use sophia::api::term::SimpleTerm;
|
||||
use sophia::turtle::serializer::turtle::{TurtleConfig, TurtleSerializer};
|
||||
|
||||
/// In-memory RDF graph: a flat list of (subject, predicate, object) triples.
/// Serialized directly via `TurtleSerializer::serialize_graph` in `main`.
type MyGraph = Vec<[SimpleTerm<'static>; 3]>;

/// Base IRI under which all generated row, class, and property IRIs live.
const BASE_IRI: &str = "http://example.org/migrants/";
|
||||
|
||||
/// Primary-key column for each table, as `(table, pk_column)`.
/// This list also fixes which tables are exported: `main` iterates it and
/// runs `SELECT *` on each entry. The PK value becomes the final path
/// segment of each row IRI (see `row_iri`).
const PRIMARY_KEYS: &[(&str, &str)] = &[
    ("location", "IDLocation"),
    ("migration_table", "IDMig"),
    ("organisation", "IDOrganisation"),
    ("person", "IDPerson"),
    ("person_profession", "IDProfPerson"),
    ("personnames", "IDPersonname"),
    ("relationship", "IDRel"),
    ("religions", "IDReligion"),
    ("work", "IDWork"),
];
|
||||
|
||||
/// Foreign key definitions: (table, column, referenced_table).
/// A column listed here is emitted as an object property (`ref_iri`) whose
/// object is the referenced row's IRI, instead of as a datatype-property
/// literal. Note several columns may reference the same table (e.g. the
/// start/destination places of `migration_table`).
const FOREIGN_KEYS: &[(&str, &str, &str)] = &[
    ("migration_table", "IDPerson", "person"),
    ("migration_table", "IDStartPlace", "location"),
    ("migration_table", "IDDestPlace", "location"),
    ("organisation", "IDLocation", "location"),
    ("person", "IDBirthPlace", "location"),
    ("person", "IDDeathPlace", "location"),
    ("personnames", "IDPerson", "person"),
    ("person_profession", "IDPerson", "person"),
    ("relationship", "IDPerson_active", "person"),
    ("relationship", "IDPerson_passive", "person"),
    ("relationship", "IDLocation", "location"),
    ("relationship", "IDOrganisation", "organisation"),
    ("religions", "IDPerson", "person"),
    ("work", "IDPerson", "person"),
    ("work", "IDLocation", "location"),
    ("work", "IDOrganisation", "organisation"),
    ("work", "IDOrganisation2", "organisation"),
];
|
||||
|
||||
fn build_fk_map() -> HashMap<(&'static str, &'static str), &'static str> {
|
||||
let mut map = HashMap::new();
|
||||
for &(table, col, ref_table) in FOREIGN_KEYS {
|
||||
map.insert((table, col), ref_table);
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
fn row_iri(table: &str, pk_value: &str) -> SimpleTerm<'static> {
|
||||
let encoded = urlencoding::encode(pk_value);
|
||||
SimpleTerm::Iri(
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
format!("{}{}/{}", BASE_IRI, table, encoded).into(),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn class_iri(table: &str) -> SimpleTerm<'static> {
|
||||
SimpleTerm::Iri(
|
||||
sophia::api::term::IriRef::new_unchecked(format!("{}{}", BASE_IRI, table).into()),
|
||||
)
|
||||
}
|
||||
|
||||
fn column_iri(table: &str, column: &str) -> SimpleTerm<'static> {
|
||||
SimpleTerm::Iri(
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
format!("{}{}#{}", BASE_IRI, table, column).into(),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn ref_iri(table: &str, fk_col: &str) -> SimpleTerm<'static> {
|
||||
SimpleTerm::Iri(
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
format!("{}{}#ref-{}", BASE_IRI, table, fk_col).into(),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn rdf_type_term() -> SimpleTerm<'static> {
|
||||
SimpleTerm::Iri(sophia::api::term::IriRef::new_unchecked(
|
||||
rdf::type_.iri().unwrap().as_str().to_string().into(),
|
||||
))
|
||||
}
|
||||
|
||||
fn to_rdf_literal(value: &Value) -> Option<SimpleTerm<'static>> {
|
||||
match value {
|
||||
Value::NULL => None,
|
||||
Value::Int(i) => Some(SimpleTerm::LiteralDatatype(
|
||||
i.to_string().into(),
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
xsd::integer.iri().unwrap().as_str().to_string().into(),
|
||||
),
|
||||
)),
|
||||
Value::UInt(u) => Some(SimpleTerm::LiteralDatatype(
|
||||
u.to_string().into(),
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
xsd::integer.iri().unwrap().as_str().to_string().into(),
|
||||
),
|
||||
)),
|
||||
Value::Float(f) => Some(SimpleTerm::LiteralDatatype(
|
||||
f.to_string().into(),
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
xsd::double.iri().unwrap().as_str().to_string().into(),
|
||||
),
|
||||
)),
|
||||
Value::Double(d) => Some(SimpleTerm::LiteralDatatype(
|
||||
d.to_string().into(),
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
xsd::double.iri().unwrap().as_str().to_string().into(),
|
||||
),
|
||||
)),
|
||||
Value::Date(year, month, day, _, _, _, _) => Some(SimpleTerm::LiteralDatatype(
|
||||
format!("{:04}-{:02}-{:02}", year, month, day).into(),
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
xsd::date.iri().unwrap().as_str().to_string().into(),
|
||||
),
|
||||
)),
|
||||
Value::Bytes(b) => {
|
||||
let s = String::from_utf8_lossy(b).into_owned();
|
||||
Some(SimpleTerm::LiteralDatatype(
|
||||
s.into(),
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
xsd::string.iri().unwrap().as_str().to_string().into(),
|
||||
),
|
||||
))
|
||||
}
|
||||
_ => {
|
||||
// Treat everything else as a plain string literal
|
||||
let s: String = from_value(value.clone());
|
||||
Some(SimpleTerm::LiteralDatatype(
|
||||
s.into(),
|
||||
sophia::api::term::IriRef::new_unchecked(
|
||||
xsd::string.iri().unwrap().as_str().to_string().into(),
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let fk_map = build_fk_map();
|
||||
|
||||
let url = "mysql://migrants:1234@127.0.0.1:3306/migrants";
|
||||
let pool = Pool::new(url)?;
|
||||
let mut conn = pool.get_conn()?;
|
||||
|
||||
let mut graph = MyGraph::new();
|
||||
let rdf_type = rdf_type_term();
|
||||
|
||||
for &(table, pk_col) in PRIMARY_KEYS {
|
||||
let query = format!("SELECT * FROM `{}`", table);
|
||||
let rows: Vec<Row> = conn.query(query)?;
|
||||
|
||||
for row in &rows {
|
||||
let pk_value: String = row
|
||||
.get::<Value, _>(pk_col)
|
||||
.map(|v| from_value::<String>(v))
|
||||
.unwrap_or_default();
|
||||
let subject = row_iri(table, &pk_value);
|
||||
|
||||
// rdf:type
|
||||
graph.push([subject.clone(), rdf_type.clone(), class_iri(table)]);
|
||||
|
||||
let columns = row.columns_ref();
|
||||
for (i, col_def) in columns.iter().enumerate() {
|
||||
let col_name = col_def.name_str().to_string();
|
||||
let value: Value = row.get::<Value, _>(i).unwrap_or(Value::NULL);
|
||||
|
||||
if value == Value::NULL {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(&ref_table) = fk_map.get(&(table, col_name.as_str())) {
|
||||
// Foreign key -> object property
|
||||
let fk_value: String = from_value(value);
|
||||
graph.push([
|
||||
subject.clone(),
|
||||
ref_iri(table, &col_name),
|
||||
row_iri(ref_table, &fk_value),
|
||||
]);
|
||||
} else {
|
||||
// Regular column -> datatype property
|
||||
if let Some(literal) = to_rdf_literal(&value) {
|
||||
graph.push([subject.clone(), column_iri(table, &col_name), literal]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let output_path = "data/graph-01.ttl";
|
||||
fs::create_dir_all("data")?;
|
||||
let file = fs::File::create(output_path)?;
|
||||
let writer = BufWriter::new(file);
|
||||
|
||||
let mut prefix_map: Vec<PrefixMapPair> = TurtleConfig::default_prefix_map();
|
||||
prefix_map.push((
|
||||
Prefix::new_unchecked("rdf".into()),
|
||||
Iri::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#".into()),
|
||||
));
|
||||
prefix_map.push((
|
||||
Prefix::new_unchecked("xsd".into()),
|
||||
Iri::new_unchecked("http://www.w3.org/2001/XMLSchema#".into()),
|
||||
));
|
||||
let config = TurtleConfig::new().with_own_prefix_map(prefix_map);
|
||||
|
||||
let mut serializer = TurtleSerializer::new_with_config(writer, config);
|
||||
serializer.serialize_graph(&graph)?;
|
||||
|
||||
let count = graph.len();
|
||||
eprintln!("Written {} triples to {}", count, output_path);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
84
src/map/step_02.rs
Normal file
84
src/map/step_02.rs
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
/// Step 2: Apply SPARQL UPDATE queries to transform the RDF graph.
|
||||
///
|
||||
/// Loads `data/graph-01.ttl`, applies all SPARQL UPDATE queries from the
|
||||
/// `updates/` directory (sorted alphabetically), and writes the result
|
||||
/// to `data/graph-02.ttl`.
|
||||
///
|
||||
/// Usage: Run from the mapping project directory:
|
||||
/// cargo run --release --bin step-02
|
||||
|
||||
use std::fs;
|
||||
|
||||
use oxigraph::io::{RdfFormat, RdfParser};
|
||||
use oxigraph::model::GraphNameRef;
|
||||
use oxigraph::store::Store;
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let input_path = "data/graph-01.ttl";
|
||||
let output_path = "data/graph-02.ttl";
|
||||
let updates_dir = "updates";
|
||||
|
||||
// Create in-memory store and load input graph
|
||||
let store = Store::new()?;
|
||||
|
||||
eprintln!("Loading graph from {}...", input_path);
|
||||
let input = fs::File::open(input_path)?;
|
||||
let reader = std::io::BufReader::new(input);
|
||||
let parser = RdfParser::from_format(RdfFormat::Turtle)
|
||||
.without_named_graphs()
|
||||
.with_default_graph(GraphNameRef::DefaultGraph);
|
||||
store.load_from_reader(parser, reader)?;
|
||||
|
||||
let initial_count = count_triples(&store);
|
||||
eprintln!("Loaded {} triples.", initial_count);
|
||||
|
||||
// Read and sort SPARQL UPDATE files
|
||||
let mut update_files: Vec<_> = fs::read_dir(updates_dir)?
|
||||
.filter_map(|e| e.ok())
|
||||
.map(|e| e.path())
|
||||
.filter(|p| {
|
||||
p.extension()
|
||||
.and_then(|e| e.to_str())
|
||||
.map_or(false, |e| e == "rq")
|
||||
})
|
||||
.collect();
|
||||
update_files.sort();
|
||||
|
||||
// Apply each SPARQL UPDATE query
|
||||
for query_file in &update_files {
|
||||
let query = fs::read_to_string(query_file)?;
|
||||
let name = query_file
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.unwrap_or("unknown");
|
||||
|
||||
let before = count_triples(&store);
|
||||
store.update(&query)?;
|
||||
let after = count_triples(&store);
|
||||
|
||||
let diff = after as i64 - before as i64;
|
||||
let sign = if diff >= 0 { "+" } else { "" };
|
||||
eprintln!(
|
||||
"Applied {}: {} -> {} triples ({}{})",
|
||||
name, before, after, sign, diff
|
||||
);
|
||||
}
|
||||
|
||||
let final_count = count_triples(&store);
|
||||
eprintln!("Writing {} triples to {}...", final_count, output_path);
|
||||
|
||||
// Dump store to Turtle
|
||||
fs::create_dir_all("data")?;
|
||||
let output = fs::File::create(output_path)?;
|
||||
let writer = std::io::BufWriter::new(output);
|
||||
store.dump_graph_to_writer(GraphNameRef::DefaultGraph, RdfFormat::Turtle, writer)?;
|
||||
|
||||
eprintln!("Done.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn count_triples(store: &Store) -> usize {
|
||||
store
|
||||
.quads_for_pattern(None, None, None, None)
|
||||
.count()
|
||||
}
|
||||
Loading…
Reference in a new issue