1. 概述

    在这篇文章中，我们将介绍如何使用 Rust 实现一个可扩展的云 DICOM-WEB 服务。该服务通过统一的 trait 抽象支持多种数据库，如 MySQL、PostgreSQL 和 MongoDB。

  2. 准备工作：接口定义（即 trait 定义）

dicom_dbprovider.rs

use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use thiserror::Error;

/// Error type returned by every `DbProvider` operation.
///
/// The `#[error(...)]` attribute on each variant defines its `Display` text;
/// `{0}` is replaced by the variant's `String` payload.
#[derive(Error, Debug)]
pub enum DbError {
    /// A database operation failed; the payload carries the detail message.
    #[error("Database operation failed: {0}")]
    DatabaseError(String),

    /// A row/record does not exist; the payload is appended to the message.
    #[error("DataRow not exists: {0}")]
    RecordNotExists(String),


    /// The record already exists (no payload).
    #[error("Record already exists")]
    AlreadyExists,

    /// Extracting an entity failed; the payload carries the detail message.
    #[error("Entity extraction failed: {0}")]
    ExtractionFailed(String),

    /// A transaction failed; the payload carries the detail message.
    #[error("Transaction failed: {0}")]
    TransactionFailed(String),
}
/// Returns the current wall-clock time in the machine's local time zone,
/// with the time-zone information stripped (`NaiveDateTime`).
pub fn current_time() -> chrono::NaiveDateTime {
    let local_now = chrono::Local::now();
    local_now.naive_local()
}
/// Storage abstraction for DICOM state metadata and JSON-generation metadata.
///
/// Each database backend implements this trait. Implementors must be
/// `Send + Sync`; `#[async_trait]` is what allows the `async fn` declarations.
#[async_trait]
pub trait DbProvider: Send + Sync {
    /// Saves a single `DicomStateMeta` record.
    async fn save_state_info(&self, state_meta: &DicomStateMeta) -> Result<(), DbError>;

    /// Saves a batch of `DicomStateMeta` records.
    async fn save_state_list(&self, state_meta: &[DicomStateMeta]) -> Result<(), DbError>;

    /// Saves a batch of `DicomJsonMeta` records.
    async fn save_json_list(&self, state_meta: &[DicomJsonMeta]) -> Result<(), DbError>;

    /// Returns the `DicomStateMeta` records for the given tenant and study.
    async fn get_state_metaes(
        &self,
        tenant_id: &str,
        study_uid: &str,
    ) -> Result<Vec<DicomStateMeta>, DbError>;


    /// Fetches the series records for which JSON-format metadata still needs
    /// to be generated.
    ///
    /// `end_time` is the cutoff time.
    async fn get_json_metaes(&self, end_time: chrono::NaiveDateTime) -> Result<Vec<DicomStateMeta>, DbError>;

    /// Returns the `DicomJsonMeta` record identified by tenant, study and series.
    async fn get_json_meta(&self, tenant_id:&str, study_uid: &str, series_uid: &str)->Result<DicomJsonMeta, DbError>;
}
  3. 实现

3.1 MySQL数据库实现

dicom_mysql.rs

use crate::dicom_dbprovider::{DbError, DbProvider};
use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use mysql::prelude::*;
use mysql::*;
/// MySQL-backed provider. It only stores the connection string; no
/// connection is opened when the value is constructed.
pub struct MySqlDbProvider {
    /// Connection string handed to the MySQL driver when a connection is opened.
    db_connection_string: String,
}

impl MySqlDbProvider {
    /// Creates a provider that will connect using `db_connection_string`.
    pub fn new(db_connection_string: String) -> Self {
        Self {
            db_connection_string,
        }
    }
}
#[async_trait]
impl DbProvider for MySqlDbProvider {
    async fn save_state_info(&self, state_meta: &DicomStateMeta) -> Result<(), DbError> {
        
        let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;

        
        let query = r#"
            INSERT INTO dicom_state_meta (
                tenant_id,
                patient_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                patient_name,
                patient_sex,
                patient_birth_date,
                patient_birth_time,
                patient_age,
                patient_size,
                patient_weight,
                study_date,
                study_time,
                accession_number,
                study_id,
                study_description,
                modality,
                series_number,
                series_date,
                series_time,
                series_description,
                body_part_examined,
                protocol_name,
                series_related_instances,
                created_time,
                updated_time
            ) VALUES (
                :tenant_id,
                :patient_id,
                :study_uid,
                :series_uid,
                :study_uid_hash,
                :series_uid_hash,
                :study_date_origin,
                :patient_name,
                :patient_sex,
                :patient_birth_date,
                :patient_birth_time,
                :patient_age,
                :patient_size,
                :patient_weight,
                :study_date,
                :study_time,
                :accession_number,
                :study_id,
                :study_description,
                :modality,
                :series_number,
                :series_date,
                :series_time,
                :series_description,
                :body_part_examined,
                :protocol_name,
                :series_related_instances,
                :created_time,
                :updated_time
            ) ON DUPLICATE KEY UPDATE
                patient_id = VALUES(patient_id),
                study_uid_hash = VALUES(study_uid_hash),
                series_uid_hash = VALUES(series_uid_hash),
                study_date_origin = VALUES(study_date_origin),
                patient_name = VALUES(patient_name),
                patient_sex = VALUES(patient_sex),
                patient_birth_date = VALUES(patient_birth_date),
                patient_birth_time = VALUES(patient_birth_time),
                patient_age = VALUES(patient_age),
                patient_size = VALUES(patient_size),
                patient_weight = VALUES(patient_weight),
                study_date = VALUES(study_date),
                study_time = VALUES(study_time),
                accession_number = VALUES(accession_number),
                study_id = VALUES(study_id),
                study_description = VALUES(study_description),
                modality = VALUES(modality),
                series_number = VALUES(series_number),
                series_date = VALUES(series_date),
                series_time = VALUES(series_time),
                series_description = VALUES(series_description),
                body_part_examined = VALUES(body_part_examined),
                protocol_name = VALUES(protocol_name),
                series_related_instances = VALUES(series_related_instances),
                updated_time = VALUES(updated_time)
        "#;

        
        conn.exec_drop(
            query,
            params! {
                "tenant_id" => &state_meta.tenant_id,
                "patient_id" => &state_meta.patient_id,
                "study_uid" => &state_meta.study_uid,
                "series_uid" => &state_meta.series_uid,
                "study_uid_hash" => &state_meta.study_uid_hash,
                "series_uid_hash" => &state_meta.series_uid_hash,
                "study_date_origin" => &state_meta.study_date_origin,
                "patient_name" => &state_meta.patient_name,
                "patient_sex" => &state_meta.patient_sex,
                "patient_birth_date" => &state_meta.patient_birth_date,
                "patient_birth_time" => &state_meta.patient_birth_time,
                "patient_age" => &state_meta.patient_age,
                "patient_size" => &state_meta.patient_size,
                "patient_weight" => &state_meta.patient_weight,
                "study_date" => &state_meta.study_date,
                "study_time" => &state_meta.study_time,
                "accession_number" => &state_meta.accession_number,
                "study_id" => &state_meta.study_id,
                "study_description" => &state_meta.study_description,
                "modality" => &state_meta.modality,
                "series_number" => &state_meta.series_number,
                "series_date" => &state_meta.series_date,
                "series_time" => &state_meta.series_time,
                "series_description" => &state_meta.series_description,
                "body_part_examined" => &state_meta.body_part_examined,
                "protocol_name" => &state_meta.protocol_name,
                "series_related_instances" => &state_meta.series_related_instances,
                "created_time" => &state_meta.created_time,
                "updated_time" => &state_meta.updated_time,
            },
        )
        .map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;

        Ok(())
    }

    async fn save_state_list(&self, state_meta_list: &[DicomStateMeta]) -> Result<(), DbError> {
        if state_meta_list.is_empty() {
            return Ok(());
        }

        
        let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;

       
        conn.query_drop("START TRANSACTION")
            .map_err(|e| DbError::DatabaseError(format!("Failed to start transaction: {}", e)))?;

       
        let query = r#"
            INSERT INTO dicom_state_meta (
                tenant_id,
                patient_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                patient_name,
                patient_sex,
                patient_birth_date,
                patient_birth_time,
                patient_age,
                patient_size,
                patient_weight,
                study_date,
                study_time,
                accession_number,
                study_id,
                study_description,
                modality,
                series_number,
                series_date,
                series_time,
                series_description,
                body_part_examined,
                protocol_name,
                series_related_instances,
                created_time,
                updated_time
            ) VALUES (
                :tenant_id,
                :patient_id,
                :study_uid,
                :series_uid,
                :study_uid_hash,
                :series_uid_hash,
                :study_date_origin,
                :patient_name,
                :patient_sex,
                :patient_birth_date,
                :patient_birth_time,
                :patient_age,
                :patient_size,
                :patient_weight,
                :study_date,
                :study_time,
                :accession_number,
                :study_id,
                :study_description,
                :modality,
                :series_number,
                :series_date,
                :series_time,
                :series_description,
                :body_part_examined,
                :protocol_name,
                :series_related_instances,
                :created_time,
                :updated_time
            ) ON DUPLICATE KEY UPDATE
                patient_id = VALUES(patient_id),
                study_uid_hash = VALUES(study_uid_hash),
                series_uid_hash = VALUES(series_uid_hash),
                study_date_origin = VALUES(study_date_origin),
                patient_name = VALUES(patient_name),
                patient_sex = VALUES(patient_sex),
                patient_birth_date = VALUES(patient_birth_date),
                patient_birth_time = VALUES(patient_birth_time),
                patient_age = VALUES(patient_age),
                patient_size = VALUES(patient_size),
                patient_weight = VALUES(patient_weight),
                study_date = VALUES(study_date),
                study_time = VALUES(study_time),
                accession_number = VALUES(accession_number),
                study_id = VALUES(study_id),
                study_description = VALUES(study_description),
                modality = VALUES(modality),
                series_number = VALUES(series_number),
                series_date = VALUES(series_date),
                series_time = VALUES(series_time),
                series_description = VALUES(series_description),
                body_part_examined = VALUES(body_part_examined),
                protocol_name = VALUES(protocol_name),
                series_related_instances = VALUES(series_related_instances),
                updated_time = VALUES(updated_time)
        "#;

        
        for state_meta in state_meta_list {
            let result = conn.exec_drop(
                query,
                params! {
                    "tenant_id" => &state_meta.tenant_id,
                    "patient_id" => &state_meta.patient_id,
                    "study_uid" => &state_meta.study_uid,
                    "series_uid" => &state_meta.series_uid,
                    "study_uid_hash" => &state_meta.study_uid_hash,
                    "series_uid_hash" => &state_meta.series_uid_hash,
                    "study_date_origin" => &state_meta.study_date_origin,
                    "patient_name" => &state_meta.patient_name,
                    "patient_sex" => &state_meta.patient_sex,
                    "patient_birth_date" => &state_meta.patient_birth_date,
                    "patient_birth_time" => &state_meta.patient_birth_time,
                    "patient_age" => &state_meta.patient_age,
                    "patient_size" => &state_meta.patient_size,
                    "patient_weight" => &state_meta.patient_weight,
                    "study_date" => &state_meta.study_date,
                    "study_time" => &state_meta.study_time,
                    "accession_number" => &state_meta.accession_number,
                    "study_id" => &state_meta.study_id,
                    "study_description" => &state_meta.study_description,
                    "modality" => &state_meta.modality,
                    "series_number" => &state_meta.series_number,
                    "series_date" => &state_meta.series_date,
                    "series_time" => &state_meta.series_time,
                    "series_description" => &state_meta.series_description,
                    "body_part_examined" => &state_meta.body_part_examined,
                    "protocol_name" => &state_meta.protocol_name,
                    "series_related_instances" => &state_meta.series_related_instances,
                    "created_time" => &state_meta.created_time,
                    "updated_time" => &state_meta.updated_time,
                },
            );

           
            if let Err(e) = result {
                conn.query_drop("ROLLBACK").map_err(|rollback_err| {
                    DbError::DatabaseError(format!(
                        "Failed to rollback transaction after error {}: {}",
                        e, rollback_err
                    ))
                })?;

                return Err(DbError::DatabaseError(format!(
                    "Failed to execute query for state meta: {}",
                    e
                )));
            }
        }

        
        conn.query_drop("COMMIT")
            .map_err(|e| DbError::DatabaseError(format!("Failed to commit transaction: {}", e)))?;

        Ok(())
    }

    async fn save_json_list(
        &self,
        json_meta_list: &[DicomJsonMeta],
    ) -> std::result::Result<(), DbError> {
        if json_meta_list.is_empty() {
            return Ok(());
        }

        
        let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;

       
        conn.query_drop("START TRANSACTION")
            .map_err(|e| DbError::DatabaseError(format!("Failed to start transaction: {}", e)))?;

      
        let query = r#"
            INSERT INTO dicom_json_meta (
                tenant_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                created_time,
                flag_time,
                json_status,
                retry_times
            ) VALUES (
                :tenant_id,
                :study_uid,
                :series_uid,
                :study_uid_hash,
                :series_uid_hash,
                :study_date_origin,
                :created_time,
                :flag_time,
                :json_status,
                :retry_times
            ) ON DUPLICATE KEY UPDATE
                study_uid_hash = VALUES(study_uid_hash),
                series_uid_hash = VALUES(series_uid_hash),
                study_date_origin = VALUES(study_date_origin),
                created_time = VALUES(created_time),
                flag_time = VALUES(flag_time),
                json_status = VALUES(json_status),
                retry_times = VALUES(retry_times)
        "#;

        
        for json_meta in json_meta_list {
            let result = conn.exec_drop(
                query,
                params! {
                    "tenant_id" => &json_meta.tenant_id,
                    "study_uid" => &json_meta.study_uid,
                    "series_uid" => &json_meta.series_uid,
                    "study_uid_hash" => &json_meta.study_uid_hash,
                    "series_uid_hash" => &json_meta.series_uid_hash,
                    "study_date_origin" => &json_meta.study_date_origin,
                    "created_time" => &json_meta.created_time,
                    "flag_time" => &json_meta.flag_time,
                    "json_status" => &json_meta.json_status,
                    "retry_times" => &json_meta.retry_times,
                },
            );

            
            if let Err(e) = result {
                conn.query_drop("ROLLBACK").map_err(|rollback_err| {
                    DbError::DatabaseError(format!(
                        "Failed to rollback transaction after error {}: {}",
                        e, rollback_err
                    ))
                })?;

                return Err(DbError::DatabaseError(format!(
                    "Failed to execute query for json meta: {}",
                    e
                )));
            }
        }

       
        conn.query_drop("COMMIT")
            .map_err(|e| DbError::DatabaseError(format!("Failed to commit transaction: {}", e)))?;

        Ok(())
    }

    async fn get_state_metaes(
        &self,
        tenant_id: &str,
        study_uid: &str,
    ) -> Result<Vec<DicomStateMeta>, DbError> {
       
        let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;

        
        let query = r#"
            SELECT
                tenant_id,
                patient_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                patient_name,
                patient_sex,
                patient_birth_date,
                patient_birth_time,
                patient_age,
                patient_size,
                patient_weight,
                study_date,
                study_time,
                accession_number,
                study_id,
                study_description,
                modality,
                series_number,
                series_date,
                series_time,
                series_description,
                body_part_examined,
                protocol_name,
                series_related_instances,
                created_time,
                updated_time
            FROM dicom_state_meta
            WHERE tenant_id = :tenant_id AND study_uid = :study_uid
        "#;

       
        let result: Vec<DicomStateMeta> = conn
            .exec_map(
                query,
                params! {
                    "tenant_id" => tenant_id,
                    "study_uid" => study_uid,
                },
                |row: mysql::Row| {
                    
                    DicomStateMeta {
                        tenant_id: row.get("tenant_id").unwrap_or_default(),
                        patient_id: row.get("patient_id").unwrap_or_default(),
                        study_uid: row.get("study_uid").unwrap_or_default(),
                        series_uid: row.get("series_uid").unwrap_or_default(),
                        study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
                        series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
                        study_date_origin: row.get("study_date_origin").unwrap_or_default(),
                        patient_name: row.get("patient_name").unwrap_or_default(),
                        patient_sex: row.get("patient_sex").unwrap_or_default(),
                        patient_birth_date: row.get("patient_birth_date").unwrap_or_default(),
                        patient_birth_time: row.get("patient_birth_time").unwrap_or_default(),
                        patient_age: row.get("patient_age").unwrap_or_default(),
                        patient_size: row.get("patient_size").unwrap_or_default(),
                        patient_weight: row.get("patient_weight").unwrap_or_default(),
                        study_date: row.get("study_date").unwrap_or_default(),
                        study_time: row.get("study_time").unwrap_or_default(),
                        accession_number: row.get("accession_number").unwrap_or_default(),
                        study_id: row.get("study_id").unwrap_or_default(),
                        study_description: row.get("study_description").unwrap_or_default(),
                        modality: row.get("modality").unwrap_or_default(),
                        series_number: row.get("series_number").unwrap_or_default(),
                        series_date: row.get("series_date").unwrap_or_default(),
                        series_time: row.get("series_time").unwrap_or_default(),
                        series_description: row.get("series_description").unwrap_or_default(),
                        body_part_examined: row.get("body_part_examined").unwrap_or_default(),
                        protocol_name: row.get("protocol_name").unwrap_or_default(),
                        series_related_instances: row
                            .get("series_related_instances")
                            .unwrap_or_default(),
                        created_time: row.get("created_time").unwrap_or_default(),
                        updated_time: row.get("updated_time").unwrap_or_default(),
                    }
                },
            )
            .map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;

        Ok(result)
    }

    async fn get_json_metaes(
        &self,
        end_time: chrono::NaiveDateTime,
    ) -> std::result::Result<Vec<DicomStateMeta>, DbError> {
        
        let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;

        
        let query = r#"
            Select tenant_id,
                patient_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                patient_name,
                patient_sex,
                patient_birth_date,
                patient_birth_time,
                patient_age,
                patient_size,
                patient_weight,
                study_date,
                study_time,
                accession_number,
                study_id,
                study_description,
                modality,
                series_number,
                series_date,
                series_time,
                series_description,
                body_part_examined,
                protocol_name,
                series_related_instances,
                created_time,
                updated_time
            From (SELECT dsm.*
                  FROM dicom_state_meta dsm
                           LEFT JOIN dicom_json_meta djm
                                     ON dsm.tenant_id = djm.tenant_id
                                         AND dsm.study_uid = djm.study_uid
                                         AND dsm.series_uid = djm.series_uid
                  WHERE djm.tenant_id IS NULL  AND dsm.updated_time < ?
                  UNION ALL
                  SELECT dsm.*
                  FROM dicom_state_meta dsm
                           INNER JOIN dicom_json_meta djm
                                      ON dsm.tenant_id = djm.tenant_id
                                          AND dsm.study_uid = djm.study_uid
                                          AND dsm.series_uid = djm.series_uid
                  WHERE dsm.updated_time != djm.flag_time
                   AND  dsm.updated_time < ?
                  ) AS t
                  order by t.updated_time asc limit 10;
        "#;

     
        let result: Vec<DicomStateMeta> = conn
            .exec_map(query, params! { end_time,end_time }, |row: mysql::Row| {
              
                DicomStateMeta {
                    tenant_id: row.get("tenant_id").unwrap_or_default(),
                    patient_id: row.get("patient_id").unwrap_or_default(),
                    study_uid: row.get("study_uid").unwrap_or_default(),
                    series_uid: row.get("series_uid").unwrap_or_default(),
                    study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
                    series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
                    study_date_origin: row.get("study_date_origin").unwrap_or_default(),
                    patient_name: row.get("patient_name").unwrap_or_default(),
                    patient_sex: row.get("patient_sex").unwrap_or_default(),
                    patient_birth_date: row.get("patient_birth_date").unwrap_or_default(),
                    patient_birth_time: row.get("patient_birth_time").unwrap_or_default(),
                    patient_age: row.get("patient_age").unwrap_or_default(),
                    patient_size: row.get("patient_size").unwrap_or_default(),
                    patient_weight: row.get("patient_weight").unwrap_or_default(),
                    study_date: row.get("study_date").unwrap_or_default(),
                    study_time: row.get("study_time").unwrap_or_default(),
                    accession_number: row.get("accession_number").unwrap_or_default(),
                    study_id: row.get("study_id").unwrap_or_default(),
                    study_description: row.get("study_description").unwrap_or_default(),
                    modality: row.get("modality").unwrap_or_default(),
                    series_number: row.get("series_number").unwrap_or_default(),
                    series_date: row.get("series_date").unwrap_or_default(),
                    series_time: row.get("series_time").unwrap_or_default(),
                    series_description: row.get("series_description").unwrap_or_default(),
                    body_part_examined: row.get("body_part_examined").unwrap_or_default(),
                    protocol_name: row.get("protocol_name").unwrap_or_default(),
                    series_related_instances: row
                        .get("series_related_instances")
                        .unwrap_or_default(),
                    created_time: row.get("created_time").unwrap_or_default(),
                    updated_time: row.get("updated_time").unwrap_or_default(),
                }
            })
            .map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?;

        Ok(result)
    }

    async fn get_json_meta(
        &self,
        tenant_id: &str,
        study_uid: &str,
        series_uid: &str,
    ) -> std::result::Result<DicomJsonMeta, DbError> {
       
        let mut conn = mysql::Conn::new(self.db_connection_string.as_str())
            .map_err(|e| DbError::DatabaseError(format!("Failed to connect to MySQL: {}", e)))?;

      
        let query = r#"
        SELECT
            tenant_id,
            study_uid,
            series_uid,
            study_uid_hash,
            series_uid_hash,
            study_date_origin,
            created_time,
            flag_time,
            json_status,
            retry_times
        FROM dicom_json_meta
        WHERE series_uid = :series_uid and tenant_id = :tenant_id and study_uid = :study_uid
    "#;

       
        let result: Option<DicomJsonMeta> = conn
            .exec_first(
                query,
                params! {
                    "series_uid" => series_uid,
                    "tenant_id" => tenant_id,
                    "study_uid" => study_uid,
                },
            )
            .map_err(|e| DbError::DatabaseError(format!("Failed to execute query: {}", e)))?
            .map(|row: mysql::Row| DicomJsonMeta {
                tenant_id: row.get("tenant_id").unwrap_or_default(),
                study_uid: row.get("study_uid").unwrap_or_default(),
                series_uid: row.get("series_uid").unwrap_or_default(),
                study_uid_hash: row.get("study_uid_hash").unwrap_or_default(),
                series_uid_hash: row.get("series_uid_hash").unwrap_or_default(),
                study_date_origin: row.get("study_date_origin").unwrap_or_default(),
                created_time: row.get("created_time").unwrap_or_default(),
                flag_time: row.get("flag_time").unwrap_or_default(),
                json_status: row.get("json_status").unwrap_or_default(),
                retry_times: row.get("retry_times").unwrap_or_default(),
            });

        
        match result {
            Some(json_meta) => Ok(json_meta),
            None => Err(DbError::DatabaseError(format!(
                "DicomJsonMeta with series_uid {} not found",
                series_uid
            ))),
        }
    }
}

3.2 PostgreSQL 的实现

PostgreSQL 的实现代码如下:

use crate::dicom_dbprovider::{DbError, DbProvider};
use crate::dicom_meta::{DicomJsonMeta, DicomStateMeta};
use async_trait::async_trait;
use tokio_postgres::{Client, NoTls};
/// PostgreSQL-backed provider. It only stores the connection string; a
/// client is created on demand by `make_client`.
#[derive(Debug, Clone)]
pub struct PgDbProvider {
    /// Connection string passed to `tokio_postgres::connect`.
    db_connection_string: String,
}

impl PgDbProvider {
    /// Creates a provider for `database_url`; no connection is opened here.
    pub fn new(database_url: String) -> Self {
        PgDbProvider {
            db_connection_string: database_url,
        }
    }

    /// Opens a new connection (without TLS) and returns its client handle.
    ///
    /// The connection half is moved onto a spawned Tokio task, which drives it
    /// in the background; if that task ends with an error it is only printed
    /// to stderr.
    ///
    /// # Errors
    /// `DbError::DatabaseError` carrying the driver's message when connecting fails.
    async fn make_client(&self) -> Result<Client, DbError> {
        let connect_outcome =
            tokio_postgres::connect(self.db_connection_string.as_str(), NoTls).await;

        let (client, connection) = match connect_outcome {
            Ok(parts) => parts,
            Err(e) => return Err(DbError::DatabaseError(e.to_string())),
        };

        // The client can only make progress while the connection future is polled.
        tokio::spawn(async move {
            match connection.await {
                Ok(()) => {}
                Err(e) => eprintln!("Database connection error: {}", e),
            }
        });

        Ok(client)
    }
}

#[async_trait]
impl DbProvider for PgDbProvider {
    /// Inserts one `DicomStateMeta` row, or updates it when it already exists.
    ///
    /// Rows are keyed on `(tenant_id, study_uid, series_uid)`. On a key
    /// conflict every other column except `created_time` is overwritten with
    /// the incoming value.
    ///
    /// # Errors
    /// Returns `DbError::DatabaseError` when connecting, preparing or executing
    /// the statement fails.
    async fn save_state_info(&self, state_meta: &DicomStateMeta) -> Result<(), DbError> {
        let upsert_sql = "INSERT INTO dicom_state_meta (
                       tenant_id,
                       patient_id,
                       study_uid,
                       series_uid,
                       study_uid_hash,
                       series_uid_hash,
                       study_date_origin,
                       patient_name,
                       patient_sex,
                       patient_birth_date,
                       patient_birth_time,
                       patient_age,
                       patient_size,
                       patient_weight,
                       study_date,
                       study_time,
                       accession_number,
                       study_id,
                       study_description,
                       modality,
                       series_number,
                       series_date,
                       series_time,
                       series_description,
                       body_part_examined,
                       protocol_name,
                       series_related_instances,
                       created_time,
                       updated_time
                   ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
                   ON CONFLICT (tenant_id, study_uid, series_uid)
                   DO UPDATE SET
                       patient_id = EXCLUDED.patient_id,
                       study_uid_hash = EXCLUDED.study_uid_hash,
                       series_uid_hash = EXCLUDED.series_uid_hash,
                       study_date_origin = EXCLUDED.study_date_origin,
                       patient_name = EXCLUDED.patient_name,
                       patient_sex = EXCLUDED.patient_sex,
                       patient_birth_date = EXCLUDED.patient_birth_date,
                       patient_birth_time = EXCLUDED.patient_birth_time,
                       patient_age = EXCLUDED.patient_age,
                       patient_size = EXCLUDED.patient_size,
                       patient_weight = EXCLUDED.patient_weight,
                       study_date = EXCLUDED.study_date,
                       study_time = EXCLUDED.study_time,
                       accession_number = EXCLUDED.accession_number,
                       study_id = EXCLUDED.study_id,
                       study_description = EXCLUDED.study_description,
                       modality = EXCLUDED.modality,
                       series_number = EXCLUDED.series_number,
                       series_date = EXCLUDED.series_date,
                       series_time = EXCLUDED.series_time,
                       series_description = EXCLUDED.series_description,
                       body_part_examined = EXCLUDED.body_part_examined,
                       protocol_name = EXCLUDED.protocol_name,
                       series_related_instances = EXCLUDED.series_related_instances,
                       updated_time = EXCLUDED.updated_time";

        let client = self.make_client().await?;
        let upsert = match client.prepare(upsert_sql).await {
            Ok(prepared) => prepared,
            Err(e) => return Err(DbError::DatabaseError(e.to_string())),
        };

        // Parameter order must match the column list of the INSERT above.
        let outcome = client
            .execute(
                &upsert,
                &[
                    &state_meta.tenant_id,
                    &state_meta.patient_id,
                    &state_meta.study_uid,
                    &state_meta.series_uid,
                    &state_meta.study_uid_hash,
                    &state_meta.series_uid_hash,
                    &state_meta.study_date_origin,
                    &state_meta.patient_name,
                    &state_meta.patient_sex,
                    &state_meta.patient_birth_date,
                    &state_meta.patient_birth_time,
                    &state_meta.patient_age,
                    &state_meta.patient_size,
                    &state_meta.patient_weight,
                    &state_meta.study_date,
                    &state_meta.study_time,
                    &state_meta.accession_number,
                    &state_meta.study_id,
                    &state_meta.study_description,
                    &state_meta.modality,
                    &state_meta.series_number,
                    &state_meta.series_date,
                    &state_meta.series_time,
                    &state_meta.series_description,
                    &state_meta.body_part_examined,
                    &state_meta.protocol_name,
                    &state_meta.series_related_instances,
                    &state_meta.created_time,
                    &state_meta.updated_time,
                ],
            )
            .await;

        match outcome {
            Ok(_) => Ok(()),
            Err(e) => Err(DbError::DatabaseError(e.to_string())),
        }
    }

    /// Upserts a batch of `DicomStateMeta` rows inside a single transaction.
    ///
    /// Either every row is written or none is: the first failing statement
    /// returns early, which drops the transaction without committing it.
    /// Rows are keyed on `(tenant_id, study_uid, series_uid)`; on a key
    /// conflict every other column except `created_time` is overwritten.
    ///
    /// An empty slice returns `Ok(())` immediately without touching the
    /// database.
    ///
    /// # Errors
    /// Returns `DbError::DatabaseError` when connecting, starting the
    /// transaction, preparing, executing or committing fails.
    async fn save_state_list(&self, state_meta_list: &[DicomStateMeta]) -> Result<(), DbError> {
        // Nothing to write: skip the connection and the empty transaction.
        if state_meta_list.is_empty() {
            return Ok(());
        }

        // Use a transaction so that the whole batch succeeds or fails together.
        let mut client = self.make_client().await?;
        let transaction = client.transaction().await.map_err(|e| {
            println!("Failed to start transaction: {}", e);
            DbError::DatabaseError(e.to_string())
        })?;
        println!(
            "Starting transaction to save state meta list of length {}",
            state_meta_list.len()
        );

        let statement = transaction
        .prepare(
            "INSERT INTO dicom_state_meta (
                tenant_id,
                patient_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                patient_name,
                patient_sex,
                patient_birth_date,
                patient_birth_time,
                patient_age,
                patient_size,
                patient_weight,
                study_date,
                study_time,
                accession_number,
                study_id,
                study_description,
                modality,
                series_number,
                series_date,
                series_time,
                series_description,
                body_part_examined,
                protocol_name,
                series_related_instances,
                created_time,
                updated_time
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
            ON CONFLICT (tenant_id, study_uid, series_uid)
            DO UPDATE SET
                patient_id = EXCLUDED.patient_id,
                study_uid_hash = EXCLUDED.study_uid_hash,
                series_uid_hash = EXCLUDED.series_uid_hash,
                study_date_origin = EXCLUDED.study_date_origin,
                patient_name = EXCLUDED.patient_name,
                patient_sex = EXCLUDED.patient_sex,
                patient_birth_date = EXCLUDED.patient_birth_date,
                patient_birth_time = EXCLUDED.patient_birth_time,
                patient_age = EXCLUDED.patient_age,
                patient_size = EXCLUDED.patient_size,
                patient_weight = EXCLUDED.patient_weight,
                study_date = EXCLUDED.study_date,
                study_time = EXCLUDED.study_time,
                accession_number = EXCLUDED.accession_number,
                study_id = EXCLUDED.study_id,
                study_description = EXCLUDED.study_description,
                modality = EXCLUDED.modality,
                series_number = EXCLUDED.series_number,
                series_date = EXCLUDED.series_date,
                series_time = EXCLUDED.series_time,
                series_description = EXCLUDED.series_description,
                body_part_examined = EXCLUDED.body_part_examined,
                protocol_name = EXCLUDED.protocol_name,
                series_related_instances = EXCLUDED.series_related_instances,
                updated_time = EXCLUDED.updated_time"
        )
        .await
        .map_err(|e| {
            println!("Error transaction.prepare: {:?}", e);
            DbError::DatabaseError(e.to_string())
        })?;

        // Run the prepared upsert once per entry; parameter order must match
        // the column list of the INSERT above.
        for state_meta in state_meta_list {
            transaction
                .execute(
                    &statement,
                    &[
                        &state_meta.tenant_id,
                        &state_meta.patient_id,
                        &state_meta.study_uid,
                        &state_meta.series_uid,
                        &state_meta.study_uid_hash,
                        &state_meta.series_uid_hash,
                        &state_meta.study_date_origin,
                        &state_meta.patient_name,
                        &state_meta.patient_sex,
                        &state_meta.patient_birth_date,
                        &state_meta.patient_birth_time,
                        &state_meta.patient_age,
                        &state_meta.patient_size,
                        &state_meta.patient_weight,
                        &state_meta.study_date,
                        &state_meta.study_time,
                        &state_meta.accession_number,
                        &state_meta.study_id,
                        &state_meta.study_description,
                        &state_meta.modality,
                        &state_meta.series_number,
                        &state_meta.series_date,
                        &state_meta.series_time,
                        &state_meta.series_description,
                        &state_meta.body_part_examined,
                        &state_meta.protocol_name,
                        &state_meta.series_related_instances,
                        &state_meta.created_time,
                        &state_meta.updated_time,
                    ],
                )
                .await
                .map_err(|e| {
                    println!("Error transaction.execute: {:?}", e);
                    DbError::DatabaseError(e.to_string())
                })?;
        }

        // Commit the transaction.
        transaction.commit().await.map_err(|e| {
            println!("Error transaction.commit: {:?}", e);
            DbError::DatabaseError(e.to_string())
        })?;

        Ok(())
    }

    /// Upserts a batch of `DicomJsonMeta` rows inside a single transaction.
    ///
    /// An empty slice is a no-op. Rows are keyed on
    /// `(tenant_id, study_uid, series_uid)`; on a key conflict all remaining
    /// columns (including `created_time`) are overwritten with the incoming
    /// values. Any failure returns before the commit, so nothing is persisted.
    ///
    /// # Errors
    /// Returns `DbError::DatabaseError` when connecting, starting the
    /// transaction, preparing, executing or committing fails.
    async fn save_json_list(&self, json_meta_list: &[DicomJsonMeta]) -> Result<(), DbError> {
        if json_meta_list.is_empty() {
            return Ok(());
        }

        let upsert_sql = "INSERT INTO dicom_json_meta (
                tenant_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                flag_time,
                created_time,json_status,retry_times

            ) VALUES ($1, $2, $3, $4, $5, $6, $7,$8,$9,$10)
            ON CONFLICT (tenant_id, study_uid, series_uid)
            DO UPDATE SET
                study_uid_hash = EXCLUDED.study_uid_hash,
                series_uid_hash = EXCLUDED.series_uid_hash,
                study_date_origin = EXCLUDED.study_date_origin,
                flag_time = EXCLUDED.flag_time,
                created_time = EXCLUDED.created_time,
                json_status = EXCLUDED.json_status,
                retry_times = EXCLUDED.retry_times
                ";

        let mut client = self.make_client().await?;
        let transaction = match client.transaction().await {
            Ok(tx) => tx,
            Err(e) => {
                println!("Failed to start transaction: {}", e);
                return Err(DbError::DatabaseError(e.to_string()));
            }
        };

        println!(
            "Starting transaction to save json meta list of length {}",
            json_meta_list.len()
        );

        let upsert = match transaction.prepare(upsert_sql).await {
            Ok(prepared) => prepared,
            Err(e) => {
                println!("Error transaction.prepare: {:?}", e);
                return Err(DbError::DatabaseError(e.to_string()));
            }
        };

        // One execution of the prepared upsert per entry; parameter order
        // follows the column list of the INSERT above.
        for json_meta in json_meta_list.iter() {
            if let Err(e) = transaction
                .execute(
                    &upsert,
                    &[
                        &json_meta.tenant_id,
                        &json_meta.study_uid,
                        &json_meta.series_uid,
                        &json_meta.study_uid_hash,
                        &json_meta.series_uid_hash,
                        &json_meta.study_date_origin,
                        &json_meta.flag_time,
                        &json_meta.created_time,
                        &json_meta.json_status,
                        &json_meta.retry_times,
                    ],
                )
                .await
            {
                println!("Error transaction.execute: {:?}", e);
                return Err(DbError::DatabaseError(e.to_string()));
            }
        }

        // Make the whole batch visible at once.
        match transaction.commit().await {
            Ok(()) => Ok(()),
            Err(e) => {
                println!("Error transaction.commit: {:?}", e);
                Err(DbError::DatabaseError(e.to_string()))
            }
        }
    }

    async fn get_state_metaes(
        &self,
        tenant_id: &str,
        study_uid: &str,
    ) -> Result<Vec<DicomStateMeta>, DbError> {
        let client = self.make_client().await?;
        let statement = client
            .prepare(
                "SELECT
                tenant_id,
                patient_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                patient_name,
                patient_sex,
                patient_birth_date,
                patient_birth_time,
                patient_age,
                patient_size,
                patient_weight,
                study_date,
                study_time,
                accession_number,
                study_id,
                study_description,
                modality,
                series_number,
                series_date,
                series_time,
                series_description,
                body_part_examined,
                protocol_name,
                series_related_instances,
                created_time,
                updated_time
            FROM dicom_state_meta
            WHERE tenant_id = $1 AND study_uid = $2",
            )
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;

        let rows = client
            .query(&statement, &[&tenant_id, &study_uid])
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;

        let mut result = Vec::with_capacity(rows.len());
        for row in rows {
            let state_meta = DicomStateMeta {
                tenant_id: row.get(0),
                patient_id: row.get(1),
                study_uid: row.get(2),
                series_uid: row.get(3),
                study_uid_hash: row.get(4),
                series_uid_hash: row.get(5),
                study_date_origin: row.get(6),
                patient_name: row.get(7),
                patient_sex: row.get(8),
                patient_birth_date: row.get(9),
                patient_birth_time: row.get(10),
                patient_age: row.get(11),
                patient_size: row.get(12),
                patient_weight: row.get(13),
                study_date: row.get(14),
                study_time: row.get(15),
                accession_number: row.get(16),
                study_id: row.get(17),
                study_description: row.get(18),
                modality: row.get(19),
                series_number: row.get(20),
                series_date: row.get(21),
                series_time: row.get(22),
                series_description: row.get(23),
                body_part_examined: row.get(24),
                protocol_name: row.get(25),
                series_related_instances: row.get(26),
                created_time: row.get(27),
                updated_time: row.get(28),
            };
            result.push(state_meta);
        }

        Ok(result)
    }

    async fn get_json_metaes(
        &self,
        end_time: chrono::NaiveDateTime,
    ) -> Result<Vec<DicomStateMeta>, DbError> {
        let client = self.make_client().await?;
        let statement = client
            .prepare(
                " Select tenant_id,
                patient_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                patient_name,
                patient_sex,
                patient_birth_date,
                patient_birth_time,
                patient_age,
                patient_size,
                patient_weight,
                study_date,
                study_time,
                accession_number,
                study_id,
                study_description,
                modality,
                series_number,
                series_date,
                series_time,
                series_description,
                body_part_examined,
                protocol_name,
                series_related_instances,
                created_time,
                updated_time
                From (SELECT dsm.*
                      FROM dicom_state_meta dsm
                               LEFT JOIN dicom_json_meta djm
                                         ON dsm.tenant_id = djm.tenant_id
                                             AND dsm.study_uid = djm.study_uid
                                             AND dsm.series_uid = djm.series_uid
                      WHERE djm.tenant_id IS NULL AND dsm.updated_time <  $1
                      UNION ALL
                      SELECT dsm.*
                      FROM dicom_state_meta dsm
                               INNER JOIN dicom_json_meta djm
                                          ON dsm.tenant_id = djm.tenant_id
                                              AND dsm.study_uid = djm.study_uid
                                              AND dsm.series_uid = djm.series_uid
                      WHERE dsm.updated_time != djm.flag_time AND dsm.updated_time <  $1
                      ) AS t
                order by t.updated_time asc limit 10;",
            )
            .await
            .map_err(|e| {
                println!("{:?}", e);
                DbError::DatabaseError(e.to_string())
            })?;

        let rows = client
            .query(&statement, &[&end_time])
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;

        let mut result = Vec::with_capacity(rows.len());
        for row in rows {
            let state_meta = DicomStateMeta {
                tenant_id: row.get(0),
                patient_id: row.get(1),
                study_uid: row.get(2),
                series_uid: row.get(3),
                study_uid_hash: row.get(4),
                series_uid_hash: row.get(5),
                study_date_origin: row.get(6),
                patient_name: row.get(7),
                patient_sex: row.get(8),
                patient_birth_date: row.get(9),
                patient_birth_time: row.get(10),
                patient_age: row.get(11),
                patient_size: row.get(12),
                patient_weight: row.get(13),
                study_date: row.get(14),
                study_time: row.get(15),
                accession_number: row.get(16),
                study_id: row.get(17),
                study_description: row.get(18),
                modality: row.get(19),
                series_number: row.get(20),
                series_date: row.get(21),
                series_time: row.get(22),
                series_description: row.get(23),
                body_part_examined: row.get(24),
                protocol_name: row.get(25),
                series_related_instances: row.get(26),
                created_time: row.get(27),
                updated_time: row.get(28),
            };
            result.push(state_meta);
        }

        Ok(result)
    }

    async fn get_json_meta(
        &self,
        tenant_id: &str,
        study_uid: &str,
        series_uid: &str,
    ) -> Result<DicomJsonMeta, DbError> {
        let client = self.make_client().await?;
        let statement = client
            .prepare(
                "SELECT
                tenant_id,
                study_uid,
                series_uid,
                study_uid_hash,
                series_uid_hash,
                study_date_origin,
                flag_time,
                created_time,
                json_status,
                retry_times
            FROM dicom_json_meta
            WHERE series_uid = $1  and tenant_id  = $2 and study_uid = $3 ",
            )
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;

        let rows = client
            .query(&statement, &[&series_uid, &tenant_id, &study_uid])
            .await
            .map_err(|e| DbError::DatabaseError(e.to_string()))?;

        if rows.is_empty() {
            return Err(DbError::RecordNotExists(format!(
                "DicomJsonMeta with series_uid {} not found",
                series_uid
            )));
        }

        let row = &rows[0];
        let json_meta = DicomJsonMeta {
            tenant_id: row.get(0),
            study_uid: row.get(1),
            series_uid: row.get(2),
            study_uid_hash: row.get(3),
            series_uid_hash: row.get(4),
            study_date_origin: row.get(5),
            flag_time: row.get(6),
            created_time: row.get(7),
            json_status: row.get(8),
            retry_times: row.get(9),
        };

        Ok(json_meta)
    }
}

好了，现在我们已经完成了数据库接口的定义及实现。下一节，我们将开始编写我们的服务。

Waiting for next

GoTo Summary : how-to-build-cloud-dicom

GoTo Database-Design : how-to-build-cloud-dicom:Part 1 Database Design