-
概述
在这篇文章中,我们将介绍如何使用 Rust 实现一个可扩展的云 DICOM-WEB 服务。该服务通过统一的 trait 抽象支持多种数据库后端,如 MySQL、PostgreSQL 和 MongoDB。
-
准备工作:接口定义,或者说是 trait 定义
dicom_dbprovider.rs
use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use thiserror::Error;
/// Errors surfaced by every `DbProvider` implementation, regardless of the
/// concrete database backend.
#[derive(Error, Debug)]
pub enum DbError {
    /// Generic failure while talking to the database (connect, prepare, execute).
    #[error("Database operation failed: {0}")]
    DatabaseError(String),
    /// A lookup found no matching row; the payload identifies what was searched for.
    #[error("DataRow not exists: {0}")]
    RecordNotExists(String),
    /// An insert collided with an already-existing row.
    #[error("Record already exists")]
    AlreadyExists,
    /// A result row could not be converted into the expected entity.
    #[error("Entity extraction failed: {0}")]
    ExtractionFailed(String),
    /// A multi-statement transaction could not be completed.
    #[error("Transaction failed: {0}")]
    TransactionFailed(String),
}
/// Current wall-clock time in the machine's local timezone, with the
/// timezone information stripped (`NaiveDateTime`), suitable for the
/// `created_time` / `updated_time` columns.
pub fn current_time() -> chrono::NaiveDateTime {
    let now = chrono::Local::now();
    now.naive_local()
}
/// Storage abstraction for the DICOM-WEB service.
///
/// Each backend (MySQL, PostgreSQL, ...) implements this trait; the service
/// itself only depends on the trait, so backends are swappable.
#[async_trait]
pub trait DbProvider: Send + Sync {
    /// Insert or update a single series-level state record.
    async fn save_state_info(&self, state_meta: &DicomStateMeta) -> Result<(), DbError>;
    /// Upsert a batch of state records atomically (all-or-nothing).
    async fn save_state_list(&self, state_meta: &[DicomStateMeta]) -> Result<(), DbError>;
    /// Upsert a batch of JSON-metadata tracking records atomically.
    async fn save_json_list(&self, state_meta: &[DicomJsonMeta]) -> Result<(), DbError>;
    /// Fetch all series state records belonging to one study of one tenant.
    async fn get_state_metaes(
        &self,
        tenant_id: &str,
        study_uid: &str,
    ) -> Result<Vec<DicomStateMeta>, DbError>;
    /*
     * Fetch the series whose JSON-format metadata still needs to be generated.
     * end_time: cut-off time — only rows updated before this instant qualify.
     */
    async fn get_json_metaes(&self, end_time: chrono::NaiveDateTime) -> Result<Vec<DicomStateMeta>, DbError>;
    /// Fetch the JSON-metadata tracking record for a single series.
    async fn get_json_meta(&self, tenant_id:&str, study_uid: &str, series_uid: &str)->Result<DicomJsonMeta, DbError>;
}
- 实现
3.1 MySQL数据库实现
dicom_mysql.rs
use crate::dicom_dbprovider::{DbError, DbProvider};
use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use mysql::prelude::*;
use mysql::*;
/// `DbProvider` backed by MySQL.
///
/// Holds only the connection string; a fresh connection is opened for each
/// operation.
pub struct MySqlDbProvider {
    db_connection_string: String,
}

impl MySqlDbProvider {
    /// Create a provider from a MySQL connection URL
    /// (e.g. `mysql://user:password@host:3306/dbname`).
    pub fn new(db_connection_string: String) -> Self {
        Self {
            db_connection_string,
        }
    }
}
#[async_trait]
impl DbProvider for MySqlDbProvider {
    /// Upsert a single series-level state record into `dicom_state_meta`.
    ///
    /// Uses MySQL `INSERT ... ON DUPLICATE KEY UPDATE`: the row is created on
    /// first sight and refreshed afterwards. `created_time` is deliberately
    /// absent from the UPDATE list so it keeps its original value.
    ///
    /// NOTE(review): `mysql::Conn` is a blocking client, so this `async fn`
    /// blocks the executor thread while the query runs — consider
    /// `mysql_async` or `spawn_blocking` on a shared runtime; confirm with the
    /// service's runtime setup.
    async fn save_state_info(&self, state_meta: &DicomStateMeta) -> Result<(), DbError> {
        // Fresh connection per call: simple, but no pooling.
        let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
        let query = r#"
INSERT INTO dicom_state_meta (
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
) VALUES (
:tenant_id,
:patient_id,
:study_uid,
:series_uid,
:study_uid_hash,
:series_uid_hash,
:study_date_origin,
:patient_name,
:patient_sex,
:patient_birth_date,
:patient_birth_time,
:patient_age,
:patient_size,
:patient_weight,
:study_date,
:study_time,
:accession_number,
:study_id,
:study_description,
:modality,
:series_number,
:series_date,
:series_time,
:series_description,
:body_part_examined,
:protocol_name,
:series_related_instances,
:created_time,
:updated_time
) ON DUPLICATE KEY UPDATE
patient_id = VALUES(patient_id),
study_uid_hash = VALUES(study_uid_hash),
series_uid_hash = VALUES(series_uid_hash),
study_date_origin = VALUES(study_date_origin),
patient_name = VALUES(patient_name),
patient_sex = VALUES(patient_sex),
patient_birth_date = VALUES(patient_birth_date),
patient_birth_time = VALUES(patient_birth_time),
patient_age = VALUES(patient_age),
patient_size = VALUES(patient_size),
patient_weight = VALUES(patient_weight),
study_date = VALUES(study_date),
study_time = VALUES(study_time),
accession_number = VALUES(accession_number),
study_id = VALUES(study_id),
study_description = VALUES(study_description),
modality = VALUES(modality),
series_number = VALUES(series_number),
series_date = VALUES(series_date),
series_time = VALUES(series_time),
series_description = VALUES(series_description),
body_part_examined = VALUES(body_part_examined),
protocol_name = VALUES(protocol_name),
series_related_instances = VALUES(series_related_instances),
updated_time = VALUES(updated_time)
"#;
        // Named parameters (`:name`) are bound from the struct fields below.
        conn.exec_drop(
            query,
            params! {
                "tenant_id" => &state_meta.tenant_id,
                "patient_id" => &state_meta.patient_id,
                "study_uid" => &state_meta.study_uid,
                "series_uid" => &state_meta.series_uid,
                "study_uid_hash" => &state_meta.study_uid_hash,
                "series_uid_hash" => &state_meta.series_uid_hash,
                "study_date_origin" => &state_meta.study_date_origin,
                "patient_name" => &state_meta.patient_name,
                "patient_sex" => &state_meta.patient_sex,
                "patient_birth_date" => &state_meta.patient_birth_date,
                "patient_birth_time" => &state_meta.patient_birth_time,
                "patient_age" => &state_meta.patient_age,
                "patient_size" => &state_meta.patient_size,
                "patient_weight" => &state_meta.patient_weight,
                "study_date" => &state_meta.study_date,
                "study_time" => &state_meta.study_time,
                "accession_number" => &state_meta.accession_number,
                "study_id" => &state_meta.study_id,
                "study_description" => &state_meta.study_description,
                "modality" => &state_meta.modality,
                "series_number" => &state_meta.series_number,
                "series_date" => &state_meta.series_date,
                "series_time" => &state_meta.series_time,
                "series_description" => &state_meta.series_description,
                "body_part_examined" => &state_meta.body_part_examined,
                "protocol_name" => &state_meta.protocol_name,
                "series_related_instances" => &state_meta.series_related_instances,
                "created_time" => &state_meta.created_time,
                "updated_time" => &state_meta.updated_time,
            },
        )
        .map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;
        Ok(())
    }
async fn save_state_list(&self, state_meta_list: &[DicomStateMeta]) -> Result<(), DbError> {
if state_meta_list.is_empty() {
return Ok(());
}
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
conn.query_drop("START TRANSACTION")
.map_err(|e| DbError::DatabaseError(format!("Failed to start transaction: {}", e)))?;
let query = r#"
INSERT INTO dicom_state_meta (
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
) VALUES (
:tenant_id,
:patient_id,
:study_uid,
:series_uid,
:study_uid_hash,
:series_uid_hash,
:study_date_origin,
:patient_name,
:patient_sex,
:patient_birth_date,
:patient_birth_time,
:patient_age,
:patient_size,
:patient_weight,
:study_date,
:study_time,
:accession_number,
:study_id,
:study_description,
:modality,
:series_number,
:series_date,
:series_time,
:series_description,
:body_part_examined,
:protocol_name,
:series_related_instances,
:created_time,
:updated_time
) ON DUPLICATE KEY UPDATE
patient_id = VALUES(patient_id),
study_uid_hash = VALUES(study_uid_hash),
series_uid_hash = VALUES(series_uid_hash),
study_date_origin = VALUES(study_date_origin),
patient_name = VALUES(patient_name),
patient_sex = VALUES(patient_sex),
patient_birth_date = VALUES(patient_birth_date),
patient_birth_time = VALUES(patient_birth_time),
patient_age = VALUES(patient_age),
patient_size = VALUES(patient_size),
patient_weight = VALUES(patient_weight),
study_date = VALUES(study_date),
study_time = VALUES(study_time),
accession_number = VALUES(accession_number),
study_id = VALUES(study_id),
study_description = VALUES(study_description),
modality = VALUES(modality),
series_number = VALUES(series_number),
series_date = VALUES(series_date),
series_time = VALUES(series_time),
series_description = VALUES(series_description),
body_part_examined = VALUES(body_part_examined),
protocol_name = VALUES(protocol_name),
series_related_instances = VALUES(series_related_instances),
updated_time = VALUES(updated_time)
"#;
for state_meta in state_meta_list {
let result = conn.exec_drop(
query,
params! {
"tenant_id" => &state_meta.tenant_id,
"patient_id" => &state_meta.patient_id,
"study_uid" => &state_meta.study_uid,
"series_uid" => &state_meta.series_uid,
"study_uid_hash" => &state_meta.study_uid_hash,
"series_uid_hash" => &state_meta.series_uid_hash,
"study_date_origin" => &state_meta.study_date_origin,
"patient_name" => &state_meta.patient_name,
"patient_sex" => &state_meta.patient_sex,
"patient_birth_date" => &state_meta.patient_birth_date,
"patient_birth_time" => &state_meta.patient_birth_time,
"patient_age" => &state_meta.patient_age,
"patient_size" => &state_meta.patient_size,
"patient_weight" => &state_meta.patient_weight,
"study_date" => &state_meta.study_date,
"study_time" => &state_meta.study_time,
"accession_number" => &state_meta.accession_number,
"study_id" => &state_meta.study_id,
"study_description" => &state_meta.study_description,
"modality" => &state_meta.modality,
"series_number" => &state_meta.series_number,
"series_date" => &state_meta.series_date,
"series_time" => &state_meta.series_time,
"series_description" => &state_meta.series_description,
"body_part_examined" => &state_meta.body_part_examined,
"protocol_name" => &state_meta.protocol_name,
"series_related_instances" => &state_meta.series_related_instances,
"created_time" => &state_meta.created_time,
"updated_time" => &state_meta.updated_time,
},
);
if let Err(e) = result {
conn.query_drop("ROLLBACK").map_err(|rollback_err| {
DbError::DatabaseError(format!(
"Failed to rollback transaction after error {}: {}",
e, rollback_err
))
})?;
return Err(DbError::DatabaseError(format!(
"Failed to execute query for state meta: {}",
e
)));
}
}
conn.query_drop("COMMIT")
.map_err(|e| DbError::DatabaseError(format!("Failed to commit transaction: {}", e)))?;
Ok(())
}
async fn save_json_list(
&self,
json_meta_list: &[DicomJsonMeta],
) -> std::result::Result<(), DbError> {
if json_meta_list.is_empty() {
return Ok(());
}
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
conn.query_drop("START TRANSACTION")
.map_err(|e| DbError::DatabaseError(format!("Failed to start transaction: {}", e)))?;
let query = r#"
INSERT INTO dicom_json_meta (
tenant_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
created_time,
flag_time,
json_status,
retry_times
) VALUES (
:tenant_id,
:study_uid,
:series_uid,
:study_uid_hash,
:series_uid_hash,
:study_date_origin,
:created_time,
:flag_time,
:json_status,
:retry_times
) ON DUPLICATE KEY UPDATE
study_uid_hash = VALUES(study_uid_hash),
series_uid_hash = VALUES(series_uid_hash),
study_date_origin = VALUES(study_date_origin),
created_time = VALUES(created_time),
flag_time = VALUES(flag_time),
json_status = VALUES(json_status),
retry_times = VALUES(retry_times)
"#;
for json_meta in json_meta_list {
let result = conn.exec_drop(
query,
params! {
"tenant_id" => &json_meta.tenant_id,
"study_uid" => &json_meta.study_uid,
"series_uid" => &json_meta.series_uid,
"study_uid_hash" => &json_meta.study_uid_hash,
"series_uid_hash" => &json_meta.series_uid_hash,
"study_date_origin" => &json_meta.study_date_origin,
"created_time" => &json_meta.created_time,
"flag_time" => &json_meta.flag_time,
"json_status" => &json_meta.json_status,
"retry_times" => &json_meta.retry_times,
},
);
if let Err(e) = result {
conn.query_drop("ROLLBACK").map_err(|rollback_err| {
DbError::DatabaseError(format!(
"Failed to rollback transaction after error {}: {}",
e, rollback_err
))
})?;
return Err(DbError::DatabaseError(format!(
"Failed to execute query for json meta: {}",
e
)));
}
}
conn.query_drop("COMMIT")
.map_err(|e| DbError::DatabaseError(format!("Failed to commit transaction: {}", e)))?;
Ok(())
}
    /// Fetch every series state record of one study for one tenant.
    ///
    /// Maps each row by column name; `unwrap_or_default()` substitutes the
    /// field type's default when a column is NULL or fails conversion, so
    /// conversion problems are silently masked rather than reported —
    /// acceptable here because the columns are written by this provider.
    async fn get_state_metaes(
        &self,
        tenant_id: &str,
        study_uid: &str,
    ) -> Result<Vec<DicomStateMeta>, DbError> {
        let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
        let query = r#"
SELECT
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
FROM dicom_state_meta
WHERE tenant_id = :tenant_id AND study_uid = :study_uid
"#;
        let result: Vec<DicomStateMeta> = conn
            .exec_map(
                query,
                params! {
                    "tenant_id" => tenant_id,
                    "study_uid" => study_uid,
                },
                // Row-to-entity mapping, one field per selected column.
                |row: mysql::Row| {
                    DicomStateMeta {
                        tenant_id: row.get("tenant_id").unwrap_or_default(),
                        patient_id: row.get("patient_id").unwrap_or_default(),
                        study_uid: row.get("study_uid").unwrap_or_default(),
                        series_uid: row.get("series_uid").unwrap_or_default(),
                        study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
                        series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
                        study_date_origin: row.get("study_date_origin").unwrap_or_default(),
                        patient_name: row.get("patient_name").unwrap_or_default(),
                        patient_sex: row.get("patient_sex").unwrap_or_default(),
                        patient_birth_date: row.get("patient_birth_date").unwrap_or_default(),
                        patient_birth_time: row.get("patient_birth_time").unwrap_or_default(),
                        patient_age: row.get("patient_age").unwrap_or_default(),
                        patient_size: row.get("patient_size").unwrap_or_default(),
                        patient_weight: row.get("patient_weight").unwrap_or_default(),
                        study_date: row.get("study_date").unwrap_or_default(),
                        study_time: row.get("study_time").unwrap_or_default(),
                        accession_number: row.get("accession_number").unwrap_or_default(),
                        study_id: row.get("study_id").unwrap_or_default(),
                        study_description: row.get("study_description").unwrap_or_default(),
                        modality: row.get("modality").unwrap_or_default(),
                        series_number: row.get("series_number").unwrap_or_default(),
                        series_date: row.get("series_date").unwrap_or_default(),
                        series_time: row.get("series_time").unwrap_or_default(),
                        series_description: row.get("series_description").unwrap_or_default(),
                        body_part_examined: row.get("body_part_examined").unwrap_or_default(),
                        protocol_name: row.get("protocol_name").unwrap_or_default(),
                        series_related_instances: row
                            .get("series_related_instances")
                            .unwrap_or_default(),
                        created_time: row.get("created_time").unwrap_or_default(),
                        updated_time: row.get("updated_time").unwrap_or_default(),
                    }
                },
            )
            .map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;
        Ok(result)
    }
async fn get_json_metaes(
&self,
end_time: chrono::NaiveDateTime,
) -> std::result::Result<Vec<DicomStateMeta>, DbError> {
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
let query = r#"
Select tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
From (SELECT dsm.*
FROM dicom_state_meta dsm
LEFT JOIN dicom_json_meta djm
ON dsm.tenant_id = djm.tenant_id
AND dsm.study_uid = djm.study_uid
AND dsm.series_uid = djm.series_uid
WHERE djm.tenant_id IS NULL AND dsm.updated_time < ?
UNION ALL
SELECT dsm.*
FROM dicom_state_meta dsm
INNER JOIN dicom_json_meta djm
ON dsm.tenant_id = djm.tenant_id
AND dsm.study_uid = djm.study_uid
AND dsm.series_uid = djm.series_uid
WHERE dsm.updated_time != djm.flag_time
AND dsm.updated_time < ?
) AS t
order by t.updated_time asc limit 10;
"#;
let result: Vec<DicomStateMeta> = conn
.exec_map(query, params! { end_time,end_time }, |row: mysql::Row| {
DicomStateMeta {
tenant_id: row.get("tenant_id").unwrap_or_default(),
patient_id: row.get("patient_id").unwrap_or_default(),
study_uid: row.get("study_uid").unwrap_or_default(),
series_uid: row.get("series_uid").unwrap_or_default(),
study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
study_date_origin: row.get("study_date_origin").unwrap_or_default(),
patient_name: row.get("patient_name").unwrap_or_default(),
patient_sex: row.get("patient_sex").unwrap_or_default(),
patient_birth_date: row.get("patient_birth_date").unwrap_or_default(),
patient_birth_time: row.get("patient_birth_time").unwrap_or_default(),
patient_age: row.get("patient_age").unwrap_or_default(),
patient_size: row.get("patient_size").unwrap_or_default(),
patient_weight: row.get("patient_weight").unwrap_or_default(),
study_date: row.get("study_date").unwrap_or_default(),
study_time: row.get("study_time").unwrap_or_default(),
accession_number: row.get("accession_number").unwrap_or_default(),
study_id: row.get("study_id").unwrap_or_default(),
study_description: row.get("study_description").unwrap_or_default(),
modality: row.get("modality").unwrap_or_default(),
series_number: row.get("series_number").unwrap_or_default(),
series_date: row.get("series_date").unwrap_or_default(),
series_time: row.get("series_time").unwrap_or_default(),
series_description: row.get("series_description").unwrap_or_default(),
body_part_examined: row.get("body_part_examined").unwrap_or_default(),
protocol_name: row.get("protocol_name").unwrap_or_default(),
series_related_instances: row
.get("series_related_instances")
.unwrap_or_default(),
created_time: row.get("created_time").unwrap_or_default(),
updated_time: row.get("updated_time").unwrap_or_default(),
}
})
.map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;
Ok(result)
}
async fn get_json_meta(
&self,
tenant_id: &str,
study_uid: &str,
series_uid: &str,
) -> std::result::Result<DicomJsonMeta, DbError> {
let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
.map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;
let query = r#"
SELECT
tenant_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
created_time,
flag_time,
json_status,
retry_times
FROM dicom_json_meta
WHERE series_uid = :series_uid and tenant_id = :tenant_id and study_uid = :study_uid
"#;
let result: Option<DicomJsonMeta> = conn
.exec_first(
query,
params! {
"series_uid" => series_uid,
"tenant_id" => tenant_id,
"study_uid" => study_uid,
},
)
.map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?
.map(|row: mysql::Row| DicomJsonMeta {
tenant_id: row.get("tenant_id").unwrap_or_default(),
study_uid: row.get("study_uid").unwrap_or_default(),
series_uid: row.get("series_uid").unwrap_or_default(),
study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
study_date_origin: row.get("study_date_origin").unwrap_or_default(),
created_time: row.get("created_time").unwrap_or_default(),
flag_time: row.get("flag_time").unwrap_or_default(),
json_status: row.get("json_status").unwrap_or_default(),
retry_times: row.get("retry_times").unwrap_or_default(),
});
match result {
Some(json_meta) => Ok(json_meta),
None => Err(DbError::DatabaseError(format!(
"DicomJsonMeta with series_uid {} not found",
series_uid
))),
}
}
}
3.2 PostgreSQL 的实现
PostgreSQL 的实现代码如下:
use crate::dicom_dbprovider::{DbError, DbProvider};
use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use tokio_postgres::{Client, NoTls};
/// `DbProvider` backed by PostgreSQL via `tokio_postgres`.
///
/// A new client (plus its background connection task) is created for each
/// operation — no pooling.
#[derive(Debug, Clone)]
pub struct PgDbProvider {
    // PostgreSQL connection string, e.g. "host=... user=... dbname=...".
    db_connection_string: String,
}
impl PgDbProvider {
    /// Create a provider from a PostgreSQL connection string / URL.
    pub fn new(database_url: String) -> Self {
        Self {
            db_connection_string: database_url,
        }
    }
    /// Connect and return a `Client`, spawning the connection's I/O driver on
    /// the Tokio runtime. `tokio_postgres` requires the `Connection` future
    /// to be polled continuously or the `Client` makes no progress.
    async fn make_client(&self) -> Result<Client, DbError> {
        let (client, connection) =
            tokio_postgres::connect(self.db_connection_string.as_str(), NoTls)
                .await
                .map_err(|e| DbError::DatabaseError(e.to_string()))?;
        // Spawn the connection processor; a failure here is only logged since
        // it will also surface on the client side as a broken connection.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("Database connection error: {}", e);
            }
        });
        Ok(client)
    }
}
#[async_trait]
impl DbProvider for PgDbProvider {
    /// Upsert a single series-level state record into `dicom_state_meta`.
    ///
    /// PostgreSQL flavor of the MySQL upsert: `INSERT ... ON CONFLICT
    /// (tenant_id, study_uid, series_uid) DO UPDATE`. `created_time` is
    /// deliberately absent from the UPDATE list so it keeps its first value.
    async fn save_state_info(&self, state_meta: &DicomStateMeta) -> Result<(), DbError> {
        let client = self.make_client().await?;
        let statement = client
            .prepare(
                "INSERT INTO dicom_state_meta (
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
ON CONFLICT (tenant_id, study_uid, series_uid)
DO UPDATE SET
patient_id = EXCLUDED.patient_id,
study_uid_hash = EXCLUDED.study_uid_hash,
series_uid_hash = EXCLUDED.series_uid_hash,
study_date_origin = EXCLUDED.study_date_origin,
patient_name = EXCLUDED.patient_name,
patient_sex = EXCLUDED.patient_sex,
patient_birth_date = EXCLUDED.patient_birth_date,
patient_birth_time = EXCLUDED.patient_birth_time,
patient_age = EXCLUDED.patient_age,
patient_size = EXCLUDED.patient_size,
patient_weight = EXCLUDED.patient_weight,
study_date = EXCLUDED.study_date,
study_time = EXCLUDED.study_time,
accession_number = EXCLUDED.accession_number,
study_id = EXCLUDED.study_id,
study_description = EXCLUDED.study_description,
modality = EXCLUDED.modality,
series_number = EXCLUDED.series_number,
series_date = EXCLUDED.series_date,
series_time = EXCLUDED.series_time,
series_description = EXCLUDED.series_description,
body_part_examined = EXCLUDED.body_part_examined,
protocol_name = EXCLUDED.protocol_name,
series_related_instances = EXCLUDED.series_related_instances,
updated_time = EXCLUDED.updated_time"
            )
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;
        // Positional parameters $1..$29 in the exact column order above.
        client
            .execute(
                &statement,
                &[
                    &state_meta.tenant_id,
                    &state_meta.patient_id,
                    &state_meta.study_uid,
                    &state_meta.series_uid,
                    &state_meta.study_uid_hash,
                    &state_meta.series_uid_hash,
                    &state_meta.study_date_origin,
                    &state_meta.patient_name,
                    &state_meta.patient_sex,
                    &state_meta.patient_birth_date,
                    &state_meta.patient_birth_time,
                    &state_meta.patient_age,
                    &state_meta.patient_size,
                    &state_meta.patient_weight,
                    &state_meta.study_date,
                    &state_meta.study_time,
                    &state_meta.accession_number,
                    &state_meta.study_id,
                    &state_meta.study_description,
                    &state_meta.modality,
                    &state_meta.series_number,
                    &state_meta.series_date,
                    &state_meta.series_time,
                    &state_meta.series_description,
                    &state_meta.body_part_examined,
                    &state_meta.protocol_name,
                    &state_meta.series_related_instances,
                    &state_meta.created_time,
                    &state_meta.updated_time,
                ],
            )
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;
        Ok(())
    }
async fn save_state_list(&self, state_meta_list: &[DicomStateMeta]) -> Result<(), DbError> {
// 使用事务确保所有数据要么全部保存成功,要么全部失败
let mut client = self.make_client().await?;
let transaction = client.transaction().await.map_err(|e| {
println!("Failed to start transaction: {}", e);
DbError::DatabaseError(e.to_string())
})?;
println!(
"Starting transaction to save state meta list of length {}",
state_meta_list.len()
);
let statement = transaction
.prepare(
"INSERT INTO dicom_state_meta (
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
ON CONFLICT (tenant_id, study_uid, series_uid)
DO UPDATE SET
patient_id = EXCLUDED.patient_id,
study_uid_hash = EXCLUDED.study_uid_hash,
series_uid_hash = EXCLUDED.series_uid_hash,
study_date_origin = EXCLUDED.study_date_origin,
patient_name = EXCLUDED.patient_name,
patient_sex = EXCLUDED.patient_sex,
patient_birth_date = EXCLUDED.patient_birth_date,
patient_birth_time = EXCLUDED.patient_birth_time,
patient_age = EXCLUDED.patient_age,
patient_size = EXCLUDED.patient_size,
patient_weight = EXCLUDED.patient_weight,
study_date = EXCLUDED.study_date,
study_time = EXCLUDED.study_time,
accession_number = EXCLUDED.accession_number,
study_id = EXCLUDED.study_id,
study_description = EXCLUDED.study_description,
modality = EXCLUDED.modality,
series_number = EXCLUDED.series_number,
series_date = EXCLUDED.series_date,
series_time = EXCLUDED.series_time,
series_description = EXCLUDED.series_description,
body_part_examined = EXCLUDED.body_part_examined,
protocol_name = EXCLUDED.protocol_name,
series_related_instances = EXCLUDED.series_related_instances,
updated_time = EXCLUDED.updated_time"
)
.await
.map_err(|e| {
println!("Error transaction.prepare: {:?}", e);
DbError::DatabaseError(e.to_string())
})?;
// 遍历所有 DicomStateMeta 对象并执行插入操作
for state_meta in state_meta_list {
transaction
.execute(
&statement,
&[
&state_meta.tenant_id,
&state_meta.patient_id,
&state_meta.study_uid,
&state_meta.series_uid,
&state_meta.study_uid_hash,
&state_meta.series_uid_hash,
&state_meta.study_date_origin,
&state_meta.patient_name,
&state_meta.patient_sex,
&state_meta.patient_birth_date,
&state_meta.patient_birth_time,
&state_meta.patient_age,
&state_meta.patient_size,
&state_meta.patient_weight,
&state_meta.study_date,
&state_meta.study_time,
&state_meta.accession_number,
&state_meta.study_id,
&state_meta.study_description,
&state_meta.modality,
&state_meta.series_number,
&state_meta.series_date,
&state_meta.series_time,
&state_meta.series_description,
&state_meta.body_part_examined,
&state_meta.protocol_name,
&state_meta.series_related_instances,
&state_meta.created_time,
&state_meta.updated_time,
],
)
.await
.map_err(|e| {
println!("Error transaction.execute: {:?}", e);
DbError::DatabaseError(e.to_string())
})?;
}
// 提交事务
transaction.commit().await.map_err(|e| {
println!("Error transaction.commit: {:?}", e);
DbError::DatabaseError(e.to_string())
})?;
Ok(())
}
    /// Upsert a batch of JSON-metadata tracking records inside one
    /// transaction; the transaction rolls back on drop unless committed.
    async fn save_json_list(&self, json_meta_list: &[DicomJsonMeta]) -> Result<(), DbError> {
        // Empty batch: nothing to do, skip connecting entirely.
        if json_meta_list.is_empty() {
            return Ok(());
        }
        let mut client = self.make_client().await?;
        let transaction = client.transaction().await.map_err(|e| {
            println!("Failed to start transaction: {}", e);
            DbError::DatabaseError(e.to_string())
        })?;
        println!(
            "Starting transaction to save json meta list of length {}",
            json_meta_list.len()
        );
        let statement = transaction
            .prepare(
                "INSERT INTO dicom_json_meta (
tenant_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
flag_time,
created_time,json_status,retry_times
) VALUES ($1, $2, $3, $4, $5, $6, $7,$8,$9,$10)
ON CONFLICT (tenant_id, study_uid, series_uid)
DO UPDATE SET
study_uid_hash = EXCLUDED.study_uid_hash,
series_uid_hash = EXCLUDED.series_uid_hash,
study_date_origin = EXCLUDED.study_date_origin,
flag_time = EXCLUDED.flag_time,
created_time = EXCLUDED.created_time,
json_status = EXCLUDED.json_status,
retry_times = EXCLUDED.retry_times
",
            )
            .await
            .map_err(|e| {
                println!("Error transaction.prepare: {:?}", e);
                DbError::DatabaseError(e.to_string())
            })?;
        // Execute the upsert for every DicomJsonMeta in the batch; the first
        // failure propagates via `?` and the dropped transaction rolls back.
        for json_meta in json_meta_list {
            transaction
                .execute(
                    &statement,
                    &[
                        &json_meta.tenant_id,
                        &json_meta.study_uid,
                        &json_meta.series_uid,
                        &json_meta.study_uid_hash,
                        &json_meta.series_uid_hash,
                        &json_meta.study_date_origin,
                        &json_meta.flag_time,
                        &json_meta.created_time,
                        &json_meta.json_status,
                        &json_meta.retry_times,
                    ],
                )
                .await
                .map_err(|e| {
                    println!("Error transaction.execute: {:?}", e);
                    DbError::DatabaseError(e.to_string())
                })?;
        }
        // Commit the whole batch atomically.
        transaction.commit().await.map_err(|e| {
            println!("Error transaction.commit: {:?}", e);
            DbError::DatabaseError(e.to_string())
        })?;
        Ok(())
    }
    /// Fetch every series state record of one study for one tenant.
    ///
    /// Rows are mapped by positional column index; the indices must stay in
    /// lockstep with the SELECT column order below. `row.get(i)` panics on a
    /// type mismatch — acceptable because the schema is owned by this code.
    async fn get_state_metaes(
        &self,
        tenant_id: &str,
        study_uid: &str,
    ) -> Result<Vec<DicomStateMeta>, DbError> {
        let client = self.make_client().await?;
        let statement = client
            .prepare(
                "SELECT
tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
FROM dicom_state_meta
WHERE tenant_id = $1 AND study_uid = $2",
            )
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;
        let rows = client
            .query(&statement, &[&tenant_id, &study_uid])
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;
        let mut result = Vec::with_capacity(rows.len());
        for row in rows {
            // Column indices follow the SELECT list order exactly.
            let state_meta = DicomStateMeta {
                tenant_id: row.get(0),
                patient_id: row.get(1),
                study_uid: row.get(2),
                series_uid: row.get(3),
                study_uid_hash: row.get(4),
                series_uid_hash: row.get(5),
                study_date_origin: row.get(6),
                patient_name: row.get(7),
                patient_sex: row.get(8),
                patient_birth_date: row.get(9),
                patient_birth_time: row.get(10),
                patient_age: row.get(11),
                patient_size: row.get(12),
                patient_weight: row.get(13),
                study_date: row.get(14),
                study_time: row.get(15),
                accession_number: row.get(16),
                study_id: row.get(17),
                study_description: row.get(18),
                modality: row.get(19),
                series_number: row.get(20),
                series_date: row.get(21),
                series_time: row.get(22),
                series_description: row.get(23),
                body_part_examined: row.get(24),
                protocol_name: row.get(25),
                series_related_instances: row.get(26),
                created_time: row.get(27),
                updated_time: row.get(28),
            };
            result.push(state_meta);
        }
        Ok(result)
    }
    /// Fetch up to 10 series whose JSON metadata still needs (re)generation:
    /// series with no `dicom_json_meta` row at all (LEFT JOIN, NULL check)
    /// plus series whose `updated_time` differs from the recorded
    /// `flag_time`, both limited to rows updated before `end_time`, oldest
    /// first. Unlike MySQL, PostgreSQL lets the single `$1` parameter be
    /// referenced at multiple positions in the statement.
    async fn get_json_metaes(
        &self,
        end_time: chrono::NaiveDateTime,
    ) -> Result<Vec<DicomStateMeta>, DbError> {
        let client = self.make_client().await?;
        let statement = client
            .prepare(
                " Select tenant_id,
patient_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
patient_name,
patient_sex,
patient_birth_date,
patient_birth_time,
patient_age,
patient_size,
patient_weight,
study_date,
study_time,
accession_number,
study_id,
study_description,
modality,
series_number,
series_date,
series_time,
series_description,
body_part_examined,
protocol_name,
series_related_instances,
created_time,
updated_time
From (SELECT dsm.*
FROM dicom_state_meta dsm
LEFT JOIN dicom_json_meta djm
ON dsm.tenant_id = djm.tenant_id
AND dsm.study_uid = djm.study_uid
AND dsm.series_uid = djm.series_uid
WHERE djm.tenant_id IS NULL AND dsm.updated_time < $1
UNION ALL
SELECT dsm.*
FROM dicom_state_meta dsm
INNER JOIN dicom_json_meta djm
ON dsm.tenant_id = djm.tenant_id
AND dsm.study_uid = djm.study_uid
AND dsm.series_uid = djm.series_uid
WHERE dsm.updated_time != djm.flag_time AND dsm.updated_time < $1
) AS t
order by t.updated_time asc limit 10;",
            )
            .await
            .map_err(|e| {
                println!("{:?}", e);
                DbError::DatabaseError(e.to_string())
            })?;
        let rows = client
            .query(&statement, &[&end_time])
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;
        let mut result = Vec::with_capacity(rows.len());
        for row in rows {
            // Column indices follow the SELECT list order exactly.
            let state_meta = DicomStateMeta {
                tenant_id: row.get(0),
                patient_id: row.get(1),
                study_uid: row.get(2),
                series_uid: row.get(3),
                study_uid_hash: row.get(4),
                series_uid_hash: row.get(5),
                study_date_origin: row.get(6),
                patient_name: row.get(7),
                patient_sex: row.get(8),
                patient_birth_date: row.get(9),
                patient_birth_time: row.get(10),
                patient_age: row.get(11),
                patient_size: row.get(12),
                patient_weight: row.get(13),
                study_date: row.get(14),
                study_time: row.get(15),
                accession_number: row.get(16),
                study_id: row.get(17),
                study_description: row.get(18),
                modality: row.get(19),
                series_number: row.get(20),
                series_date: row.get(21),
                series_time: row.get(22),
                series_description: row.get(23),
                body_part_examined: row.get(24),
                protocol_name: row.get(25),
                series_related_instances: row.get(26),
                created_time: row.get(27),
                updated_time: row.get(28),
            };
            result.push(state_meta);
        }
        Ok(result)
    }
async fn get_json_meta(
&self,
tenant_id: &str,
study_uid: &str,
series_uid: &str,
) -> Result<DicomJsonMeta, DbError> {
let client = self.make_client().await?;
let statement = client
.prepare(
"SELECT
tenant_id,
study_uid,
series_uid,
study_uid_hash,
series_uid_hash,
study_date_origin,
flag_time,
created_time,
json_status,
retry_times
FROM dicom_json_meta
WHERE series_uid = $1 and tenant_id = $2 and study_uid = $3 ",
)
.await
.map_err(|e| DbError::DatabaseError(e.to_string()))?;
let rows = client
.query(&statement, &[&series_uid, &tenant_id, &study_uid])
.await
.map_err(|e| DbError::DatabaseError(e.to_string()))?;
if rows.is_empty() {
return Err(DbError::RecordNotExists(format!(
"DicomJsonMeta with series_uid {} not found",
series_uid
)));
}
let row = &rows[0];
let json_meta = DicomJsonMeta {
tenant_id: row.get(0),
study_uid: row.get(1),
series_uid: row.get(2),
study_uid_hash: row.get(3),
series_uid_hash: row.get(4),
study_date_origin: row.get(5),
flag_time: row.get(6),
created_time: row.get(7),
json_status: row.get(8),
retry_times: row.get(9),
};
Ok(json_meta)
}
}
好了. 现在, 我们已经完成了数据库接口的定义及实现.下一节, 我们将开始编写我们的服务.
Waiting for next
GoTo Summary : how-to-build-cloud-dicom
GoTo Database-Sign : how-to-build-cloud-dicom:Part 1 Database Design