Overview

In this article, we’ll explore how to implement a scalable cloud DICOM-WEB service in Rust that supports multiple database systems, including MySQL, PostgreSQL, and MongoDB. We’ll focus on creating a flexible database interface that allows seamless switching between different database backends.
Prerequisites: Interface Definition (Traits)

First, let’s define our database provider interface as a Rust trait:
dicom_dbprovider.rs

use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use thiserror::Error;

/// Errors returned by every [`DbProvider`] implementation.
#[derive(Error, Debug)]
pub enum DbError {
    /// A database operation failed; the payload carries the detail text.
    #[error("Database operation failed: {0}")]
    DatabaseError(String),
    /// The requested record does not exist.
    #[error("Data record does not exist: {0}")]
    RecordNotExists(String),
    /// The record already exists.
    #[error("Record already exists")]
    AlreadyExists,
    /// An entity could not be extracted.
    #[error("Entity extraction failed: {0}")]
    ExtractionFailed(String),
    /// A transaction failed.
    #[error("Transaction failed: {0}")]
    TransactionFailed(String),
}

/// Returns the current local date and time as a zone-less `NaiveDateTime`.
pub fn current_time() -> chrono::NaiveDateTime {
    chrono::Local::now().naive_local()
}

/// Storage interface for DICOM series state records and their JSON-generation records.
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait DbProvider: Send + Sync {
    /// Stores a single series state record.
    async fn save_state_info(&self, state_meta: &DicomStateMeta) -> Result<(), DbError>;

    /// Stores a batch of series state records (`state_meta` is the list of records).
    async fn save_state_list(&self, state_meta: &[DicomStateMeta]) -> Result<(), DbError>;

    /// Stores a batch of JSON-generation records.
    ///
    /// Despite its name, `state_meta` is the list of `DicomJsonMeta` records to store.
    async fn save_json_list(&self, state_meta: &[DicomJsonMeta]) -> Result<(), DbError>;

    /// Returns the series state records of one study of one tenant.
    async fn get_state_metaes(
        &self,
        tenant_id: &str,
        study_uid: &str,
    ) -> Result<Vec<DicomStateMeta>, DbError>;

    /// Returns the series state records for which JSON metadata still has to be generated.
    ///
    /// `end_time` is the cutoff time.
    async fn get_json_metaes(
        &self,
        end_time: chrono::NaiveDateTime,
    ) -> Result<Vec<DicomStateMeta>, DbError>;

    /// Returns the JSON-generation record of one series.
    async fn get_json_meta(
        &self,
        tenant_id: &str,
        study_uid: &str,
        series_uid: &str,
    ) -> Result<DicomJsonMeta, DbError>;
}

Implementation

3.1 MySQL Database Implementation

dicom_mysql.rs

use crate::dicom_dbprovider::{DbError, DbProvider};
use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use mysql::prelude::*;
use mysql::*;

/// Result alias used throughout this file.
///
/// Spelled with the full `std` path so it cannot be confused with anything the
/// `use mysql::*` glob import brings into scope.
type DbResult<T> = std::result::Result<T, DbError>;

/// Upsert for one `dicom_state_meta` row, using named parameters.
///
/// On a duplicate key every column is refreshed except the key columns
/// (`tenant_id`, `study_uid`, `series_uid`) and `created_time`.
const UPSERT_STATE_META_SQL: &str = r#"
    INSERT INTO dicom_state_meta (
        tenant_id, patient_id, study_uid, series_uid,
        study_uid_hash, series_uid_hash, study_date_origin,
        patient_name, patient_sex, patient_birth_date, patient_birth_time,
        patient_age, patient_size, patient_weight,
        study_date, study_time, accession_number, study_id, study_description,
        modality, series_number, series_date, series_time, series_description,
        body_part_examined, protocol_name, series_related_instances,
        created_time, updated_time
    ) VALUES (
        :tenant_id, :patient_id, :study_uid, :series_uid,
        :study_uid_hash, :series_uid_hash, :study_date_origin,
        :patient_name, :patient_sex, :patient_birth_date, :patient_birth_time,
        :patient_age, :patient_size, :patient_weight,
        :study_date, :study_time, :accession_number, :study_id, :study_description,
        :modality, :series_number, :series_date, :series_time, :series_description,
        :body_part_examined, :protocol_name, :series_related_instances,
        :created_time, :updated_time
    )
    ON DUPLICATE KEY UPDATE
        patient_id = VALUES(patient_id),
        study_uid_hash = VALUES(study_uid_hash),
        series_uid_hash = VALUES(series_uid_hash),
        study_date_origin = VALUES(study_date_origin),
        patient_name = VALUES(patient_name),
        patient_sex = VALUES(patient_sex),
        patient_birth_date = VALUES(patient_birth_date),
        patient_birth_time = VALUES(patient_birth_time),
        patient_age = VALUES(patient_age),
        patient_size = VALUES(patient_size),
        patient_weight = VALUES(patient_weight),
        study_date = VALUES(study_date),
        study_time = VALUES(study_time),
        accession_number = VALUES(accession_number),
        study_id = VALUES(study_id),
        study_description = VALUES(study_description),
        modality = VALUES(modality),
        series_number = VALUES(series_number),
        series_date = VALUES(series_date),
        series_time = VALUES(series_time),
        series_description = VALUES(series_description),
        body_part_examined = VALUES(body_part_examined),
        protocol_name = VALUES(protocol_name),
        series_related_instances = VALUES(series_related_instances),
        updated_time = VALUES(updated_time)
"#;

/// Upsert for one `dicom_json_meta` row, using named parameters.
const UPSERT_JSON_META_SQL: &str = r#"
    INSERT INTO dicom_json_meta (
        tenant_id, study_uid, series_uid,
        study_uid_hash, series_uid_hash, study_date_origin,
        created_time, flag_time, json_status, retry_times
    ) VALUES (
        :tenant_id, :study_uid, :series_uid,
        :study_uid_hash, :series_uid_hash, :study_date_origin,
        :created_time, :flag_time, :json_status, :retry_times
    )
    ON DUPLICATE KEY UPDATE
        study_uid_hash = VALUES(study_uid_hash),
        series_uid_hash = VALUES(series_uid_hash),
        study_date_origin = VALUES(study_date_origin),
        created_time = VALUES(created_time),
        flag_time = VALUES(flag_time),
        json_status = VALUES(json_status),
        retry_times = VALUES(retry_times)
"#;

/// Builds the named parameters expected by [`UPSERT_STATE_META_SQL`].
fn state_meta_params(state_meta: &DicomStateMeta) -> mysql::Params {
    params! {
        "tenant_id" => &state_meta.tenant_id,
        "patient_id" => &state_meta.patient_id,
        "study_uid" => &state_meta.study_uid,
        "series_uid" => &state_meta.series_uid,
        "study_uid_hash" => &state_meta.study_uid_hash,
        "series_uid_hash" => &state_meta.series_uid_hash,
        "study_date_origin" => &state_meta.study_date_origin,
        "patient_name" => &state_meta.patient_name,
        "patient_sex" => &state_meta.patient_sex,
        "patient_birth_date" => &state_meta.patient_birth_date,
        "patient_birth_time" => &state_meta.patient_birth_time,
        "patient_age" => &state_meta.patient_age,
        "patient_size" => &state_meta.patient_size,
        "patient_weight" => &state_meta.patient_weight,
        "study_date" => &state_meta.study_date,
        "study_time" => &state_meta.study_time,
        "accession_number" => &state_meta.accession_number,
        "study_id" => &state_meta.study_id,
        "study_description" => &state_meta.study_description,
        "modality" => &state_meta.modality,
        "series_number" => &state_meta.series_number,
        "series_date" => &state_meta.series_date,
        "series_time" => &state_meta.series_time,
        "series_description" => &state_meta.series_description,
        "body_part_examined" => &state_meta.body_part_examined,
        "protocol_name" => &state_meta.protocol_name,
        "series_related_instances" => &state_meta.series_related_instances,
        "created_time" => &state_meta.created_time,
        "updated_time" => &state_meta.updated_time,
    }
}

/// Builds the named parameters expected by [`UPSERT_JSON_META_SQL`].
fn json_meta_params(json_meta: &DicomJsonMeta) -> mysql::Params {
    params! {
        "tenant_id" => &json_meta.tenant_id,
        "study_uid" => &json_meta.study_uid,
        "series_uid" => &json_meta.series_uid,
        "study_uid_hash" => &json_meta.study_uid_hash,
        "series_uid_hash" => &json_meta.series_uid_hash,
        "study_date_origin" => &json_meta.study_date_origin,
        "created_time" => &json_meta.created_time,
        "flag_time" => &json_meta.flag_time,
        "json_status" => &json_meta.json_status,
        "retry_times" => &json_meta.retry_times,
    }
}

/// Converts one `dicom_state_meta` result row into a `DicomStateMeta`.
///
/// A column for which `Row::get` returns `None` falls back to the field type's default.
fn row_to_state_meta(row: mysql::Row) -> DicomStateMeta {
    DicomStateMeta {
        tenant_id: row.get("tenant_id").unwrap_or_default(),
        patient_id: row.get("patient_id").unwrap_or_default(),
        study_uid: row.get("study_uid").unwrap_or_default(),
        series_uid: row.get("series_uid").unwrap_or_default(),
        study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
        series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
        study_date_origin: row.get("study_date_origin").unwrap_or_default(),
        patient_name: row.get("patient_name").unwrap_or_default(),
        patient_sex: row.get("patient_sex").unwrap_or_default(),
        patient_birth_date: row.get("patient_birth_date").unwrap_or_default(),
        patient_birth_time: row.get("patient_birth_time").unwrap_or_default(),
        patient_age: row.get("patient_age").unwrap_or_default(),
        patient_size: row.get("patient_size").unwrap_or_default(),
        patient_weight: row.get("patient_weight").unwrap_or_default(),
        study_date: row.get("study_date").unwrap_or_default(),
        study_time: row.get("study_time").unwrap_or_default(),
        accession_number: row.get("accession_number").unwrap_or_default(),
        study_id: row.get("study_id").unwrap_or_default(),
        study_description: row.get("study_description").unwrap_or_default(),
        modality: row.get("modality").unwrap_or_default(),
        series_number: row.get("series_number").unwrap_or_default(),
        series_date: row.get("series_date").unwrap_or_default(),
        series_time: row.get("series_time").unwrap_or_default(),
        series_description: row.get("series_description").unwrap_or_default(),
        body_part_examined: row.get("body_part_examined").unwrap_or_default(),
        protocol_name: row.get("protocol_name").unwrap_or_default(),
        series_related_instances: row.get("series_related_instances").unwrap_or_default(),
        created_time: row.get("created_time").unwrap_or_default(),
        updated_time: row.get("updated_time").unwrap_or_default(),
    }
}

/// Converts one `dicom_json_meta` result row into a `DicomJsonMeta`.
///
/// A column for which `Row::get` returns `None` falls back to the field type's default.
fn row_to_json_meta(row: mysql::Row) -> DicomJsonMeta {
    DicomJsonMeta {
        tenant_id: row.get("tenant_id").unwrap_or_default(),
        study_uid: row.get("study_uid").unwrap_or_default(),
        series_uid: row.get("series_uid").unwrap_or_default(),
        study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
        series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
        study_date_origin: row.get("study_date_origin").unwrap_or_default(),
        created_time: row.get("created_time").unwrap_or_default(),
        flag_time: row.get("flag_time").unwrap_or_default(),
        json_status: row.get("json_status").unwrap_or_default(),
        retry_times: row.get("retry_times").unwrap_or_default(),
    }
}

/// Wraps a statement-execution failure in `DbError::DatabaseError`.
fn query_failed(e: mysql::Error) -> DbError {
    DbError::DatabaseError(format!("Failed to execute query: {}", e))
}

/// Executes `query` once per parameter set inside a single transaction.
///
/// On the first failing statement the transaction is rolled back and the
/// statement error is returned; if the rollback itself fails, an error
/// describing both failures is returned instead. `record_kind` only labels
/// the error message (e.g. `"state meta"`).
fn exec_all_in_transaction(
    conn: &mut mysql::Conn,
    query: &str,
    param_sets: impl Iterator<Item = mysql::Params>,
    record_kind: &str,
) -> DbResult<()> {
    conn.query_drop("START TRANSACTION")
        .map_err(|e| DbError::DatabaseError(format!("Failed to start transaction: {}", e)))?;

    for row_params in param_sets {
        if let Err(e) = conn.exec_drop(query, row_params) {
            conn.query_drop("ROLLBACK").map_err(|rollback_err| {
                DbError::DatabaseError(format!(
                    "Failed to rollback transaction after error {}: {}",
                    e, rollback_err
                ))
            })?;
            return Err(DbError::DatabaseError(format!(
                "Failed to execute query for {}: {}",
                record_kind, e
            )));
        }
    }

    conn.query_drop("COMMIT")
        .map_err(|e| DbError::DatabaseError(format!("Failed to commit transaction: {}", e)))
}

/// [`DbProvider`] backed by MySQL.
///
/// Every method opens its own connection from the stored connection string.
///
/// NOTE(review): none of the methods contains an `.await`, so each database call
/// runs to completion on the thread that polls the future. Consider moving the
/// work to a blocking thread pool if that matters for the hosting runtime.
pub struct MySqlDbProvider {
    db_connection_string: String,
}

impl MySqlDbProvider {
    /// Creates a provider that connects with `db_connection_string`.
    ///
    /// No connection is opened until a `DbProvider` method is called.
    pub fn new(db_connection_string: String) -> Self {
        MySqlDbProvider {
            db_connection_string,
        }
    }

    /// Opens a new connection, mapping failure to `DbError::DatabaseError`.
    fn connect(&self) -> DbResult<mysql::Conn> {
        mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))
    }
}

#[async_trait]
impl DbProvider for MySqlDbProvider {
    /// Inserts `state_meta`, or updates the existing row on a duplicate key.
    async fn save_state_info(&self, state_meta: &DicomStateMeta) -> DbResult<()> {
        let mut conn = self.connect()?;
        conn.exec_drop(UPSERT_STATE_META_SQL, state_meta_params(state_meta))
            .map_err(query_failed)
    }

    /// Upserts every record of `state_meta_list` in one transaction.
    ///
    /// An empty list returns `Ok(())` without connecting.
    async fn save_state_list(&self, state_meta_list: &[DicomStateMeta]) -> DbResult<()> {
        if state_meta_list.is_empty() {
            return Ok(());
        }
        let mut conn = self.connect()?;
        exec_all_in_transaction(
            &mut conn,
            UPSERT_STATE_META_SQL,
            state_meta_list.iter().map(state_meta_params),
            "state meta",
        )
    }

    /// Upserts every record of `json_meta_list` in one transaction.
    ///
    /// An empty list returns `Ok(())` without connecting.
    async fn save_json_list(&self, json_meta_list: &[DicomJsonMeta]) -> DbResult<()> {
        if json_meta_list.is_empty() {
            return Ok(());
        }
        let mut conn = self.connect()?;
        exec_all_in_transaction(
            &mut conn,
            UPSERT_JSON_META_SQL,
            json_meta_list.iter().map(json_meta_params),
            "json meta",
        )
    }

    /// Returns all `dicom_state_meta` rows matching `tenant_id` and `study_uid`.
    async fn get_state_metaes(
        &self,
        tenant_id: &str,
        study_uid: &str,
    ) -> DbResult<Vec<DicomStateMeta>> {
        let mut conn = self.connect()?;
        let query = r#"
            SELECT tenant_id, patient_id, study_uid, series_uid,
                   study_uid_hash, series_uid_hash, study_date_origin,
                   patient_name, patient_sex, patient_birth_date, patient_birth_time,
                   patient_age, patient_size, patient_weight,
                   study_date, study_time, accession_number, study_id, study_description,
                   modality, series_number, series_date, series_time, series_description,
                   body_part_examined, protocol_name, series_related_instances,
                   created_time, updated_time
            FROM dicom_state_meta
            WHERE tenant_id = :tenant_id AND study_uid = :study_uid
        "#;
        let rows: Vec<DicomStateMeta> = conn
            .exec_map(
                query,
                params! {
                    "tenant_id" => tenant_id,
                    "study_uid" => study_uid,
                },
                row_to_state_meta,
            )
            .map_err(query_failed)?;
        Ok(rows)
    }

    /// Returns up to 10 series, oldest `updated_time` first, that need JSON generation.
    ///
    /// A series qualifies when its `updated_time` is earlier than `end_time` and it
    /// either has no `dicom_json_meta` row or its `updated_time` differs from that
    /// row's `flag_time`.
    async fn get_json_metaes(
        &self,
        end_time: chrono::NaiveDateTime,
    ) -> DbResult<Vec<DicomStateMeta>> {
        let mut conn = self.connect()?;
        let query = r#"
            Select tenant_id, patient_id, study_uid, series_uid,
                   study_uid_hash, series_uid_hash, study_date_origin,
                   patient_name, patient_sex, patient_birth_date, patient_birth_time,
                   patient_age, patient_size, patient_weight,
                   study_date, study_time, accession_number, study_id, study_description,
                   modality, series_number, series_date, series_time, series_description,
                   body_part_examined, protocol_name, series_related_instances,
                   created_time, updated_time
            From (SELECT dsm.*
                  FROM dicom_state_meta dsm
                           LEFT JOIN dicom_json_meta djm
                                     ON dsm.tenant_id = djm.tenant_id
                                         AND dsm.study_uid = djm.study_uid
                                         AND dsm.series_uid = djm.series_uid
                  WHERE djm.tenant_id IS NULL AND dsm.updated_time < ?
                  UNION ALL
                  SELECT dsm.*
                  FROM dicom_state_meta dsm
                           INNER JOIN dicom_json_meta djm
                                      ON dsm.tenant_id = djm.tenant_id
                                          AND dsm.study_uid = djm.study_uid
                                          AND dsm.series_uid = djm.series_uid
                  WHERE dsm.updated_time != djm.flag_time AND dsm.updated_time < ?
                 ) AS t
            order by t.updated_time asc limit 10;
        "#;
        // The statement uses two positional `?` placeholders, so the cutoff must be
        // bound positionally, once per placeholder. Named parameters (as produced by
        // `params! { end_time, end_time }`) cannot be bound to a positional statement.
        let rows: Vec<DicomStateMeta> = conn
            .exec_map(query, (end_time, end_time), row_to_state_meta)
            .map_err(query_failed)?;
        Ok(rows)
    }

    /// Returns the `dicom_json_meta` row identified by tenant, study and series.
    ///
    /// A missing row is reported as `DbError::DatabaseError`.
    async fn get_json_meta(
        &self,
        tenant_id: &str,
        study_uid: &str,
        series_uid: &str,
    ) -> DbResult<DicomJsonMeta> {
        let mut conn = self.connect()?;
        let query = r#"
            SELECT tenant_id, study_uid, series_uid,
                   study_uid_hash, series_uid_hash, study_date_origin,
                   created_time, flag_time, json_status, retry_times
            FROM dicom_json_meta
            WHERE series_uid = :series_uid and tenant_id = :tenant_id and study_uid = :study_uid
        "#;
        let row: Option<mysql::Row> = conn
            .exec_first(
                query,
                params! {
                    "series_uid" => series_uid,
                    "tenant_id" => tenant_id,
                    "study_uid" => study_uid,
                },
            )
            .map_err(query_failed)?;

        // NOTE(review): `DbError::RecordNotExists` looks like the natural variant for
        // this case; `DatabaseError` is kept so callers matching on it are unaffected.
        row.map(row_to_json_meta).ok_or_else(|| {
            DbError::DatabaseError(format!(
                "DicomJsonMeta with series_uid {} not found",
                series_uid
            ))
        })
    }
}

3.2 PostgreSQL Implementation

Here’s the PostgreSQL implementation:
...