Overview
WADO-StoreSCP is a DICOM Storage SCP (C-STORE) service implementation written in Rust that receives DICOM files sent by other DICOM devices. It is built on the DICOM-rs library.
Feature Set
- Supports DICOM C-STORE protocol
- Provides both synchronous and asynchronous operation modes
- Supports multiple transfer syntaxes
- Automatically transcodes unsupported transfer syntaxes
- Saves received DICOM files to local storage
- Sends metadata information via Kafka
- Supports multi-tenant (hospital/institution) environments
- Includes certificate validation mechanisms
System Architecture
Core Components
- Main Program (main.rs)
  - Application entry point
  - Command-line argument parsing
  - Certificate validation
  - Operation mode selection (synchronous/asynchronous)
- DICOM File Handler (dicom_file_handler.rs)
  - Processes received DICOM data
  - Extracts metadata
  - Performs transcoding when needed
  - Saves files to storage
- Asynchronous Storage Service (store_async.rs)
  - DICOM C-STORE implementation in asynchronous mode
  - Handles client connections and data transfer
- Synchronous Storage Service (store_sync.rs)
  - DICOM C-STORE implementation in synchronous mode
  - Handles client connections and data transfer
- Transfer Syntax Management (transfer.rs)
  - Defines the list of supported abstract syntaxes
Configuration
The application is configured through a configuration file; the main configuration items include (a rough sketch of the corresponding structure follows this list):
- Server port and AE Title
- Storage path configuration
- Kafka message queue configuration
- Supported transfer syntax list
- Transcoding target transfer syntax
- Certificate validation configuration
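The exact schema is defined in common::server_config; the fields actually referenced later in this article are ae_title, port, cornerstonejs_supported_transfer_syntax and unsupported_ts_change_to. A rough, illustrative sketch of that configuration section (the remaining field names are assumptions):
// Illustrative sketch of the dicom_store_scp configuration section. Only
// ae_title, port, cornerstonejs_supported_transfer_syntax and
// unsupported_ts_change_to are taken from the code below; the other fields
// are assumptions standing in for the storage and Kafka settings.
#[derive(Debug, serde::Deserialize)]
pub struct DicomStoreScpConfig {
    pub ae_title: String,                                     // AE Title announced by the SCP
    pub port: u16,                                            // TCP port the SCP listens on
    pub cornerstonejs_supported_transfer_syntax: Vec<String>, // transfer syntaxes the viewer can decode
    pub unsupported_ts_change_to: String,                     // target transfer syntax for transcoding
    pub storage_root: Option<String>,                         // assumed: root directory for received files
    pub kafka_brokers: Option<String>,                        // assumed: Kafka bootstrap servers
}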
The WADO-StoreSCP project is a storescp (Storage SCP) service implementation based on the DICOM-rs library.
main.rs
extern crate core;
use clap::Parser;
use common::license_manager::validate_client_certificate;
use common::server_config;
use common::utils::{get_logger, setup_logging};
use dicom_core::{dicom_value, DataElement, VR};
use dicom_dictionary_std::tags;
use dicom_encoding::{snafu};
use dicom_object::{InMemDicomObject, StandardDataDictionary};
use slog::{error, info, o};
use snafu::Report;
use std::net::{Ipv4Addr, SocketAddrV4};
mod dicom_file_handler;
mod store_async;
mod store_sync;
mod transfer;
use store_async::run_store_async;
use store_sync::run_store_sync;
/// DICOM C-STORE SCP
#[derive(Debug, Parser)]
#[command(version)]
struct App {
/// Verbose mode
#[arg(short = 'v', long = "verbose")]
verbose: bool,
/// Calling Application Entity title
#[arg(long = "calling-ae-title", default_value = "STORE-SCP")]
calling_ae_title: String,
/// Enforce max pdu length
#[arg(short = 's', long = "strict")]
strict: bool,
/// Only accept native/uncompressed transfer syntaxes
#[arg(long)]
uncompressed_only: bool,
/// Accept unknown SOP classes
#[arg(long)]
promiscuous: bool,
/// Maximum PDU length
#[arg(
short = 'm',
long = "max-pdu-length",
default_value = "16384",
value_parser(clap::value_parser!(u32).range(4096..=131_072))
)]
max_pdu_length: u32,
/// Which port to listen on
#[arg(short, default_value = "11111")]
port: u16,
/// Run in non-blocking mode (spins up an async task to handle each incoming stream)
#[arg(short, long, default_value = "true")]
non_blocking: bool,
}
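// Note: the `port` and `calling_ae_title` values parsed here are overridden
// later in main() with `config.dicom_store_scp.port` and
// `config.dicom_store_scp.ae_title` from the server configuration.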
fn create_cstore_response(
message_id: u16,
sop_class_uid: &str,
sop_instance_uid: &str,
) -> InMemDicomObject<StandardDataDictionary> {
InMemDicomObject::command_from_element_iter([
DataElement::new(
tags::AFFECTED_SOP_CLASS_UID,
VR::UI,
dicom_value!(Str, sop_class_uid),
),
DataElement::new(tags::COMMAND_FIELD, VR::US, dicom_value!(U16, [0x8001])),
DataElement::new(
tags::MESSAGE_ID_BEING_RESPONDED_TO,
VR::US,
dicom_value!(U16, [message_id]),
),
DataElement::new(
tags::COMMAND_DATA_SET_TYPE,
VR::US,
dicom_value!(U16, [0x0101]),
),
DataElement::new(tags::STATUS, VR::US, dicom_value!(U16, [0x0000])),
DataElement::new(
tags::AFFECTED_SOP_INSTANCE_UID,
VR::UI,
dicom_value!(Str, sop_instance_uid),
),
])
}
fn create_cecho_response(message_id: u16) -> InMemDicomObject<StandardDataDictionary> {
InMemDicomObject::command_from_element_iter([
DataElement::new(tags::COMMAND_FIELD, VR::US, dicom_value!(U16, [0x8030])),
DataElement::new(
tags::MESSAGE_ID_BEING_RESPONDED_TO,
VR::US,
dicom_value!(U16, [message_id]),
),
DataElement::new(
tags::COMMAND_DATA_SET_TYPE,
VR::US,
dicom_value!(U16, [0x0101]),
),
DataElement::new(tags::STATUS, VR::US, dicom_value!(U16, [0x0000])),
])
}
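// Note on the DIMSE values used in the two response builders above (DICOM PS3.7):
// CommandField 0x8001 = C-STORE-RSP, 0x8030 = C-ECHO-RSP;
// CommandDataSetType 0x0101 = no data set present; Status 0x0000 = Success.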
#[tokio::main]
async fn main() {
let log = setup_logging("dicom-store-scp");
let config = server_config::load_config();
    let config = match config {
        Ok(config) => config,
        Err(e) => {
            eprintln!("Failed to load configuration: {:?}", e);
            std::process::exit(-2);
        }
    };
let client_info = match validate_client_certificate().await {
Ok(client_info) => {
info!(
log,
"Client Certificate Validated, Client ID: {:?}, HashCode:{:?}",
client_info.0,
client_info.1
);
client_info
}
Err(e) => {
let error_string = format!("{}", e);
info!(
log,
"Client Certificate Validation Failed: {}", error_string
);
std::process::exit(-2);
}
};
let (client_id, hash_code) = client_info;
// Ensure client_id and hash_code exist in the certificate
let cert_client_id = match client_id {
Some(id) => id,
None => {
info!(log, "Certificate does not contain a valid Client ID");
std::process::exit(-2);
}
};
let cert_hash_code = match hash_code {
Some(code) => code,
None => {
info!(log, "Certificate does not contain a valid Hash Code");
std::process::exit(-2);
}
};
let license = match &config.dicom_license_server {
None => {
info!(log, "Dicom License Server Config is None");
std::process::exit(-2);
}
Some(license_server) => license_server,
};
// Use a more secure comparison method to avoid timing attacks
let client_id_matches = {
let expected = &license.client_id;
openssl::memcmp::eq(expected.as_bytes(), cert_client_id.as_bytes())
};
let hash_code_matches = {
let expected = &license.license_key; // license_key actually stores the hash_code
openssl::memcmp::eq(expected.as_bytes(), cert_hash_code.as_bytes())
};
if client_id_matches && hash_code_matches {
info!(log, "License Server Validation Success");
} else {
info!(log, "License Server Validation Failed");
info!(
log,
"Expected Client ID: {}, Certificate Client ID: {}", license.client_id, cert_client_id
);
info!(
log,
"Expected Hash Code: {}, Certificate Hash Code: {}",
license.license_key,
cert_hash_code
);
std::process::exit(-2);
}
let mut app = App::parse();
let scp_config = config.dicom_store_scp;
app.port = scp_config.port;
app.calling_ae_title = scp_config.ae_title;
info!(log, "License Server Validation Success");
    match app.non_blocking {
        true => {
            info!(log, "Running in non-blocking (asynchronous) mode");
            // Reuse the existing tokio runtime; each incoming stream is
            // handled on its own async task.
            run_async(app).await.unwrap_or_else(|e| {
                error!(log, "{:?}", e);
                std::process::exit(-2);
            });
        }
        false => {
            info!(log, "Running in synchronous (blocking) mode");
            // Create a dedicated current-thread runtime for synchronous mode.
            // Synchronous mode is suitable for simple deployments or debugging scenarios.
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap_or_else(|e| {
                    error!(log, "Could not create tokio runtime: {}", e);
                    std::process::exit(-2);
                });
            std::thread::spawn(move || {
                rt.block_on(async {
                    run_sync(app).await.unwrap_or_else(|e| {
                        error!(log, "{:?}", e);
                        std::process::exit(-2);
                    });
                });
            })
            .join()
            .unwrap();
        }
    }
}
async fn run_async(args: App) -> Result<(), Box<dyn std::error::Error>> {
use std::sync::Arc;
let args = Arc::new(args);
let rlogger = get_logger();
let logger = rlogger.new(o!("wado-storescp"=>"run_async"));
let listen_addr = SocketAddrV4::new(Ipv4Addr::from(0), args.port);
let listener = tokio::net::TcpListener::bind(listen_addr).await?;
info!(
&logger,
"{} listening on: tcp://{}", &args.calling_ae_title, listen_addr
);
loop {
let (socket, _addr) = listener.accept().await?;
let args = args.clone();
let logs = logger.clone();
tokio::task::spawn(async move {
if let Err(e) = run_store_async(socket, &args).await {
error!(logs, "{}", Report::from_error(e));
}
});
}
}
async fn run_sync(args: App) -> Result<(), Box<dyn std::error::Error>> {
let rlogger = get_logger();
let logger = rlogger.new(o!("wado-storescp"=>"run_sync"));
let listen_addr = SocketAddrV4::new(Ipv4Addr::from(0), args.port);
let listener = std::net::TcpListener::bind(listen_addr)?;
info!(
&logger,
"{} listening on: tcp://{}", &args.calling_ae_title, listen_addr
);
for stream in listener.incoming() {
match stream {
Ok(scu_stream) => {
let tcp_logger = logger.clone();
if let Err(e) = run_store_sync(scu_stream, &args).await {
error!(&tcp_logger, "{}", snafu::Report::from_error(e));
}
}
Err(e) => {
error!(&logger, "{}", snafu::Report::from_error(e));
}
}
}
Ok(())
}
dicom_file_handler.rs
Extracts the necessary information from received DICOM files and saves it to the main database, providing data for the subsequent consumer processes.
use common::dicom_utils::{get_bounder_string, get_date_value_dicom, get_tag_value};
use common::message_sender_kafka::KafkaMessagePublisher;
use common::storage_config::{hash_uid, StorageConfig};
use common::utils::get_logger;
use common::{server_config, storage_config};
use database::dicom_dbtype::{BoundedString, FixedLengthString};
use database::dicom_meta::{DicomStoreMeta, TransferStatus};
use dicom_dictionary_std::tags;
use dicom_encoding::snafu::{whatever, ResultExt, Whatever};
use dicom_encoding::TransferSyntaxIndex;
use dicom_object::{FileMetaTableBuilder, InMemDicomObject};
use dicom_pixeldata::Transcode;
use dicom_transfer_syntax_registry::TransferSyntaxRegistry;
use slog::o;
use slog::{error, info};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::LazyLock;
use uuid::Uuid;
static JS_SUPPORTED_TS: LazyLock<HashSet<String>> = LazyLock::new(|| {
// Initialize here, can be read from config file
// load_config is wrapped in INIT_ONCE, will not reload
let config = server_config::load_config().unwrap();
config
.dicom_store_scp
.cornerstonejs_supported_transfer_syntax
.iter()
.cloned()
.collect()
});
static JS_CHANGE_TO_TS: LazyLock<String> = LazyLock::new(|| {
// Initialize here, can be read from config file
// load_config is wrapped in INIT_ONCE, will not reload
let config = server_config::load_config().unwrap();
config.dicom_store_scp.unsupported_ts_change_to.clone()
});
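/// Parses a received DICOM data set, transcodes it when its transfer syntax is
/// not supported by the viewer, and saves it to local disk regardless of
/// whether transcoding succeeds; returns the metadata that is later published
/// to Kafka.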
pub(crate) async fn process_dicom_file(
    instance_buffer: &[u8],    // Raw DICOM data set bytes received over the association
    tenant_id: &String,        // Institution (hospital) ID, used to distinguish between tenants
    ts: &String,               // Transfer syntax UID of the received data set
    sop_instance_uid: &String, // SOP Instance UID of the current file
    sop_class_uid: &String,    // SOP Class UID of the current file
    ip: String,                // IP address of the sending SCU
    client_ae: String,         // Calling AE title of the sending SCU
    storage_config: &StorageConfig,
) -> Result<DicomStoreMeta, Whatever> {
let root_logger = get_logger();
let logger = root_logger.new(o!("wado-storescp"=>"process_dicom_file"));
let obj = InMemDicomObject::read_dataset_with_ts(
instance_buffer,
TransferSyntaxRegistry.get(ts).unwrap(),
)
.whatever_context("failed to read DICOM data object")?;
info!(logger, "DICOM data object read successfully");
    let pat_id = match get_bounder_string::<64>(&obj, tags::PATIENT_ID) {
        Some(v) => v,
        None => {
            whatever!("Missing PatientID, or the PatientID value exceeds 64 characters")
        }
    };
    let study_uid = match get_bounder_string::<64>(&obj, tags::STUDY_INSTANCE_UID) {
        Some(v) => v,
        None => {
            whatever!("Missing StudyInstanceUID, or the StudyInstanceUID value exceeds 64 characters")
        }
    };
    let series_uid = match get_bounder_string::<64>(&obj, tags::SERIES_INSTANCE_UID) {
        Some(v) => v,
        None => {
            whatever!("Missing SeriesInstanceUID, or the SeriesInstanceUID value exceeds 64 characters")
        }
    };
    let accession_number = get_bounder_string::<16>(&obj, tags::ACCESSION_NUMBER);
    let study_date = match get_date_value_dicom(&obj, tags::STUDY_DATE) {
        Some(v) => v,
        None => {
            whatever!("Missing StudyDate, or the StudyDate value is not in the valid YYYYMMDD format")
        }
    };
let frames = get_tag_value(tags::NUMBER_OF_FRAMES, &obj, 1);
    info!(
        logger,
        "TenantID: {}, PatientID: {}, StudyUID: {}, StudyDate: {}, Frames: {}",
        tenant_id,
        pat_id,
        study_uid,
        study_date,
        frames
    );
let file_meta = FileMetaTableBuilder::new()
.media_storage_sop_class_uid(sop_class_uid)
.media_storage_sop_instance_uid(sop_instance_uid)
.transfer_syntax(ts)
.build()
.whatever_context("failed to build DICOM meta file information")?;
let mut file_obj = obj.with_exact_meta(file_meta);
let study_date_str = study_date.format("%Y%m%d").to_string();
let dir_path = storage_config
.make_series_dicom_dir(
tenant_id,
&*study_date_str,
study_uid.as_str(),
series_uid.as_str(),
true,
)
.whatever_context(format!(
"failed to get dicom series dir: tenant_id={}, study_date={}, study_uid={}, series_uid={}",
tenant_id, study_date, study_uid, series_uid
))?;
let study_uid_hash_v = hash_uid(study_uid.as_str());
let series_uid_hash_v = hash_uid(series_uid.as_str());
let file_path = storage_config::dicom_file_path(&dir_path, sop_instance_uid);
info!(logger, "file path: {}", file_path);
let mut final_ts = ts.to_string();
let mut transcode_status = TransferStatus::NoNeedTransfer;
if !JS_SUPPORTED_TS.contains(ts) {
let target_ts = TransferSyntaxRegistry
.get(JS_CHANGE_TO_TS.as_str())
.unwrap();
match file_obj.transcode(target_ts) {
Ok(_) => {
final_ts = target_ts.uid().to_string();
info!(
logger,
"transcode success: {} -> {}",
ts.to_string(),
final_ts
);
transcode_status = TransferStatus::Success;
}
Err(e) => {
error!(logger, "transcode failed: {}", e);
transcode_status = TransferStatus::Failed;
}
}
    } else {
        info!(logger, "no transcoding needed: {}", ts);
    }
    file_obj
        .write_to_file(&file_path)
        .whatever_context(format!(
            "failed to save DICOM file to disk: {:?}",
            file_path
        ))?;
    let fsize = std::fs::metadata(&file_path).unwrap().len();
    // Take ownership of the path now that no further borrows of the string are needed.
    let saved_path = PathBuf::from(file_path);
    // A UUIDv7 trace ID sorts roughly by creation time.
    let uuid_v7 = Uuid::now_v7();
    let trace_uid = uuid_v7.to_string();
    let cdate = chrono::Local::now().naive_local();
Ok(DicomStoreMeta {
trace_id: FixedLengthString::<36>::make(trace_uid),
worker_node_id: BoundedString::<64>::make_str("DICOM_STORE_SCP"),
tenant_id: BoundedString::<64>::make_str(&tenant_id),
patient_id: pat_id,
study_uid,
series_uid,
sop_uid: BoundedString::<64>::make_str(&sop_instance_uid),
file_path: BoundedString::<512>::make_str(saved_path.to_str().unwrap()),
file_size: fsize as i64,
transfer_syntax_uid: BoundedString::<64>::make_str(ts),
target_ts: BoundedString::<64>::make_str(&final_ts),
study_date,
transfer_status: transcode_status,
number_of_frames: frames,
created_time: cdate,
        // Hashed study/series UIDs computed above with hash_uid()
series_uid_hash: BoundedString::<20>::make_str(&series_uid_hash_v),
study_uid_hash: BoundedString::<20>::make_str(&study_uid_hash_v),
accession_number,
source_ip: BoundedString::<24>::make_str(&ip),
source_ae: BoundedString::<64>::make_str(&client_ae),
})
}
/// Publishes the collected DICOM metadata messages to Kafka.
///
/// The files themselves have already been saved to local disk by
/// `process_dicom_file` (regardless of transcoding success or failure); this
/// function only publishes their metadata. Each batch is sent to two topics:
/// the main storage topic, whose consumer extracts the DICOM-compliant
/// PatientInfo, StudyInfo and SeriesInfo entities, and the log topic, which
/// records image acquisition logs for later efficiency statistics.
///
/// # Parameters
/// * `dicom_message_lists` - List of DICOM object metadata to publish
/// * `storage_producer` - Kafka producer for the main storage topic (storage_consumer)
/// * `log_producer` - Kafka producer for the log topic (log extraction)
pub(crate) async fn classify_and_publish_dicom_messages(
dicom_message_lists: &Vec<DicomStoreMeta>,
storage_producer: &KafkaMessagePublisher,
log_producer: &KafkaMessagePublisher,
) -> Result<(), Box<dyn std::error::Error>> {
let root_logger = get_logger();
let logger = root_logger.new(o!("wado-storescp"=>"classify_and_publish_dicom_messages"));
if dicom_message_lists.is_empty() {
info!(logger, "Empty dicom message list, skip");
return Ok(());
}
let topic_name = storage_producer.topic();
match common::utils::publish_messages(storage_producer, &dicom_message_lists).await {
Ok(_) => {
info!(
logger,
"Successfully published {} supported messages to Kafka: {}",
dicom_message_lists.len(),
topic_name
);
}
Err(e) => {
error!(
logger,
"Failed to publish messages to Kafka: {}, topic: {}", e, topic_name
);
}
}
let log_topic_name = log_producer.topic();
match common::utils::publish_messages(log_producer, &dicom_message_lists).await {
Ok(_) => {
info!(
logger,
"Successfully published {} messages to Kafka: {}",
dicom_message_lists.len(),
log_topic_name
);
}
Err(e) => {
error!(
logger,
"Failed to publish log messages to Kafka: {}, topic: {}", e, log_topic_name
);
}
}
Ok(())
}
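A minimal, hypothetical sketch of how the storage services might wire these two helpers together once an instance has been received; the function name and the way the producers, tenant ID and storage config are obtained are assumptions, and the real wiring lives in store_async.rs and store_sync.rs:
// Hypothetical glue code, not part of dicom_file_handler.rs; the actual
// wiring lives in store_async.rs / store_sync.rs.
async fn handle_received_instance(
    instance_buffer: Vec<u8>,
    ts: String,
    sop_instance_uid: String,
    sop_class_uid: String,
    peer_ip: String,
    calling_ae: String,
    tenant_id: String,
    storage_config: &common::storage_config::StorageConfig,
    storage_producer: &common::message_sender_kafka::KafkaMessagePublisher,
    log_producer: &common::message_sender_kafka::KafkaMessagePublisher,
) -> Result<(), Box<dyn std::error::Error>> {
    // 1. Parse, optionally transcode, and persist the instance to disk.
    let meta = crate::dicom_file_handler::process_dicom_file(
        &instance_buffer,
        &tenant_id,
        &ts,
        &sop_instance_uid,
        &sop_class_uid,
        peer_ip,
        calling_ae,
        storage_config,
    )
    .await?;

    // 2. Publish the resulting metadata to the storage and log topics.
    crate::dicom_file_handler::classify_and_publish_dicom_messages(
        &vec![meta],
        storage_producer,
        log_producer,
    )
    .await?;

    Ok(())
}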
transfer.rs
//! Accepted storage transfer options
use dicom_dictionary_std::uids::*;
/// A list of supported abstract syntaxes for storage services
#[allow(deprecated)]
pub static ABSTRACT_SYNTAXES: &[&str] = &[
CT_IMAGE_STORAGE,
ENHANCED_CT_IMAGE_STORAGE,
STANDALONE_CURVE_STORAGE,
STANDALONE_OVERLAY_STORAGE,
SECONDARY_CAPTURE_IMAGE_STORAGE,
ULTRASOUND_IMAGE_STORAGE_RETIRED,
NUCLEAR_MEDICINE_IMAGE_STORAGE_RETIRED,
MR_IMAGE_STORAGE,
ENHANCED_MR_IMAGE_STORAGE,
MR_SPECTROSCOPY_STORAGE,
ENHANCED_MR_COLOR_IMAGE_STORAGE,
ULTRASOUND_MULTI_FRAME_IMAGE_STORAGE_RETIRED,
COMPUTED_RADIOGRAPHY_IMAGE_STORAGE,
DIGITAL_X_RAY_IMAGE_STORAGE_FOR_PRESENTATION,
DIGITAL_X_RAY_IMAGE_STORAGE_FOR_PROCESSING,
ENCAPSULATED_PDF_STORAGE,
ENCAPSULATED_CDA_STORAGE,
ENCAPSULATED_STL_STORAGE,
GRAYSCALE_SOFTCOPY_PRESENTATION_STATE_STORAGE,
POSITRON_EMISSION_TOMOGRAPHY_IMAGE_STORAGE,
BREAST_TOMOSYNTHESIS_IMAGE_STORAGE,
BREAST_PROJECTION_X_RAY_IMAGE_STORAGE_FOR_PRESENTATION,
BREAST_PROJECTION_X_RAY_IMAGE_STORAGE_FOR_PROCESSING,
ENHANCED_PET_IMAGE_STORAGE,
RT_IMAGE_STORAGE,
NUCLEAR_MEDICINE_IMAGE_STORAGE,
ULTRASOUND_MULTI_FRAME_IMAGE_STORAGE,
MULTI_FRAME_SINGLE_BIT_SECONDARY_CAPTURE_IMAGE_STORAGE,
MULTI_FRAME_GRAYSCALE_BYTE_SECONDARY_CAPTURE_IMAGE_STORAGE,
MULTI_FRAME_GRAYSCALE_WORD_SECONDARY_CAPTURE_IMAGE_STORAGE,
MULTI_FRAME_TRUE_COLOR_SECONDARY_CAPTURE_IMAGE_STORAGE,
BASIC_TEXT_SR_STORAGE,
ENHANCED_SR_STORAGE,
COMPREHENSIVE_SR_STORAGE,
VERIFICATION,
];
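Presumably, the abstract syntax proposed in each presentation context is checked against this list during association negotiation; a minimal sketch of such a check (the helper name is illustrative), which also honours the --promiscuous CLI flag described earlier:
// Illustrative helper, not part of transfer.rs: decide whether a proposed
// abstract syntax (SOP Class UID) should be accepted.
pub fn accepts_abstract_syntax(sop_class_uid: &str, promiscuous: bool) -> bool {
    // UID strings may carry trailing padding; trim it before comparing.
    let uid = sop_class_uid.trim_end_matches(|c| c == '\0' || c == ' ');
    promiscuous || ABSTRACT_SYNTAXES.contains(&uid)
}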
Key Benefits
- DICOM Standard Compliance: Full implementation of the DICOM C-STORE protocol
- High Performance: Built with Rust for memory safety and performance
- Flexible Operation Modes: Both synchronous and asynchronous processing options
- Multi-Tenancy Support: Handle multiple institutions with tenant isolation
- Automatic Transcoding: Convert unsupported transfer syntaxes to compatible formats
- Secure Operations: Certificate validation and license management
- Scalable Architecture: Kafka integration for distributed processing
Keywords and Descriptions
- Primary Keywords: DICOM C-STORE, DICOM-WEB, medical imaging, healthcare cloud, DICOM storage, Rust DICOM server
- Secondary Keywords: DICOM-rs, DICOM SCP, medical imaging software, healthcare IT, DICOM processing
- Meta Description: Complete implementation of DICOM C-STORE SCP services using Rust and the DICOM-rs library. Features multi-tenant support, automatic transcoding, and scalable architecture.
- Target Audience: Healthcare software developers, medical imaging system architects, DICOM server administrators
- Content Value: Detailed implementation guide for building DICOM C-STORE services with Rust, including security features and performance optimizations