WADO-Consumer Module Documentation

Overview

storage_consumer.rs is the core consumption module of the WADO system. It consumes DICOM storage messages from a Kafka topic, batches them, persists them to the database, and republishes the derived metadata to downstream topics.

Main Functions

Message Consumption and Processing Flow

  • Subscribe to DicomStoreMeta messages from Kafka topics (an illustrative payload sketch follows this list)
  • Collect messages into batches and process them according to size and time thresholds
  • Classify processed messages into state metadata and image metadata
  • Persist to databases
  • Republish to corresponding Kafka topics
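
For orientation, here is a minimal sketch of the kind of payload a DicomStoreMeta message might carry. The real struct is defined in database::dicom_meta; the fields below are illustrative assumptions, not the actual definition.

use serde::{Deserialize, Serialize};

// Illustrative only: the real DicomStoreMeta lives in database::dicom_meta.
// These field names are assumptions for the sake of the example.
#[derive(Debug, Serialize, Deserialize)]
pub struct DicomStoreMeta {
    pub study_instance_uid: String,
    pub series_instance_uid: String,
    pub sop_instance_uid: String,
    pub storage_path: String,
}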

Core Components

start_process Function

System entry point, responsible for initializing the entire consumption process:

  • Load configuration and set up logging system
  • Create Kafka consumer and subscribe to specified topics
  • Start message reading and processing in two concurrent threads
  • Manage thread lifecycle and error handling

read_message Function

Core function responsible for consuming messages from Kafka:

  • Continuously monitor Kafka message stream
  • Deserialize messages into DicomStoreMeta objects (a minimal sketch follows this list)
  • Store messages in a shared buffer
  • Commit offsets after each message so it is not re-consumed after a restart
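
Deserialization in isolation — a minimal serde_json sketch, assuming the illustrative DicomStoreMeta struct from the overview is in scope:

// Sketch: turning a raw Kafka payload into a typed value.
// The real loop also commits the offset when deserialization fails (see below).
fn parse_store_meta(payload: &[u8]) -> Option<DicomStoreMeta> {
    match serde_json::from_slice::<DicomStoreMeta>(payload) {
        Ok(meta) => Some(meta),
        Err(e) => {
            eprintln!("failed to deserialize message: {e}");
            None
        }
    }
}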

persist_message_loop Function

Main loop for batch message processing:

  • Monitor messages in shared buffer
  • Decide processing timing based on batch size (MAX_MESSAGES_PER_BATCH=50) and time interval (MAX_TIME_BETWEEN_BATCHES=5 seconds)
  • Group messages and extract state and image metadata
  • Call database persistence and Kafka republishing

publish_dicom_meta Function

Responsible for publishing processed metadata to Kafka:

  • Publish state messages and image messages in parallel (see the tokio::join! sketch after this list)
  • Handle errors and log the result of each publish
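
The parallel-publish pattern in miniature — a generic tokio::join! sketch, not the module's actual publisher API:

use tokio::time::{sleep, Duration};

// Stand-in for a network send; the real code calls
// common::utils::publish_state_messages / publish_image_messages.
async fn publish(label: &str) -> Result<(), String> {
    sleep(Duration::from_millis(10)).await;
    println!("published {label}");
    Ok(())
}

#[tokio::main]
async fn main() {
    // Both futures run concurrently; join! waits for both results.
    let (state_result, image_result) = tokio::join!(publish("state"), publish("image"));
    assert!(state_result.is_ok() && image_result.is_ok());
}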

Technical Features

Concurrent Design

Adopts a dual-thread model:

  • One thread dedicated to message reading
  • Another thread for batch message processing
  • Uses Arc<Mutex<Vec<DicomStoreMeta>>> for thread-safe sharing between the two threads (sketched below)
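
The sharing pattern in isolation — a self-contained sketch; the real module shares a Vec<DicomStoreMeta> plus a timestamp the same way:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // One buffer, two owners: a reader thread pushes, the processing side drains.
    let shared: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(Vec::new()));

    let for_reader = Arc::clone(&shared);
    let reader = thread::spawn(move || {
        for n in 0..5 {
            for_reader.lock().unwrap().push(n);
        }
    });
    reader.join().unwrap();

    // The processing side locks the same Arc and drains the buffer.
    let drained: Vec<u32> = shared.lock().unwrap().drain(..).collect();
    println!("processed batch of {} messages", drained.len());
}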

Batch Processing Mechanism

The key performance optimization; the batch trigger is sketched after this list:

  • Maximum batch processing size: 50 messages
  • Maximum processing interval: 5 seconds
  • Avoids frequent database operations and network requests
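
The trigger condition distilled — the same logic as the should_process check in persist_message_loop below:

use std::time::{Duration, Instant};

const MAX_MESSAGES_PER_BATCH: usize = 50;
const MAX_TIME_BETWEEN_BATCHES: Duration = Duration::from_secs(5);

/// A batch fires when the buffer is full, or when it is non-empty
/// and the flush interval has elapsed since the last batch.
fn should_process(queue_size: usize, last_process_time: Instant) -> bool {
    queue_size >= MAX_MESSAGES_PER_BATCH
        || (queue_size > 0 && last_process_time.elapsed() >= MAX_TIME_BETWEEN_BATCHES)
}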

Error Handling

  • Comprehensive logging system
  • Offsets are committed even when deserialization fails, so a malformed message cannot stall the partition
  • Exception capture and handling at all critical steps

Data Flow

Kafka Topic (main) → DicomStoreMeta → Batch Processing → Database Storage + Kafka Topics (dicom_state / dicom_image)

Dependencies

  • rdkafka: Kafka client library
  • serde_json: JSON serialization/deserialization
  • tokio: Asynchronous runtime
  • slog: Logging system
  • Custom common and database modules

storage_consumer.rs

use common::message_sender_kafka::KafkaMessagePublisher;
use common::utils::{get_logger, group_dicom_state};
use common::{database_factory, server_config};
use database::dicom_meta::{DicomImageMeta, DicomStateMeta, DicomStoreMeta};
use futures::StreamExt;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};
use slog::{error, info, o};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use tokio::runtime::Handle;

pub async fn start_process() {
    // Set up the logging system
    let rlogger = get_logger();
    let global_logger = rlogger.new(o!("wado-consume" => "start_process"));

    info!(global_logger, "start process");

    let config = server_config::load_config();
    let config = match config {
        Ok(config) => config,
        Err(e) => {
            error!(global_logger, "load config failed: {:?}", e);
            std::process::exit(-2);
        }
    };

    let kafka_config = config.kafka;

    let queue_config = config.message_queue;

    // Configure consumer
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", queue_config.consumer_group_id.as_str())
        .set("bootstrap.servers", kafka_config.brokers.as_str())
        .set("enable.auto.commit", "false")
        .set("auto.offset.reset", "earliest")
        .set("session.timeout.ms", "6000")
        .set("enable.partition.eof", "false")
        .create()
        .expect("create wado-consumer failed");

    let topic = queue_config.topic_main.as_str();
    info!(global_logger, "Subscribing to topic: {}", topic);

    match consumer.subscribe(&[topic]) {
        Ok(_) => info!(global_logger, "Successfully subscribed to topic: {}", topic),
        Err(e) => {
            error!(
                global_logger,
                "Failed to subscribe to topic {}: {}", topic, e
            );
            std::process::exit(-1);
        }
    }

    // Create a thread-safe shared Vec and timestamp
    let shared_vec = Arc::new(Mutex::new(Vec::new()));
    let last_process_time = Arc::new(Mutex::new(Instant::now()));

    // Clone Arc for different tasks
    let vec_for_reader = Arc::clone(&shared_vec);
    let vec_for_writer = Arc::clone(&shared_vec);
    let time_for_writer = Arc::clone(&last_process_time);

    // Get current Tokio runtime handle
    let handle = Handle::current();
    let handle_for_reader = handle.clone();
    let handle_for_writer = handle.clone();

    // Start message reading task (run async code in new thread)
    let reader_thread = thread::spawn(move || {
        handle_for_reader.block_on(async {
            read_message(consumer, vec_for_reader, last_process_time).await;
        });
    });

    // Start message processing task (run async code in new thread)
    let writer_thread = thread::spawn(move || {
        handle_for_writer.block_on(async {
            persist_message_loop(vec_for_writer, time_for_writer).await;
        });
    });

    // Wait for both threads to complete
    let reader_result = reader_thread.join();
    let writer_result = writer_thread.join();

    match reader_result {
        Ok(_) => info!(global_logger, "Reader thread completed successfully"),
        Err(e) => error!(global_logger, "Reader thread panicked: {:?}", e),
    }

    match writer_result {
        Ok(_) => info!(global_logger, "Writer thread completed successfully"),
        Err(e) => error!(global_logger, "Writer thread panicked: {:?}", e),
    }

    // Wait briefly for any in-flight cleanup to complete
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Inspect whatever is left in the shared buffer from the main thread
    let final_vec = shared_vec.lock().unwrap();
    info!(global_logger, "Final Vec: {:?}", *final_vec);
}

async fn read_message(
    consumer: StreamConsumer,
    vec: Arc<Mutex<Vec<DicomStoreMeta>>>,
    last_process_time: Arc<Mutex<Instant>>,
) {
    let logger = get_logger();
    let mut message_stream = consumer.stream();
    info!(logger, "XXStarting to read messages ...");

    while let Some(result) = message_stream.next().await {
        match result {
            Ok(message) => {
                info!(logger, "Received message: {:?}", message);
                match message.payload() {
                    Some(payload) => {
                        match serde_json::from_slice::<DicomStoreMeta>(payload) {
                            Ok(dicom_message) => {
                                // Add message to shared vector
                                {
                                    let mut vec = vec.lock().unwrap();
                                    vec.push(dicom_message);
                                    // Update last processing time
                                    let mut time = last_process_time.lock().unwrap();
                                    *time = Instant::now();
                                }

                                // Commit offset after successful processing
                                if let Err(e) = consumer.commit_message(&message, CommitMode::Sync)
                                {
                                    error!(logger, "Failed to commit message: {}", e);
                                } else {
                                    info!(
                                        logger,
                                        "Successfully processed and committed message:{}",
                                        message.offset()
                                    );
                                }
                            }
                            Err(e) => {
                                error!(logger, "Failed to deserialize message: {}", e);
                                // Commit offset even on deserialization failure
                                if let Err(e) = consumer.commit_message(&message, CommitMode::Sync)
                                {
                                    error!(logger, "Failed to commit message: {}", e);
                                }
                            }
                        }
                    }
                    None => {
                        error!(logger, "Received message with no payload");
                        if let Err(e) = consumer.commit_message(&message, CommitMode::Sync) {
                            error!(logger, "Failed to commit message: {}", e);
                        }
                    }
                }
            }
            Err(e) => {
                error!(logger, "Error receiving message: {}", e);
            }
        }
    }
}

const MAX_MESSAGES_PER_BATCH: usize = 50;
const MAX_TIME_BETWEEN_BATCHES: Duration = Duration::from_secs(5);

async fn persist_message_loop(
    vec: Arc<Mutex<Vec<DicomStoreMeta>>>,
    last_process_time: Arc<Mutex<Instant>>,
) {
    let logger = get_logger();
    info!(logger, "Starting message persistence loop...");

    loop {
        let should_process = {
            let vec = vec.lock().unwrap();
            let time = last_process_time.lock().unwrap();

            // Process when either condition is met:
            // 1. The buffer holds at least MAX_MESSAGES_PER_BATCH (50) messages
            // 2. The buffer is non-empty and MAX_TIME_BETWEEN_BATCHES (5s) has
            //    elapsed since the last batch
            let queue_size = vec.len();
            let time_since_last_process = Instant::now().duration_since(*time);

            queue_size >= MAX_MESSAGES_PER_BATCH
                || (queue_size > 0 && time_since_last_process >= MAX_TIME_BETWEEN_BATCHES)
        };
        if !should_process {
            // Sleep for a while, wait for next processing
            tokio::time::sleep(Duration::from_secs(1)).await;
            continue;
        }

        // Batch process messages
        let messages_to_process = {
            let mut vec = vec.lock().unwrap();

            // Take up to MAX_MESSAGES_PER_BATCH messages, preserving arrival
            // order (pop() would hand them over newest-first)
            let take_count = vec.len().min(MAX_MESSAGES_PER_BATCH);
            vec.drain(..take_count).collect::<Vec<_>>()
        };

        if messages_to_process.is_empty() {
            // Sleep for a while, wait for next processing
            tokio::time::sleep(Duration::from_secs(1)).await;
            continue;
        }
        let (state_metas, image_entities) = match group_dicom_state(&messages_to_process).await {
            Ok((state_metas, image_entities)) => (state_metas, image_entities),
            Err(e) => {
                error!(logger, "Failed to group dicom state: {}", e);
                continue;
            }
        };
        // Debug output: log each grouped state meta
        for state_meta in &state_metas {
            info!(logger, "DicomStateMeta: {:?}", state_meta);
        }
        // Debug output: log each grouped image meta
        for image_entity in &image_entities {
            info!(logger, "ImageEntity: {:?}", image_entity);
        }
        let app_config = match server_config::load_config() {
            Ok(config) => config,
            Err(e) => {
                error!(logger, "Failed to load config: {}", e);
                continue;
            }
        };
        let queue_config = app_config.message_queue;

        let topic_state = &queue_config.topic_dicom_state.as_str();
        let topic_image = &queue_config.topic_dicom_image.as_str();

        let topic_dicom_state = match topic_state.parse::<String>() {
            Ok(val) => val,
            Err(e) => {
                error!(logger, "Failed to parse topic_dicom_state: {}", e);
                continue;
            }
        };
        let topic_dicom_image = match topic_image.parse::<String>() {
            Ok(val) => val,
            Err(e) => {
                error!(logger, "Failed to parse topic_image: {}", e);
                continue;
            }
        };

        let state_producer = KafkaMessagePublisher::new(topic_dicom_state);
        let image_producer = KafkaMessagePublisher::new(topic_dicom_image);

        // Publish state messages and image messages
        if let Err(e) = publish_dicom_meta(
            &state_metas,
            &image_entities,
            &state_producer,
            &image_producer,
        )
        .await
        {
            error!(logger, "Failed to publish dicom meta: {}", e);
            continue;
        }
        match database_factory::create_db_instance(&app_config.main_database).await {
            Ok(db) => {
                // Save image acquisition records
                match db.save_store_list(&messages_to_process).await {
                    Ok(_) => {}
                    Err(e) => error!(logger, "Failed to save_store_info: {}", e),
                }
                // Insert state messages
                match db.save_state_list(&state_metas).await {
                    Ok(_) => {}
                    Err(e) => error!(logger, "Failed to save_state_list: {}", e),
                }
                // Insert slice messages
                match db.save_image_list(&image_entities).await {
                    Ok(_) => {}
                    Err(e) => error!(logger, "Failed to save_image_list: {}", e),
                }
            }
            Err(e) => {
                error!(logger, "Failed to create database: {}", e);
                continue;
            }
        };

        // Update last processing time
        {
            let mut time = last_process_time.lock().unwrap();
            *time = Instant::now();
        }

        info!(
            logger,
            "Successfully processed batch of {} messages",
            messages_to_process.len()
        );
    }
}

async fn publish_dicom_meta(
    state_metaes: &Vec<DicomStateMeta>,
    image_metaes: &Vec<DicomImageMeta>,
    state_producer: &KafkaMessagePublisher,
    image_producer: &KafkaMessagePublisher,
) -> Result<(), Box<dyn std::error::Error>> {
    let root_logger = get_logger();
    let logger = root_logger.new(o!("wado-consume"=>"publish_dicom_meta"));
    // Skip only when both lists are empty; a single empty list still
    // lets the other one be published
    if state_metas.is_empty() && image_metas.is_empty() {
        info!(
            logger,
            "Empty dicom state meta list and image meta list, skip"
        );
        return Ok(());
    }

    let state_topic_name = state_producer.topic();
    let image_topic_name = image_producer.topic();

    // Parallel publishing of state messages and image messages
    let (state_result, image_result) = tokio::join!(
        common::utils::publish_state_messages(state_producer, &state_metas),
        common::utils::publish_image_messages(image_producer, &image_metas)
    );
    // Handle state message publishing results
    match state_result {
        Ok(_) => {
            info!(
                logger,
                "Successfully published {} supported messages to Kafka: {}",
                state_metaes.len(),
                state_topic_name
            );
        }
        Err(e) => {
            error!(
                logger,
                "Failed to publish messages to Kafka: {}, topic: {}", e, state_topic_name
            );
        }
    }

    // Handle image message publishing results
    match image_result {
        Ok(_) => {
            info!(
                logger,
                "Successfully published {} messages to Kafka: {}",
                image_metaes.len(),
                image_topic_name
            );
        }
        Err(e) => {
            error!(
                logger,
                "Failed to publish image messages to Kafka: {}, topic: {}", e, image_topic_name
            );
        }
    }

    Ok(())
}
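
For context, a sketch of how the module might be wired into a binary; the file layout and module path here are assumptions:

// src/main.rs (hypothetical). start_process must run on a Tokio runtime,
// since it captures Handle::current() for its worker threads.
mod storage_consumer;

#[tokio::main]
async fn main() {
    storage_consumer::start_process().await;
}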

Key Benefits

  • High Performance: Batch processing with configurable thresholds for optimal throughput
  • Scalability: Kafka-based message queue architecture supports horizontal scaling
  • Reliability: Message acknowledgment and offset management prevent data loss
  • Flexibility: Support for multiple database backends through factory pattern
  • Real-time Processing: Asynchronous processing with parallel operations

Architecture Considerations

  • Batch Processing: Configurable batch size (50 messages) and timeout (5 seconds) for performance optimization
  • Thread Safety: Proper synchronization using Arc and Mutex for concurrent access
  • Error Handling: Comprehensive error handling and logging throughout the processing pipeline
  • Data Consistency: Store, state, and image records are persisted together per batch, with per-step failures logged rather than aborting the batch
