Overview

WADO-StoreSCP is a DICOM Storage SCP (C-STORE) service implementation written in Rust that can receive DICOM files sent from other DICOM devices. This project is part of the DICOM-rs project.

Feature Set

  • Supports DICOM C-STORE protocol
  • Provides both synchronous and asynchronous operation modes
  • Supports multiple transfer syntaxes
  • Automatically transcodes unsupported transfer syntaxes
  • Saves received DICOM files to local storage
  • Sends metadata information via Kafka
  • Supports multi-tenant (hospital/institution) environments
  • Includes certificate validation mechanisms

System Architecture

Core Components

  1. Main Program main.rs

    • Application entry point
    • Command-line argument parsing
    • Certificate validation
    • Operation mode selection (synchronous/asynchronous)
  2. DICOM File Handler dicom_file_handler.rs

    • Process received DICOM data
    • Extract metadata
    • Transcoding processing
    • File storage
  3. Asynchronous Storage Service store_async.rs

    • DICOM C-STORE implementation in asynchronous mode
    • Handle client connections and data transfer
  4. Synchronous Storage Service store_sync.rs

    • DICOM C-STORE implementation in synchronous mode
    • Handle client connections and data transfer
  5. Transfer Syntax Management transfer.rs

    • Defines the list of supported abstract syntaxes (SOP classes)

Configuration

The application is configured through configuration files. The main settings are:

  • Server port and AE Title
  • Storage path configuration
  • Kafka message queue configuration
  • Supported transfer syntax list
  • Transcoding target transfer syntax
  • Certificate validation configuration
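
To make the items above concrete, here is a minimal sketch of how some of them might map onto Rust types. The field names `port`, `ae_title`, `cornerstonejs_supported_transfer_syntax`, `unsupported_ts_change_to`, `client_id`, and `license_key` appear in the listings in this article. The struct names, the `validate` helper, and its rules are assumptions made for illustration. The storage and Kafka settings are left out because their fields are not shown here.

```rust
/// Settings for the C-STORE SCP listener (the struct name is hypothetical).
#[derive(Debug, Clone)]
pub struct StoreScpSettings {
    pub port: u16,
    pub ae_title: String,
    /// Transfer syntax UIDs that are stored without transcoding.
    pub cornerstonejs_supported_transfer_syntax: Vec<String>,
    /// Transfer syntax UID to transcode to when the incoming one is not in the list.
    pub unsupported_ts_change_to: String,
}

/// License settings compared against the client certificate (the struct name is hypothetical).
#[derive(Debug, Clone)]
pub struct LicenseSettings {
    pub client_id: String,
    pub license_key: String,
}

/// Hypothetical sanity check that could run once after the configuration is loaded.
pub fn validate(scp: &StoreScpSettings) -> Result<(), String> {
    if scp.port == 0 {
        return Err("port must be non-zero".to_string());
    }
    // DICOM AE titles are at most 16 characters long.
    if scp.ae_title.is_empty() || scp.ae_title.len() > 16 {
        return Err("AE title must be 1 to 16 characters".to_string());
    }
    // Assumed rule: the transcoding target should itself be a supported syntax.
    if !scp
        .cornerstonejs_supported_transfer_syntax
        .contains(&scp.unsupported_ts_change_to)
    {
        return Err("transcoding target is not in the supported list".to_string());
    }
    Ok(())
}
```

Checking these rules at startup, rather than on the first received file, would surface a misconfigured transcoding target before any association is accepted.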

The WADO-StoreSCP project is a storescp service implementation based on the DICOM-rs library. The listings below show its main source files.

main.rs

extern crate core;

use clap::Parser;
use common::license_manager::validate_client_certificate;
use common::server_config;
use common::utils::{get_logger, setup_logging};
use dicom_core::{dicom_value, DataElement, VR};
use dicom_dictionary_std::tags;
use dicom_encoding::{snafu};
use dicom_object::{InMemDicomObject, StandardDataDictionary};
use slog::{error, info, o};
use snafu::Report;
use std::{
    net::{Ipv4Addr, SocketAddrV4},
};

mod dicom_file_handler;
mod store_async;
mod store_sync;
mod transfer;

use store_async::run_store_async;
use store_sync::run_store_sync;

/// DICOM C-STORE SCP
#[derive(Debug, Parser)]
#[command(version)]
struct App {
    /// Verbose mode
    #[arg(short = 'v', long = "verbose")]
    verbose: bool,
    /// Calling Application Entity title
    #[arg(long = "calling-ae-title", default_value = "STORE-SCP")]
    calling_ae_title: String,
    /// Enforce max pdu length
    #[arg(short = 's', long = "strict")]
    strict: bool,
    /// Only accept native/uncompressed transfer syntaxes
    #[arg(long)]
    uncompressed_only: bool,
    /// Accept unknown SOP classes
    #[arg(long)]
    promiscuous: bool,
    /// Maximum PDU length
    #[arg(
        short = 'm',
        long = "max-pdu-length",
        default_value = "16384",
        value_parser(clap::value_parser!(u32).range(4096..=131_072))
    )]
    max_pdu_length: u32,

    /// Which port to listen on
    #[arg(short, default_value = "11111")]
    port: u16,
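    /// Run in non-blocking mode (spins up an async task to handle each incoming stream).
    /// Enabled by default; pass `--non-blocking false` to select synchronous mode.
    #[arg(short, long, default_value_t = true, action = clap::ArgAction::Set)]
    non_blocking: bool,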
}

fn create_cstore_response(
    message_id: u16,
    sop_class_uid: &str,
    sop_instance_uid: &str,
) -> InMemDicomObject<StandardDataDictionary> {
    InMemDicomObject::command_from_element_iter([
        DataElement::new(
            tags::AFFECTED_SOP_CLASS_UID,
            VR::UI,
            dicom_value!(Str, sop_class_uid),
        ),
        DataElement::new(tags::COMMAND_FIELD, VR::US, dicom_value!(U16, [0x8001])),
        DataElement::new(
            tags::MESSAGE_ID_BEING_RESPONDED_TO,
            VR::US,
            dicom_value!(U16, [message_id]),
        ),
        DataElement::new(
            tags::COMMAND_DATA_SET_TYPE,
            VR::US,
            dicom_value!(U16, [0x0101]),
        ),
        DataElement::new(tags::STATUS, VR::US, dicom_value!(U16, [0x0000])),
        DataElement::new(
            tags::AFFECTED_SOP_INSTANCE_UID,
            VR::UI,
            dicom_value!(Str, sop_instance_uid),
        ),
    ])
}

fn create_cecho_response(message_id: u16) -> InMemDicomObject<StandardDataDictionary> {
    InMemDicomObject::command_from_element_iter([
        DataElement::new(tags::COMMAND_FIELD, VR::US, dicom_value!(U16, [0x8030])),
        DataElement::new(
            tags::MESSAGE_ID_BEING_RESPONDED_TO,
            VR::US,
            dicom_value!(U16, [message_id]),
        ),
        DataElement::new(
            tags::COMMAND_DATA_SET_TYPE,
            VR::US,
            dicom_value!(U16, [0x0101]),
        ),
        DataElement::new(tags::STATUS, VR::US, dicom_value!(U16, [0x0000])),
    ])
}

#[tokio::main]
async fn main() {
    let log = setup_logging("dicom-store-scp");

    let config = server_config::load_config();
    let config = match config {
        Ok(config) => config,
        Err(e) => {
            println!("{:?}", e);
            std::process::exit(-2);
        }
    };

    let client_info = match validate_client_certificate().await {
        Ok(client_info) => {
            info!(
                log,
                "Client Certificate Validated, Client ID: {:?}, HashCode:{:?}",
                client_info.0,
                client_info.1
            );
            client_info
        }
        Err(e) => {
            // Log validation failures at error level so they are not lost among info messages
            error!(log, "Client Certificate Validation Failed: {}", e);
            std::process::exit(-2);
        }
    };
    let (client_id, hash_code) = client_info;
    // Ensure client_id and hash_code exist in the certificate
    let cert_client_id = match client_id {
        Some(id) => id,
        None => {
            error!(log, "Certificate does not contain a valid Client ID");
            std::process::exit(-2);
        }
    };

    let cert_hash_code = match hash_code {
        Some(code) => code,
        None => {
            error!(log, "Certificate does not contain a valid Hash Code");
            std::process::exit(-2);
        }
    };

    let license = match &config.dicom_license_server {
        None => {
            error!(log, "Dicom License Server Config is None");
            std::process::exit(-2);
        }
        Some(license_server) => license_server,
    };
    // Compare in constant time to avoid timing attacks.
    // openssl::memcmp::eq panics when the two slices differ in length,
    // so check the lengths before calling it.
    let client_id_matches = {
        let expected = license.client_id.as_bytes();
        let actual = cert_client_id.as_bytes();
        expected.len() == actual.len() && openssl::memcmp::eq(expected, actual)
    };

    let hash_code_matches = {
        let expected = license.license_key.as_bytes(); // license_key actually stores the hash_code
        let actual = cert_hash_code.as_bytes();
        expected.len() == actual.len() && openssl::memcmp::eq(expected, actual)
    };

    if client_id_matches && hash_code_matches {
        info!(log, "License Server Validation Success");
    } else {
        error!(log, "License Server Validation Failed");
        error!(
            log,
            "Expected Client ID: {}, Certificate Client ID: {}", license.client_id, cert_client_id
        );
        error!(
            log,
            "Expected Hash Code: {}, Certificate Hash Code: {}",
            license.license_key,
            cert_hash_code
        );
        std::process::exit(-2);
    }

    let mut app = App::parse();
    let scp_config = config.dicom_store_scp;

    // Values from the configuration file override the command-line arguments
    app.port = scp_config.port;
    app.calling_ae_title = scp_config.ae_title;

    match app.non_blocking {
        true => {
            info!(log, "Running in non-blocking mode");
            // Use the existing tokio runtime; each connection is handled in its own task
            // Can set maximum concurrent connections and other parameters
            run_async(app).await.unwrap_or_else(|e| {
                error!(log, "{:?}", e);
                std::process::exit(-2);
            });
        }
        false => {
            info!(log, "Running in synchronous mode");
            // Create a dedicated runtime for synchronous mode
            // Synchronous mode is suitable for simple deployments or debugging scenarios
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap_or_else(|e| {
                    error!(log, "Could not create tokio runtime: {}", e);
                    std::process::exit(-2);
                });

            std::thread::spawn(move || {
                rt.block_on(async {
                    run_sync(app).await.unwrap_or_else(|e| {
                        error!(log, "{:?}", e);
                        std::process::exit(-2);
                    });
                });
            })
            .join()
            .unwrap();
        }
    }
}

async fn run_async(args: App) -> Result<(), Box<dyn std::error::Error>> {
    use std::sync::Arc;
    let args = Arc::new(args);
    let rlogger = get_logger();
    let logger = rlogger.new(o!("wado-storescp"=>"run_async"));
    let listen_addr = SocketAddrV4::new(Ipv4Addr::from(0), args.port);
    let listener = tokio::net::TcpListener::bind(listen_addr).await?;
    info!(
        &logger,
        "{} listening on: tcp://{}", &args.calling_ae_title, listen_addr
    );

    loop {
        let (socket, _addr) = listener.accept().await?;
        let args = args.clone();
        let logs = logger.clone();
        tokio::task::spawn(async move {
            if let Err(e) = run_store_async(socket, &args).await {
                error!(logs, "{}", Report::from_error(e));
            }
        });
    }
}

async fn run_sync(args: App) -> Result<(), Box<dyn std::error::Error>> {
    let rlogger = get_logger();
    let logger = rlogger.new(o!("wado-storescp"=>"run_sync"));
    let listen_addr = SocketAddrV4::new(Ipv4Addr::from(0), args.port);
    let listener = std::net::TcpListener::bind(listen_addr)?;
    info!(
        &logger,
        "{} listening on: tcp://{}", &args.calling_ae_title, listen_addr
    );

    for stream in listener.incoming() {
        match stream {
            Ok(scu_stream) => {
                let tcp_logger = logger.clone();
                if let Err(e) = run_store_sync(scu_stream, &args).await {
                    error!(&tcp_logger, "{}", snafu::Report::from_error(e));
                }
            }
            Err(e) => {
                error!(&logger, "{}", snafu::Report::from_error(e));
            }
        }
    }

    Ok(())
}
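
The license check in `main` relies on a constant-time comparison. As a rough illustration of the idea, the sketch below implements one with the standard library only. `constant_time_eq` and `license_matches` are hypothetical helpers and not part of the project or of the `openssl` crate. An optimizing compiler gives no hard timing guarantee for code like this, so production code should prefer a vetted implementation such as `openssl::memcmp::eq` (guarded by a length check, since it panics on slices of different lengths).

```rust
/// Compares two byte strings without returning early on the first mismatch.
/// Hypothetical helper for illustration only.
pub fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    // The length is not treated as secret, so an early return here is acceptable.
    if a.len() != b.len() {
        return false;
    }
    // Accumulate the XOR of every byte pair; any difference sets a bit in `diff`.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

/// Checks both license fields. Both comparisons are evaluated before the
/// results are combined, so the second is not skipped when the first fails.
pub fn license_matches(
    expected_id: &str,
    cert_id: &str,
    expected_hash: &str,
    cert_hash: &str,
) -> bool {
    let id_ok = constant_time_eq(expected_id.as_bytes(), cert_id.as_bytes());
    let hash_ok = constant_time_eq(expected_hash.as_bytes(), cert_hash.as_bytes());
    id_ok & hash_ok
}
```

A call such as `license_matches(&license.client_id, &cert_client_id, &license.license_key, &cert_hash_code)` would stand in for the two comparison blocks in `main`, assuming all four values are strings.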

dicom_file_handler.rs

This module extracts the necessary information from each received DICOM file for storage in the main database, and provides that data to the downstream consumer processes.

use common::dicom_utils::{get_bounder_string, get_date_value_dicom, get_tag_value};
use common::message_sender_kafka::KafkaMessagePublisher;
use common::storage_config::{hash_uid, StorageConfig};
use common::utils::get_logger;
use common::{server_config, storage_config};
use database::dicom_dbtype::{BoundedString, FixedLengthString};
use database::dicom_meta::{DicomStoreMeta, TransferStatus};
use dicom_dictionary_std::tags;
use dicom_encoding::snafu::{whatever, ResultExt, Whatever};
use dicom_encoding::TransferSyntaxIndex;
use dicom_object::{FileMetaTableBuilder, InMemDicomObject};
use dicom_pixeldata::Transcode;
use dicom_transfer_syntax_registry::TransferSyntaxRegistry;
use slog::o;
use slog::{error, info};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::LazyLock;
use uuid::Uuid;

static JS_SUPPORTED_TS: LazyLock<HashSet<String>> = LazyLock::new(|| {
    // Initialize here, can be read from config file
    // load_config is wrapped in INIT_ONCE, will not reload
    let config = server_config::load_config().unwrap();
    config
        .dicom_store_scp
        .cornerstonejs_supported_transfer_syntax
        .iter()
        .cloned()
        .collect()
});

static JS_CHANGE_TO_TS: LazyLock<String> = LazyLock::new(|| {
    // Initialize here, can be read from config file
    // load_config is wrapped in INIT_ONCE, will not reload
    let config = server_config::load_config().unwrap();
    config.dicom_store_scp.unsupported_ts_change_to.clone()
});

pub(crate) async fn process_dicom_file(
    instance_buffer: &[u8],    // DICOM file byte array or binary stream
    tenant_id: &String,        // Institution ID, or hospital ID, used to distinguish multiple hospitals
    ts: &String,               // Transfer syntax
    sop_instance_uid: &String, // Current file's SOP instance ID
    sop_class_uid: &String,    // Current file's SOP class ID
    ip: String,
    client_ae: String,
    storage_config: &StorageConfig,
) -> Result<DicomStoreMeta, Whatever> {
    let root_logger = get_logger();
    let logger = root_logger.new(o!("wado-storescp"=>"process_dicom_file"));
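    // Reject unknown transfer syntaxes with an error instead of panicking
    let Some(source_ts) = TransferSyntaxRegistry.get(ts) else {
        whatever!("unsupported transfer syntax: {}", ts)
    };
    let obj = InMemDicomObject::read_dataset_with_ts(instance_buffer, source_ts)
        .whatever_context("failed to read DICOM data object")?;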
    info!(logger, "DICOM data object read successfully");
    let pat_id = match get_bounder_string::<64>(&obj, tags::PATIENT_ID) {
        Some(v) => v,
        None => {
            whatever!("Missing PatientID, or PatientID value exceeds 64 characters")
        }
    };

    let study_uid = match get_bounder_string::<64>(&obj, tags::STUDY_INSTANCE_UID) {
        Some(v) => v,
        None => {
            whatever!("Missing StudyInstanceUID, or StudyInstanceUID value exceeds 64 characters")
        }
    };

    let series_uid = match get_bounder_string::<64>(&obj, tags::SERIES_INSTANCE_UID) {
        Some(v) => v,
        None => {
            whatever!("Missing SeriesInstanceUID, or SeriesInstanceUID value exceeds 64 characters")
        }
    };

    // AccessionNumber is optional
    let accession_number = get_bounder_string::<16>(&obj, tags::ACCESSION_NUMBER);

    let study_date = match get_date_value_dicom(&obj, tags::STUDY_DATE) {
        Some(v) => v,
        None => {
            whatever!("Missing StudyDate, or StudyDate value is not in the YYYYMMDD format")
        }
    };

    let frames = get_tag_value(tags::NUMBER_OF_FRAMES, &obj, 1);
    info!(
        logger,
        "TenantID:{} ,PatientID: {}, StudyUID: {},   StudyDate: {},  Frames: {}",
        tenant_id,
        pat_id,
        study_uid,
        study_date,
        frames
    );

    let file_meta = FileMetaTableBuilder::new()
        .media_storage_sop_class_uid(sop_class_uid)
        .media_storage_sop_instance_uid(sop_instance_uid)
        .transfer_syntax(ts)
        .build()
        .whatever_context("failed to build DICOM meta file information")?;
    let mut file_obj = obj.with_exact_meta(file_meta);

    let study_date_str = study_date.format("%Y%m%d").to_string();
    let dir_path = storage_config
        .make_series_dicom_dir(
            tenant_id,
            &*study_date_str,
            study_uid.as_str(),
            series_uid.as_str(),
            true,
        )
        .whatever_context(format!(
        "failed to get dicom series dir: tenant_id={}, study_date={}, study_uid={}, series_uid={}",
        tenant_id, study_date, study_uid, series_uid
    ))?;
    let study_uid_hash_v = hash_uid(study_uid.as_str());
    let series_uid_hash_v = hash_uid(series_uid.as_str());

    let file_path = storage_config::dicom_file_path(&dir_path, sop_instance_uid);

    info!(logger, "file path: {}", file_path);
    let mut final_ts = ts.to_string();
    let mut transcode_status = TransferStatus::NoNeedTransfer;
    if !JS_SUPPORTED_TS.contains(ts) {
        let target_ts = TransferSyntaxRegistry
            .get(JS_CHANGE_TO_TS.as_str())
            .unwrap();
        match file_obj.transcode(target_ts) {
            Ok(_) => {
                final_ts = target_ts.uid().to_string();
                info!(
                    logger,
                    "transcode success: {} -> {}",
                    ts.to_string(),
                    final_ts
                );
                transcode_status = TransferStatus::Success;
            }
            Err(e) => {
                error!(logger, "transcode failed: {}", e);
                transcode_status = TransferStatus::Failed;
            }
        }
    } else {
        info!(logger, "no transcoding needed: {}", ts);
    }
    // The file is written to disk whether or not transcoding was needed or succeeded
    file_obj
        .write_to_file(&file_path)
        .whatever_context(format!("failed to save file to disk: {:?}", file_path))?;
    let fsize = std::fs::metadata(&file_path)
        .whatever_context("failed to read metadata of the saved file")?
        .len();
    let saved_path = PathBuf::from(file_path);
    // Time-ordered UUID (v7) used as the trace ID
    let trace_uid = Uuid::now_v7().to_string();
    let cdate = chrono::Local::now().naive_local();

    Ok(DicomStoreMeta {
        trace_id: FixedLengthString::<36>::make(trace_uid),
        worker_node_id: BoundedString::<64>::make_str("DICOM_STORE_SCP"),
        tenant_id: BoundedString::<64>::make_str(&tenant_id),
        patient_id: pat_id,
        study_uid,
        series_uid,
        sop_uid: BoundedString::<64>::make_str(&sop_instance_uid),
        file_path: BoundedString::<512>::make_str(saved_path.to_str().unwrap()),
        file_size: fsize as i64,
        transfer_syntax_uid: BoundedString::<64>::make_str(ts),
        target_ts: BoundedString::<64>::make_str(&final_ts),
        study_date,
        transfer_status: transcode_status,
        number_of_frames: frames,
        created_time: cdate,
        // Hashes of the series and study UIDs, stored as BoundedString<20>
        series_uid_hash: BoundedString::<20>::make_str(&series_uid_hash_v),
        study_uid_hash: BoundedString::<20>::make_str(&study_uid_hash_v),
        accession_number,
        source_ip: BoundedString::<24>::make_str(&ip),
        source_ae: BoundedString::<64>::make_str(&client_ae),
    })
}

/// Publishes the metadata of received DICOM files to Kafka.
///
/// Regardless of transcoding success or failure, the files have already been saved to local disk.
/// Publish failures are logged and do not cause this function to return an error.
///
/// # Parameters
/// * `dicom_message_lists` - List of DICOM object metadata to process
/// * `storage_producer` - Publisher for the main topic (for storage_consumer), used to extract PatientInfo, StudyInfo, SeriesInfo DICOM-compliant entity information
/// * `log_producer` - Publisher for the log topic, used to record image acquisition log information for subsequent efficiency statistics
pub(crate) async fn classify_and_publish_dicom_messages(
    dicom_message_lists: &Vec<DicomStoreMeta>,
    storage_producer: &KafkaMessagePublisher,
    log_producer: &KafkaMessagePublisher,
) -> Result<(), Box<dyn std::error::Error>> {
    let root_logger = get_logger();
    let logger = root_logger.new(o!("wado-storescp"=>"classify_and_publish_dicom_messages"));
    if dicom_message_lists.is_empty() {
        info!(logger, "Empty dicom message list, skip");
        return Ok(());
    }

    let topic_name = storage_producer.topic();

    match common::utils::publish_messages(storage_producer, &dicom_message_lists).await {
        Ok(_) => {
            info!(
                logger,
                "Successfully published {} supported messages to Kafka: {}",
                dicom_message_lists.len(),
                topic_name
            );
        }
        Err(e) => {
            error!(
                logger,
                "Failed to publish messages to Kafka: {}, topic: {}", e, topic_name
            );
        }
    }

    let log_topic_name = log_producer.topic();
    match common::utils::publish_messages(log_producer, &dicom_message_lists).await {
        Ok(_) => {
            info!(
                logger,
                "Successfully published {} messages to Kafka: {}",
                dicom_message_lists.len(),
                log_topic_name
            );
        }
        Err(e) => {
            error!(
                logger,
                "Failed to publish log messages to Kafka: {}, topic: {}", e, log_topic_name
            );
        }
    }

    Ok(())
}
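
The transcoding decision in `process_dicom_file` comes down to a set lookup: if the incoming transfer syntax is in the supported set, the file is stored unchanged, otherwise it is transcoded to the configured target. The sketch below isolates that decision so it can be read and tested on its own. `TranscodePlan` and `plan_transcode` are hypothetical names and do not appear in the project.

```rust
use std::collections::HashSet;

/// What to do with an incoming file (hypothetical type for illustration).
#[derive(Debug, PartialEq, Eq)]
pub enum TranscodePlan {
    /// The incoming transfer syntax is already supported; store the file unchanged.
    Keep,
    /// Transcode to the given transfer syntax UID before storing.
    TranscodeTo(String),
}

/// Decides whether a file needs transcoding, given the set of supported
/// transfer syntax UIDs and the configured fallback UID.
pub fn plan_transcode(
    incoming_ts: &str,
    supported: &HashSet<String>,
    fallback_ts: &str,
) -> TranscodePlan {
    if supported.contains(incoming_ts) {
        TranscodePlan::Keep
    } else {
        TranscodePlan::TranscodeTo(fallback_ts.to_string())
    }
}
```

With a supported set containing Explicit VR Little Endian (`1.2.840.10008.1.2.1`) and that same UID as the fallback, a JPEG 2000 file (`1.2.840.10008.1.2.4.91`) would yield `TranscodeTo("1.2.840.10008.1.2.1")`. The sketch does not model the case handled in the listing where the transcode itself fails and the file is saved in its original form with a failed status.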

transfer.rs

//! Accepted storage transfer options

use dicom_dictionary_std::uids::*;

/// A list of supported abstract syntaxes for storage services
#[allow(deprecated)]
pub static ABSTRACT_SYNTAXES: &[&str] = &[
    CT_IMAGE_STORAGE,
    ENHANCED_CT_IMAGE_STORAGE,
    STANDALONE_CURVE_STORAGE,
    STANDALONE_OVERLAY_STORAGE,
    SECONDARY_CAPTURE_IMAGE_STORAGE,
    ULTRASOUND_IMAGE_STORAGE_RETIRED,
    NUCLEAR_MEDICINE_IMAGE_STORAGE_RETIRED,
    MR_IMAGE_STORAGE,
    ENHANCED_MR_IMAGE_STORAGE,
    MR_SPECTROSCOPY_STORAGE,
    ENHANCED_MR_COLOR_IMAGE_STORAGE,
    ULTRASOUND_MULTI_FRAME_IMAGE_STORAGE_RETIRED,
    COMPUTED_RADIOGRAPHY_IMAGE_STORAGE,
    DIGITAL_X_RAY_IMAGE_STORAGE_FOR_PRESENTATION,
    DIGITAL_X_RAY_IMAGE_STORAGE_FOR_PROCESSING,
    ENCAPSULATED_PDF_STORAGE,
    ENCAPSULATED_CDA_STORAGE,
    ENCAPSULATED_STL_STORAGE,
    GRAYSCALE_SOFTCOPY_PRESENTATION_STATE_STORAGE,
    POSITRON_EMISSION_TOMOGRAPHY_IMAGE_STORAGE,
    BREAST_TOMOSYNTHESIS_IMAGE_STORAGE,
    BREAST_PROJECTION_X_RAY_IMAGE_STORAGE_FOR_PRESENTATION,
    BREAST_PROJECTION_X_RAY_IMAGE_STORAGE_FOR_PROCESSING,
    ENHANCED_PET_IMAGE_STORAGE,
    RT_IMAGE_STORAGE,
    NUCLEAR_MEDICINE_IMAGE_STORAGE,
    ULTRASOUND_MULTI_FRAME_IMAGE_STORAGE,
    MULTI_FRAME_SINGLE_BIT_SECONDARY_CAPTURE_IMAGE_STORAGE,
    MULTI_FRAME_GRAYSCALE_BYTE_SECONDARY_CAPTURE_IMAGE_STORAGE,
    MULTI_FRAME_GRAYSCALE_WORD_SECONDARY_CAPTURE_IMAGE_STORAGE,
    MULTI_FRAME_TRUE_COLOR_SECONDARY_CAPTURE_IMAGE_STORAGE,
    BASIC_TEXT_SR_STORAGE,
    ENHANCED_SR_STORAGE,
    COMPREHENSIVE_SR_STORAGE,
    VERIFICATION,
];
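
A list like this is typically consulted during association negotiation: the abstract syntax of each proposed presentation context is checked against it, unless the `promiscuous` flag allows unknown SOP classes. The sketch below shows one way such a check could look. `accepts_abstract_syntax` is a hypothetical helper, the list is shortened to three well-known UIDs written as literals, and the trailing NUL trimming is a defensive assumption about padded input.

```rust
/// Shortened stand-in for the full list above.
pub static ABSTRACT_SYNTAXES: &[&str] = &[
    "1.2.840.10008.5.1.4.1.1.2", // CT Image Storage
    "1.2.840.10008.5.1.4.1.1.4", // MR Image Storage
    "1.2.840.10008.1.1",         // Verification (C-ECHO)
];

/// Returns true when a proposed abstract syntax should be accepted.
/// Hypothetical helper for illustration only.
pub fn accepts_abstract_syntax(proposed: &str, promiscuous: bool) -> bool {
    // Strip trailing NUL padding before comparing.
    let uid = proposed.trim_end_matches('\0');
    // In promiscuous mode every SOP class is accepted.
    promiscuous || ABSTRACT_SYNTAXES.contains(&uid)
}
```

Keeping `VERIFICATION` in the list means the same check also lets C-ECHO requests through, which matches the presence of `create_cecho_response` in `main.rs`.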

Key Benefits

  • DICOM Standard Compliance: Full implementation of DICOM C-STORE protocol
  • High Performance: Built with Rust for memory safety and performance
  • Flexible Operation Modes: Both synchronous and asynchronous processing options
  • Multi-Tenancy Support: Handle multiple institutions with tenant isolation
  • Automatic Transcoding: Convert unsupported transfer syntaxes to compatible formats
  • Secure Operations: Certificate validation and license management
  • Scalable Architecture: Kafka integration for distributed processing
