Kafka Message Processing System for Medical Imaging

The message_sender_kafka.rs module integrates Kafka into the medical imaging system. It publishes DICOM metadata and imaging workflow messages through an asynchronous producer that batches, compresses, and retries sends.

Message Processing Architecture

High-Throughput Design

Batches messages and compresses payloads so that large volumes of medical imaging metadata can be published with fewer network round trips.

Reliable Delivery

Relies on the producer's built-in retries (up to 5 attempts, 1 second apart, within a 30-second message timeout) and logs every failed send, so that transient network problems do not silently drop messages.
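
The module delegates retries to the Kafka client through the retries and retry.backoff.ms settings shown in the source. To make the behaviour concrete, here is a minimal standard-library sketch of retrying with a fixed backoff. The helper name retry_with_backoff is hypothetical and is not part of the module; the client performs the equivalent work internally.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper (not part of the module): calls `op` until it
/// succeeds or `max_retries` additional attempts have been used up,
/// sleeping for `backoff` between attempts.
pub fn retry_with_backoff<T, E>(
    max_retries: u32,
    backoff: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(err) if attempt >= max_retries => return Err(err),
            // Otherwise wait and try again.
            Err(_) => {
                attempt += 1;
                sleep(backoff);
            }
        }
    }
}
```

With max_retries set to 5 and a backoff of one second, this mirrors the values the module passes to the producer.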

Topic Segregation

Uses separate Kafka topics for different message types:

  • Main storage queue for primary processing
  • Log queue for system events and auditing
  • DICOM state queue for metadata updates
  • DICOM image queue for image processing workflows
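
One way to keep the four queues apart in code is an enum that maps each message category to its topic. The sketch below is illustrative only: the MessageKind type and every topic name in it are placeholders, since the real topic names are passed to KafkaMessagePublisher::new and are not shown in this article.

```rust
/// The four message categories listed above (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageKind {
    Storage,
    Log,
    DicomState,
    DicomImage,
}

impl MessageKind {
    /// Returns the topic for this category.
    /// All names are placeholders, not the project's real topic names.
    pub fn topic(self) -> &'static str {
        match self {
            MessageKind::Storage => "storage_queue",
            MessageKind::Log => "log_queue",
            MessageKind::DicomState => "dicom_state_queue",
            MessageKind::DicomImage => "dicom_image_queue",
        }
    }
}
```

A caller could then build one publisher per category from the value returned by topic().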

Core Capabilities

Batch Message Sending

Sends multiple DICOM metadata records concurrently and lets the producer group them into batches, which reduces per-message network overhead.
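
When the caller holds more records than it wants in flight at once, it can split them into fixed-size batches before handing each batch to the publisher. The following is a small sketch under that assumption; send_in_batches is a hypothetical helper, and the closure stands in for a call such as send_batch_messages.

```rust
/// Hypothetical helper: splits `records` into batches of at most
/// `batch_size` items and passes each batch to `send`.
/// Stops at the first failing batch and returns its error;
/// otherwise returns the number of batches sent.
pub fn send_in_batches<T, E>(
    records: &[T],
    batch_size: usize,
    mut send: impl FnMut(&[T]) -> Result<(), E>,
) -> Result<usize, E> {
    // `chunks` panics on a zero size, so reject it with a clear message.
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut batches = 0;
    for batch in records.chunks(batch_size) {
        send(batch)?;
        batches += 1;
    }
    Ok(batches)
}
```

Ten records with a batch size of four produce batches of four, four, and two records.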

Compression Support

Compresses message batches with the codec named in the configuration (compression_codec), for example Snappy, to reduce bandwidth requirements for large message payloads.

Asynchronous Processing

Uses async/await patterns for non-blocking message processing, enabling high-concurrency operations.

Technical Features

Connection Management

Maintains persistent Kafka connections with automatic reconnection handling.

Error Handling

Implements comprehensive error handling with detailed logging for troubleshooting.
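
The batch methods in the source all follow the same pattern: await every send, log each failure, count successes and failures, and report an error if anything failed. A condensed sketch of that tallying step is shown below. The BatchOutcome type and the tally function are hypothetical names, and eprintln! stands in for the tracing error! macro the module uses.

```rust
use std::fmt::Debug;

/// Hypothetical summary of one batch send.
#[derive(Debug, PartialEq, Eq)]
pub struct BatchOutcome {
    pub succeeded: usize,
    pub failed: usize,
}

/// Counts successes and failures, logging each failure.
/// Returns `Ok` only when every send succeeded.
pub fn tally<T, E: Debug>(results: Vec<Result<T, E>>) -> Result<BatchOutcome, BatchOutcome> {
    let mut outcome = BatchOutcome { succeeded: 0, failed: 0 };
    for result in results {
        match result {
            Ok(_) => outcome.succeeded += 1,
            Err(e) => {
                // Stand-in for `error!` from the tracing crate.
                eprintln!("Failed to send message: {:?}", e);
                outcome.failed += 1;
            }
        }
    }
    if outcome.failed > 0 {
        Err(outcome)
    } else {
        Ok(outcome)
    }
}
```

Returning the counts in the error as well lets the caller decide whether a partial failure warrants resending the whole batch.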

Performance Tuning

Exposes the producer's buffering and batching parameters (queue limits, linger time, batch size, compression codec) through configuration so they can be tuned for each environment.
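
As a rough illustration of how such settings can be carried from application configuration to the Kafka client, the sketch below maps a settings struct to librdkafka property names. The field names match those read from config in the source; the struct name KafkaTuning, the field types, and the to_properties method are assumptions made for this example.

```rust
/// Hypothetical tuning section; field names follow the source,
/// field types are assumed.
#[derive(Debug, Clone)]
pub struct KafkaTuning {
    pub queue_buffering_max_messages: u32,
    pub queue_buffering_max_kbytes: u32,
    pub queue_buffering_max_ms: u32,
    pub batch_num_messages: u32,
    pub linger_ms: u32,
    pub compression_codec: String,
}

impl KafkaTuning {
    /// Converts the settings into (property name, value) pairs
    /// in the string form the Kafka client expects.
    pub fn to_properties(&self) -> Vec<(&'static str, String)> {
        vec![
            ("queue.buffering.max.messages", self.queue_buffering_max_messages.to_string()),
            ("queue.buffering.max.kbytes", self.queue_buffering_max_kbytes.to_string()),
            ("queue.buffering.max.ms", self.queue_buffering_max_ms.to_string()),
            ("batch.num.messages", self.batch_num_messages.to_string()),
            ("linger.ms", self.linger_ms.to_string()),
            ("compression.codec", self.compression_codec.clone()),
        ]
    }
}
```

Each pair can be passed to ClientConfig::set in a loop, which keeps the property names in one place.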

Healthcare Workflow Integration

DICOM State Management

Streams DICOM study and series state changes for real-time workflow updates.

Image Processing Coordination

Coordinates distributed image processing tasks across multiple system components.

Audit Trail Generation

Creates comprehensive audit logs for compliance and troubleshooting purposes.

Scalability Benefits

  1. Horizontal Scaling: Topics are split into partitions spread across brokers, so capacity grows by adding brokers, partitions, and consumers
  2. Fault Tolerance: Partition replication keeps messages available when a broker fails
  3. Load Distribution: Distributes processing load across multiple consumers
  4. Performance Isolation: Separates message processing from core application logic
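
Load distribution depends on message keys: records that share a key are routed to the same partition, so one consumer in a group sees all updates for that key in order. The sketch below illustrates the idea only. The function partition_for is hypothetical, and the standard library's DefaultHasher is a stand-in, not the hash function Kafka clients actually use.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical illustration of keyed partitioning:
/// hash the key and reduce it modulo the partition count.
/// DefaultHasher is a stand-in for the client's real partitioner.
pub fn partition_for(key: &str, partitions: u64) -> u64 {
    assert!(partitions > 0, "a topic has at least one partition");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % partitions
}
```

Because the mapping depends only on the key and the partition count, every message for the same series lands on the same partition as long as the partition count does not change.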

message_sender_kafka.rs Send Messages To Kafka Topics

 
use crate::message_sender::MessagePublisher;
use crate::server_config;
use async_trait::async_trait;
use futures_util::future::join_all;
use rdkafka::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use std::collections::HashMap;
use std::error::Error;
use std::time::Duration;
use tracing::{debug, error, info};
use database::dicom_meta::{DicomImageMeta, DicomStateMeta, DicomStoreMeta};

pub struct KafkaMessagePublisher {
    producer: FutureProducer,
    topic: String,
}
impl KafkaMessagePublisher {
    pub fn topic(&self) -> &str {
        &self.topic
    }
    pub fn new(topic_name: String) -> Self {
        let app_config = server_config::load_config().expect("Failed to load config");
        let config = app_config.kafka;

        let brokers = config.brokers;
        let producer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "30000") // allow up to 30 seconds for delivery, including retries
            // --- Batch sending configuration ---
            .set(
                "queue.buffering.max.messages",
                config.queue_buffering_max_messages.to_string(),
            ) // maximum number of messages in the producer's internal queue
            .set(
                "queue.buffering.max.kbytes",
                config.queue_buffering_max_kbytes.to_string(),
            ) // maximum total size of the internal queue, in kilobytes (1 GB)
            .set(
                "queue.buffering.max.ms",
                config.queue_buffering_max_ms.to_string(),
            ) // maximum buffering time; forces sending after 100 ms
            .set("batch.num.messages", config.batch_num_messages.to_string()) // maximum messages per batch
            .set("linger.ms", config.linger_ms.to_string()) // wait for more messages to form larger batches; alias of queue.buffering.max.ms in librdkafka, so the two values should agree
            .set("compression.codec", config.compression_codec.to_string()) // compress batches to reduce network overhead
            // Retry failed sends up to 5 times, 1 second apart
            .set("retries", "5")
            .set("retry.backoff.ms", "1000")
            .create()
            .expect("Failed to create KafkaMessagePublisher");
        Self {
            producer,
            topic: topic_name,
        }
    }
}

#[async_trait]
impl MessagePublisher for KafkaMessagePublisher {
    async fn send_message(&self, msg: &DicomStoreMeta) -> Result<(), Box<dyn Error>> {
        info!(
            "KafkaMessagePublisher send_message to topic: {}",
            self.topic
        );
        let payload = serde_json::to_vec(msg)?;
        let key = String::from(msg.trace_id.as_str());
        let record: FutureRecord<String, Vec<u8>> =
            FutureRecord::to(&*self.topic).key(&key).payload(&payload);

        match self
            .producer
            .send(record, Timeout::After(Duration::from_secs(10))) // wait up to 10 seconds for room in the producer queue
            .await
            .map_err(|(e, _)| e)
        {
            Ok(_) => {
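                // Delivery of this message is already confirmed; the flush drains other in-flight messages.
                // 500 ms (not 500 µs) gives them a realistic chance to complete before the flush times out.
                self.producer.flush(Duration::from_millis(500))?;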
                debug!("Flushed KafkaMessagePublisher");
                Ok(())
            }
            Err(e) => {
                error!("Failed to send message to Kafka: {:?}", e);
                Err(Box::new(e))
            }
        }
    }

    async fn send_batch_messages(&self, messages: &[DicomStoreMeta]) -> Result<(), Box<dyn Error>> {
        info!(
            "KafkaMessagePublisher send_batch_messages: {}  to topic {}",
            messages.len(),
            self.topic
        );

        let mut wait_message = HashMap::new();

        for msg in messages {
            match serde_json::to_vec(msg) {
                Ok(payload) => {
                    let key = String::from(msg.trace_id.as_str());
                    // Messages sharing a trace_id collapse into one entry (the last one wins).
                    wait_message.insert(key, payload);
                }
                Err(e) => {
                    error!("Failed to serialize message: {:?}", e);
                    return Err(Box::new(e));
                }
            }
        }

        let futures: Vec<_> = wait_message
            .iter()
            .map(|(key, payload)| {
                let record = FutureRecord::to(&*self.topic)
                    .key(&key[..])
                    .payload(&payload[..]);
                // wait up to 10 seconds for room in the producer queue
                self.producer
                    .send(record, Timeout::After(Duration::from_secs(10)))
            })
            .collect();

        let results = join_all(futures).await;

        let mut success_count = 0;
        let mut error_count = 0;

        for result in results {
            match result {
                Ok(_) => success_count += 1,
                Err(e) => {
                    error!("Failed to send message: {:?}", e);
                    error_count += 1;
                }
            }
        }

        info!(
            "✅ Batch sending completed: {} successful, {} failed",
            success_count, error_count
        );


        if error_count > 0 {
            Err("Some messages failed to send".into())
        } else {
            Ok(())
        }
    }

    async fn send_state_messages(&self, messages: &[DicomStateMeta]) -> Result<(), Box<dyn Error>> {
        info!(
            "KafkaMessagePublisher send_state_messages: {} to topic {}",
            messages.len(),
            self.topic
        );

        let mut wait_message = HashMap::new();

        for msg in messages {
            match serde_json::to_vec(msg) {
                Ok(payload) => {
                    //   tenant_id + patient_id + study_uid + series_uid
                    let key_source = format!("{}_{}_{}_{}",
                                             msg.tenant_id.as_str(),
                                             msg.patient_id.as_str(),
                                             msg.study_uid.as_str(),
                                             msg.series_uid.as_str());
                    let key = format!("{:x}", md5::compute(key_source));
                    wait_message.insert(key, payload);
                   
                }
                Err(e) => {
                    error!("Failed to serialize DicomStateMeta message: {:?}", e);
                    return Err(Box::new(e));
                }
            }
        }

        let futures: Vec<_> = wait_message
            .iter()
            .map(|(key, payload)| {
                let record = FutureRecord::to(&*self.topic)
                    .key(&key[..])
                    .payload(&payload[..]);
                // wait up to 10 seconds for room in the producer queue
                self.producer
                    .send(record, Timeout::After(Duration::from_secs(10)))
            })
            .collect();

        let results = join_all(futures).await;

        let mut success_count = 0;
        let mut error_count = 0;

        for result in results {
            match result {
                Ok(_) => success_count += 1,
                Err(e) => {
                    error!("Failed to send DicomStateMeta message: {:?}", e);
                    error_count += 1;
                }
            }
        }

        info!(
            "✅ Batch sending DicomStateMeta completed: {} successful, {} failed",
            success_count, error_count
        );


        if error_count > 0 {
            Err("Some DicomStateMeta messages failed to send".into())
        } else {
            Ok(())
        }
    }

    async fn send_image_messages(&self, messages: &[DicomImageMeta]) -> Result<(), Box<dyn Error>> {
        info!(
            "KafkaMessagePublisher send_image_messages: {} to topic {}",
            messages.len(),
            self.topic
        );

        let mut wait_message = HashMap::new();

        for msg in messages {
            match serde_json::to_vec(msg) {
                Ok(payload) => {
                    //   tenant_id + patient_id + study_uid + series_uid + sop_uid
                    let key_source = format!(
                        "{}_{}_{}_{}_{}",
                        msg.tenant_id.as_str(),
                        msg.patient_id.as_str(),
                        msg.study_uid.as_str(),
                        msg.series_uid.as_str(),
                        msg.sop_uid.as_str()
                    );
                    let key = format!("{:x}", md5::compute(key_source));
                    wait_message.insert(key, payload);
                }
                Err(e) => {
                    error!("Failed to serialize DicomImageMeta message: {:?}", e);
                    return Err(Box::new(e));
                }
            }
        }

        let futures: Vec<_> = wait_message
            .iter()
            .map(|(key, payload)| {
                let record = FutureRecord::to(&*self.topic)
                    .key(&key[..])
                    .payload(&payload[..]);
                // wait up to 10 seconds for room in the producer queue
                self.producer
                    .send(record, Timeout::After(Duration::from_secs(10)))
            })
            .collect();

        let results = join_all(futures).await;

        let mut success_count = 0;
        let mut error_count = 0;

        for result in results {
            match result {
                Ok(_) => success_count += 1,
                Err(e) => {
                    error!("Failed to send DicomImageMeta message: {:?}", e);
                    error_count += 1;
                }
            }
        }

        info!(
            "✅ Batch sending DicomImageMeta completed: {} successful, {} failed",
            success_count, error_count
        );

     
        if error_count > 0 {
            Err("Some DicomImageMeta messages failed to send".into())
        } else {
            Ok(())
        }
    }
}
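
One consequence of collecting pending messages in a HashMap is worth spelling out: when two messages in the same call produce the same key, only the last payload is sent. The standard-library sketch below reproduces that behaviour. The function names state_key_source and collapse_by_key are hypothetical, and the MD5 step applied to the key in the source is omitted here because it does not affect the outcome.

```rust
use std::collections::HashMap;

/// Builds the composite key text used for state messages:
/// tenant_id + patient_id + study_uid + series_uid, joined by underscores.
pub fn state_key_source(tenant_id: &str, patient_id: &str, study_uid: &str, series_uid: &str) -> String {
    format!("{}_{}_{}_{}", tenant_id, patient_id, study_uid, series_uid)
}

/// Hypothetical helper mirroring the `wait_message` map in the source:
/// later payloads overwrite earlier ones that share a key.
pub fn collapse_by_key(messages: Vec<(String, Vec<u8>)>) -> HashMap<String, Vec<u8>> {
    let mut pending = HashMap::new();
    for (key, payload) in messages {
        pending.insert(key, payload);
    }
    pending
}
```

For state updates this is often desirable, since only the latest state per series matters. If every message must be delivered, a Vec of key and payload pairs would preserve them all.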

GoTo Summary : how-to-build-cloud-dicom