In this article, we’ll explore how to implement a scalable cloud DICOMweb service in Rust that supports multiple database systems, including MySQL, PostgreSQL, and MongoDB. We’ll focus on designing a flexible database interface that allows seamless switching between database backends.
use crate::dicom_dbprovider::{DbError, DbProvider};
use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use mysql::prelude::*;
use mysql::*;
/// MySQL-backed provider; the `DbProvider` trait is implemented for it further down.
pub struct MySqlDbProvider {
    /// Connection string handed to `mysql::Conn::new` whenever a method needs a connection.
    db_connection_string: String,
}

impl MySqlDbProvider {
    /// Creates a provider that stores `db_connection_string` for later use.
    ///
    /// No connection is opened here; the string is only kept.
    pub fn new(db_connection_string: String) -> Self {
        Self {
            db_connection_string,
        }
    }
}
#[async_trait]
impl DbProvider for MySqlDbProvider {
async fn save_state_info(&self, state_meta: &DicomStateMeta) -> Result<(), DbError> {
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
let query = r#"
INSERT INTO dicom_state_meta (
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
) VALUES (
:tenant_id,
:patient_id,
:study_uid,
:series_uid,
:study_uid_hash,
:series_uid_hash,
:study_date_origin,
:patient_name,
:patient_sex,
:patient_birth_date,
:patient_birth_time,
:patient_age,
:patient_size,
:patient_weight,
:study_date,
:study_time,
:accession_number,
:study_id,
:study_description,
:modality,
:series_number,
:series_date,
:series_time,
:series_description,
:body_part_examined,
:protocol_name,
:series_related_instances,
:created_time,
:updated_time
) ON DUPLICATE KEY UPDATE
patient_id = VALUES(patient_id),
study_uid_hash = VALUES(study_uid_hash),
series_uid_hash = VALUES(series_uid_hash),
study_date_origin = VALUES(study_date_origin),
patient_name = VALUES(patient_name),
patient_sex = VALUES(patient_sex),
patient_birth_date = VALUES(patient_birth_date),
patient_birth_time = VALUES(patient_birth_time),
patient_age = VALUES(patient_age),
patient_size = VALUES(patient_size),
patient_weight = VALUES(patient_weight),
study_date = VALUES(study_date),
study_time = VALUES(study_time),
accession_number = VALUES(accession_number),
study_id = VALUES(study_id),
study_description = VALUES(study_description),
modality = VALUES(modality),
series_number = VALUES(series_number),
series_date = VALUES(series_date),
series_time = VALUES(series_time),
series_description = VALUES(series_description),
body_part_examined = VALUES(body_part_examined),
protocol_name = VALUES(protocol_name),
series_related_instances = VALUES(series_related_instances),
updated_time = VALUES(updated_time)
"#;
conn.exec_drop(
query,
params! {
"tenant_id" => &state_meta.tenant_id,
"patient_id" => &state_meta.patient_id,
"study_uid" => &state_meta.study_uid,
"series_uid" => &state_meta.series_uid,
"study_uid_hash" => &state_meta.study_uid_hash,
"series_uid_hash" => &state_meta.series_uid_hash,
"study_date_origin" => &state_meta.study_date_origin,
"patient_name" => &state_meta.patient_name,
"patient_sex" => &state_meta.patient_sex,
"patient_birth_date" => &state_meta.patient_birth_date,
"patient_birth_time" => &state_meta.patient_birth_time,
"patient_age" => &state_meta.patient_age,
"patient_size" => &state_meta.patient_size,
"patient_weight" => &state_meta.patient_weight,
"study_date" => &state_meta.study_date,
"study_time" => &state_meta.study_time,
"accession_number" => &state_meta.accession_number,
"study_id" => &state_meta.study_id,
"study_description" => &state_meta.study_description,
"modality" => &state_meta.modality,
"series_number" => &state_meta.series_number,
"series_date" => &state_meta.series_date,
"series_time" => &state_meta.series_time,
"series_description" => &state_meta.series_description,
"body_part_examined" => &state_meta.body_part_examined,
"protocol_name" => &state_meta.protocol_name,
"series_related_instances" => &state_meta.series_related_instances,
"created_time" => &state_meta.created_time,
"updated_time" => &state_meta.updated_time,
},
)
.map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;
Ok(())
}
async fn save_state_list(&self, state_meta_list: &[DicomStateMeta]) -> Result<(), DbError> {
if state_meta_list.is_empty() {
return Ok(());
}
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
conn.query_drop("START TRANSACTION")
.map_err(|e| DbError::DatabaseError(format!("Failed to start transaction: {}", e)))?;
let query = r#"
INSERT INTO dicom_state_meta (
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
) VALUES (
:tenant_id,
:patient_id,
:study_uid,
:series_uid,
:study_uid_hash,
:series_uid_hash,
:study_date_origin,
:patient_name,
:patient_sex,
:patient_birth_date,
:patient_birth_time,
:patient_age,
:patient_size,
:patient_weight,
:study_date,
:study_time,
:accession_number,
:study_id,
:study_description,
:modality,
:series_number,
:series_date,
:series_time,
:series_description,
:body_part_examined,
:protocol_name,
:series_related_instances,
:created_time,
:updated_time
) ON DUPLICATE KEY UPDATE
patient_id = VALUES(patient_id),
study_uid_hash = VALUES(study_uid_hash),
series_uid_hash = VALUES(series_uid_hash),
study_date_origin = VALUES(study_date_origin),
patient_name = VALUES(patient_name),
patient_sex = VALUES(patient_sex),
patient_birth_date = VALUES(patient_birth_date),
patient_birth_time = VALUES(patient_birth_time),
patient_age = VALUES(patient_age),
patient_size = VALUES(patient_size),
patient_weight = VALUES(patient_weight),
study_date = VALUES(study_date),
study_time = VALUES(study_time),
accession_number = VALUES(accession_number),
study_id = VALUES(study_id),
study_description = VALUES(study_description),
modality = VALUES(modality),
series_number = VALUES(series_number),
series_date = VALUES(series_date),
series_time = VALUES(series_time),
series_description = VALUES(series_description),
body_part_examined = VALUES(body_part_examined),
protocol_name = VALUES(protocol_name),
series_related_instances = VALUES(series_related_instances),
updated_time = VALUES(updated_time)
"#;
for state_meta in state_meta_list {
let result = conn.exec_drop(
query,
params! {
"tenant_id" => &state_meta.tenant_id,
"patient_id" => &state_meta.patient_id,
"study_uid" => &state_meta.study_uid,
"series_uid" => &state_meta.series_uid,
"study_uid_hash" => &state_meta.study_uid_hash,
"series_uid_hash" => &state_meta.series_uid_hash,
"study_date_origin" => &state_meta.study_date_origin,
"patient_name" => &state_meta.patient_name,
"patient_sex" => &state_meta.patient_sex,
"patient_birth_date" => &state_meta.patient_birth_date,
"patient_birth_time" => &state_meta.patient_birth_time,
"patient_age" => &state_meta.patient_age,
"patient_size" => &state_meta.patient_size,
"patient_weight" => &state_meta.patient_weight,
"study_date" => &state_meta.study_date,
"study_time" => &state_meta.study_time,
"accession_number" => &state_meta.accession_number,
"study_id" => &state_meta.study_id,
"study_description" => &state_meta.study_description,
"modality" => &state_meta.modality,
"series_number" => &state_meta.series_number,
"series_date" => &state_meta.series_date,
"series_time" => &state_meta.series_time,
"series_description" => &state_meta.series_description,
"body_part_examined" => &state_meta.body_part_examined,
"protocol_name" => &state_meta.protocol_name,
"series_related_instances" => &state_meta.series_related_instances,
"created_time" => &state_meta.created_time,
"updated_time" => &state_meta.updated_time,
},
);
if let Err(e) = result {
conn.query_drop("ROLLBACK").map_err(|rollback_err| {
DbError::DatabaseError(format!(
"Failed to rollback transaction after error {}: {}",
e, rollback_err
))
})?;
return Err(DbError::DatabaseError(format!(
"Failed to execute query for state meta: {}",
e
)));
}
}
conn.query_drop("COMMIT")
.map_err(|e| DbError::DatabaseError(format!("Failed to commit transaction: {}", e)))?;
Ok(())
}
async fn save_json_list(
&self,
json_meta_list: &[DicomJsonMeta],
) -> std::result::Result<(), DbError> {
if json_meta_list.is_empty() {
return Ok(());
}
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
conn.query_drop("START TRANSACTION")
.map_err(|e| DbError::DatabaseError(format!("Failed to start transaction: {}", e)))?;
let query = r#"
INSERT INTO dicom_json_meta (
tenant_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
created_time,
flag_time,
json_status,
retry_times
) VALUES (
:tenant_id,
:study_uid,
:series_uid,
:study_uid_hash,
:series_uid_hash,
:study_date_origin,
:created_time,
:flag_time,
:json_status,
:retry_times
) ON DUPLICATE KEY UPDATE
study_uid_hash = VALUES(study_uid_hash),
series_uid_hash = VALUES(series_uid_hash),
study_date_origin = VALUES(study_date_origin),
created_time = VALUES(created_time),
flag_time = VALUES(flag_time),
json_status = VALUES(json_status),
retry_times = VALUES(retry_times)
"#;
for json_meta in json_meta_list {
let result = conn.exec_drop(
query,
params! {
"tenant_id" => &json_meta.tenant_id,
"study_uid" => &json_meta.study_uid,
"series_uid" => &json_meta.series_uid,
"study_uid_hash" => &json_meta.study_uid_hash,
"series_uid_hash" => &json_meta.series_uid_hash,
"study_date_origin" => &json_meta.study_date_origin,
"created_time" => &json_meta.created_time,
"flag_time" => &json_meta.flag_time,
"json_status" => &json_meta.json_status,
"retry_times" => &json_meta.retry_times,
},
);
if let Err(e) = result {
conn.query_drop("ROLLBACK").map_err(|rollback_err| {
DbError::DatabaseError(format!(
"Failed to rollback transaction after error {}: {}",
e, rollback_err
))
})?;
return Err(DbError::DatabaseError(format!(
"Failed to execute query for json meta: {}",
e
)));
}
}
conn.query_drop("COMMIT")
.map_err(|e| DbError::DatabaseError(format!("Failed to commit transaction: {}", e)))?;
Ok(())
}
async fn get_state_metaes(
&self,
tenant_id: &str,
study_uid: &str,
) -> Result<Vec<DicomStateMeta>, DbError> {
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
let query = r#"
SELECT
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
FROM dicom_state_meta
WHERE tenant_id = :tenant_id AND study_uid = :study_uid
"#;
let result: Vec<DicomStateMeta> = conn
.exec_map(
query,
params! {
"tenant_id" => tenant_id,
"study_uid" => study_uid,
},
|row: mysql::Row| {
DicomStateMeta {
tenant_id: row.get("tenant_id").unwrap_or_default(),
patient_id: row.get("patient_id").unwrap_or_default(),
study_uid: row.get("study_uid").unwrap_or_default(),
series_uid: row.get("series_uid").unwrap_or_default(),
study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
study_date_origin: row.get("study_date_origin").unwrap_or_default(),
patient_name: row.get("patient_name").unwrap_or_default(),
patient_sex: row.get("patient_sex").unwrap_or_default(),
patient_birth_date: row.get("patient_birth_date").unwrap_or_default(),
patient_birth_time: row.get("patient_birth_time").unwrap_or_default(),
patient_age: row.get("patient_age").unwrap_or_default(),
patient_size: row.get("patient_size").unwrap_or_default(),
patient_weight: row.get("patient_weight").unwrap_or_default(),
study_date: row.get("study_date").unwrap_or_default(),
study_time: row.get("study_time").unwrap_or_default(),
accession_number: row.get("accession_number").unwrap_or_default(),
study_id: row.get("study_id").unwrap_or_default(),
study_description: row.get("study_description").unwrap_or_default(),
modality: row.get("modality").unwrap_or_default(),
series_number: row.get("series_number").unwrap_or_default(),
series_date: row.get("series_date").unwrap_or_default(),
series_time: row.get("series_time").unwrap_or_default(),
series_description: row.get("series_description").unwrap_or_default(),
body_part_examined: row.get("body_part_examined").unwrap_or_default(),
protocol_name: row.get("protocol_name").unwrap_or_default(),
series_related_instances: row
.get("series_related_instances")
.unwrap_or_default(),
created_time: row.get("created_time").unwrap_or_default(),
updated_time: row.get("updated_time").unwrap_or_default(),
}
},
)
.map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;
Ok(result)
}
async fn get_json_metaes(
&self,
end_time: chrono::NaiveDateTime,
) -> std::result::Result<Vec<DicomStateMeta>, DbError> {
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
let query = r#"
Select tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
From (SELECT dsm.*
FROM dicom_state_meta dsm
LEFT JOIN dicom_json_meta djm
ON dsm.tenant_id = djm.tenant_id
AND dsm.study_uid = djm.study_uid
AND dsm.series_uid = djm.series_uid
WHERE djm.tenant_id IS NULL AND dsm.updated_time < ?
UNION ALL
SELECT dsm.*
FROM dicom_state_meta dsm
INNER JOIN dicom_json_meta djm
ON dsm.tenant_id = djm.tenant_id
AND dsm.study_uid = djm.study_uid
AND dsm.series_uid = djm.series_uid
WHERE dsm.updated_time != djm.flag_time
AND dsm.updated_time < ?
) AS t
order by t.updated_time asc limit 10;
"#;
let result: Vec<DicomStateMeta> = conn
.exec_map(query, params! { end_time,end_time }, |row: mysql::Row| {
DicomStateMeta {
tenant_id: row.get("tenant_id").unwrap_or_default(),
patient_id: row.get("patient_id").unwrap_or_default(),
study_uid: row.get("study_uid").unwrap_or_default(),
series_uid: row.get("series_uid").unwrap_or_default(),
study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
study_date_origin: row.get("study_date_origin").unwrap_or_default(),
patient_name: row.get("patient_name").unwrap_or_default(),
patient_sex: row.get("patient_sex").unwrap_or_default(),
patient_birth_date: row.get("patient_birth_date").unwrap_or_default(),
patient_birth_time: row.get("patient_birth_time").unwrap_or_default(),
patient_age: row.get("patient_age").unwrap_or_default(),
patient_size: row.get("patient_size").unwrap_or_default(),
patient_weight: row.get("patient_weight").unwrap_or_default(),
study_date: row.get("study_date").unwrap_or_default(),
study_time: row.get("study_time").unwrap_or_default(),
accession_number: row.get("accession_number").unwrap_or_default(),
study_id: row.get("study_id").unwrap_or_default(),
study_description: row.get("study_description").unwrap_or_default(),
modality: row.get("modality").unwrap_or_default(),
series_number: row.get("series_number").unwrap_or_default(),
series_date: row.get("series_date").unwrap_or_default(),
series_time: row.get("series_time").unwrap_or_default(),
series_description: row.get("series_description").unwrap_or_default(),
body_part_examined: row.get("body_part_examined").unwrap_or_default(),
protocol_name: row.get("protocol_name").unwrap_or_default(),
series_related_instances: row
.get("series_related_instances")
.unwrap_or_default(),
created_time: row.get("created_time").unwrap_or_default(),
updated_time: row.get("updated_time").unwrap_or_default(),
}
})
.map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;
Ok(result)
}
async fn get_json_meta(
&self,
tenant_id: &str,
study_uid: &str,
series_uid: &str,
) -> std::result::Result<DicomJsonMeta, DbError> {
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
let query = r#"
SELECT
tenant_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
created_time,
flag_time,
json_status,
retry_times
FROM dicom_json_meta
WHERE series_uid = :series_uid and tenant_id = :tenant_id and study_uid = :study_uid
"#;
let result: Option<DicomJsonMeta> = conn
.exec_first(
query,
params! {
"series_uid" => series_uid,
"tenant_id" => tenant_id,
"study_uid" => study_uid,
},
)
.map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?
.map(|row: mysql::Row| DicomJsonMeta {
tenant_id: row.get("tenant_id").unwrap_or_default(),
study_uid: row.get("study_uid").unwrap_or_default(),
series_uid: row.get("series_uid").unwrap_or_default(),
study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
study_date_origin: row.get("study_date_origin").unwrap_or_default(),
created_time: row.get("created_time").unwrap_or_default(),
flag_time: row.get("flag_time").unwrap_or_default(),
json_status: row.get("json_status").unwrap_or_default(),
retry_times: row.get("retry_times").unwrap_or_default(),
});
match result {
Some(json_meta) => Ok(json_meta),
None => Err(DbError::DatabaseError(format!(
"DicomJsonMeta with series_uid {} not found",
series_uid
))),
}
}
}