Kafka Message Processing System for Medical Imaging
The [message_sender_kafka.rs] module provides robust Kafka integration for medical imaging systems, enabling scalable, reliable message processing for DICOM metadata and imaging workflows.
Message Processing Architecture
High-Throughput Design
Implements batch processing and compression to handle large volumes of medical imaging metadata with minimal resource consumption.
Reliable Delivery
Features built-in retry mechanisms and error handling to ensure message delivery even in challenging network conditions.
Topic Segregation
Uses separate Kafka topics for different message types:
- Main storage queue for primary processing
- Log queue for system events and auditing
- DICOM state queue for metadata updates
- DICOM image queue for image processing workflows
Core Capabilities
Batch Message Sending
Processes multiple DICOM metadata records in batches to optimize network usage and reduce latency.
Compression Support
Supports message compression (for example Snappy) to reduce bandwidth requirements for large message payloads; the codec is read from the Kafka configuration.
Asynchronous Processing
Uses async/await patterns for non-blocking message processing, enabling high-concurrency operations.
Technical Features
Connection Management
Maintains persistent Kafka connections with automatic reconnection handling.
Error Handling
Implements comprehensive error handling with detailed logging for troubleshooting.
Performance Tuning
Configurable buffering and batching parameters for optimal performance in different environments.
Healthcare Workflow Integration
DICOM State Management
Streams DICOM study and series state changes for real-time workflow updates.
Image Processing Coordination
Coordinates distributed image processing tasks across multiple system components.
Audit Trail Generation
Creates comprehensive audit logs for compliance and troubleshooting purposes.
Scalability Benefits
- Horizontal Scaling: Kafka’s distributed architecture enables system scaling
- Fault Tolerance: Built-in redundancy ensures system reliability
- Load Distribution: Distributes processing load across multiple consumers
- Performance Isolation: Separates message processing from core application logic
message_sender_kafka.rs: Send Messages to Kafka Topics
use crate::message_sender::MessagePublisher;
use crate::server_config;
use async_trait::async_trait;
use futures_util::future::join_all;
use rdkafka::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use std::collections::HashMap;
use std::error::Error;
use std::time::Duration;
use tracing::{debug, error, info};
use database::dicom_meta::{DicomImageMeta, DicomStateMeta, DicomStoreMeta};
/// Publishes messages to a single Kafka topic through an rdkafka `FutureProducer`.
pub struct KafkaMessagePublisher {
/// Producer handle used for every send issued by this publisher.
producer: FutureProducer,
/// Name of the Kafka topic that records are sent to.
topic: String,
}
impl KafkaMessagePublisher {
pub fn topic(&self) -> &str {
&self.topic
}
pub fn new(topic_name: String) -> Self {
let app_config = server_config::load_config().expect("Failed to load config");
let config = app_config.kafka;
let brokers = config.brokers;
let producer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "30000") //increase message timeout to 30 seconds
// ---BATCH SENDING RELATED CONFIGURATION ---
.set(
"queue.buffering.max.messages",
config.queue_buffering_max_messages.to_string(),
) // producer MAXIMUM INTERNAL QUEUE MESSAGES
.set(
"queue.buffering.max.kbytes",
config.queue_buffering_max_kbytes.to_string(),
) // maximum queue size (1GB)`
.set(
"queue.buffering.max.ms",
config.queue_buffering_max_ms.to_string(),
) // MAXIMUM BUFFER TIME FORCE SENDING AFTER 100 MS
.set("batch.num.messages", config.batch_num_messages.to_string()) // MAXIMUM MESSAGES PER BATCH
.set("linger.ms", config.linger_ms.to_string()) //WAIT FOR MORE MESSAGES TO FORM LARGER BATCHES
.set("compression.codec", config.compression_codec.to_string()) // ENABLE COMPRESSION TO REDUCE NETWORK OVERHEAD
// ADD RETRY MECHANISM
.set("retries", "5")
.set("retry.backoff.ms", "1000")
.create()
.expect("Failed to create KafkaMessagePublisher");
Self {
producer,
topic: topic_name,
}
}
}
#[async_trait]
impl MessagePublisher for KafkaMessagePublisher {
async fn send_message(&self, msg: &DicomStoreMeta) -> Result<(), Box<dyn Error>> {
info!(
"KafkaMessagePublisher send_message to topic: {}",
self.topic
);
let payload = serde_json::to_vec(msg)?;
let key = String::from(msg.trace_id.as_str());
let record: FutureRecord<String, Vec<u8>> =
FutureRecord::to(&*self.topic).key(&key).payload(&payload);
match self
.producer
.send(record, Timeout::After(Duration::from_secs(10))) // INCREASE TIMEOUT TO 10 SECONDS
.await
.map_err(|(e, _)| e)
{
Ok(_) => {
self.producer.flush(Duration::from_micros(500))?;
debug!("Flushed KafkaMessagePublisher");
Ok(())
}
Err(e) => {
error!("Failed to send message to Kafka: {:?}", e);
Err(Box::new(e))
}
}
}
async fn send_batch_messages(&self, messages: &[DicomStoreMeta]) -> Result<(), Box<dyn Error>> {
info!(
"KafkaMessagePublisher send_batch_messages: {} to topic {}",
messages.len(),
self.topic
);
let mut wait_message = HashMap::new();
for msg in messages {
match serde_json::to_vec(msg) {
Ok(payload) => {
let key = String::from(msg.trace_id.as_str());
wait_message.insert(key.clone(), payload.clone());
}
Err(e) => {
error!("Failed to serialize message: {:?}", e);
return Err(Box::new(e));
}
}
}
let futures: Vec<_> = wait_message
.iter()
.map(|(key, payload)| {
let record = FutureRecord::to(&*self.topic)
.key(&key[..])
.payload(&payload[..]);
// INCREASE TIMEOUT TO 10 SECONDS
self.producer
.send(record, Timeout::After(Duration::from_secs(10)))
})
.collect();
let results = join_all(futures).await;
let mut success_count = 0;
let mut error_count = 0;
for result in results {
match result {
Ok(_) => success_count += 1,
Err(e) => {
error!("Failed to send message: {:?}", e);
error_count += 1;
}
}
}
info!(
"✅ Batch sending completed: {} successful, {} failed",
success_count, error_count
);
if error_count > 0 {
Err("Some messages failed to send".into())
} else {
Ok(())
}
}
async fn send_state_messages(&self, messages: &[DicomStateMeta]) -> Result<(), Box<dyn Error>> {
info!(
"KafkaMessagePublisher send_state_messages: {} to topic {}",
messages.len(),
self.topic
);
let mut wait_message = HashMap::new();
for msg in messages {
match serde_json::to_vec(msg) {
Ok(payload) => {
// tenant_id + patient_id + study_uid + series_uid
let key_source = format!("{}_{}_{}_{}",
msg.tenant_id.as_str(),
msg.patient_id.as_str(),
msg.study_uid.as_str(),
msg.series_uid.as_str());
let key = format!("{:x}", md5::compute(key_source));
wait_message.insert(key, payload);
}
Err(e) => {
error!("Failed to serialize DicomStateMeta message: {:?}", e);
return Err(Box::new(e));
}
}
}
let futures: Vec<_> = wait_message
.iter()
.map(|(key, payload)| {
let record = FutureRecord::to(&*self.topic)
.key(&key[..])
.payload(&payload[..]);
// INCREASE TIMEOUT TO 10 SECONDS
self.producer
.send(record, Timeout::After(Duration::from_secs(10)))
})
.collect();
let results = join_all(futures).await;
let mut success_count = 0;
let mut error_count = 0;
for result in results {
match result {
Ok(_) => success_count += 1,
Err(e) => {
error!("Failed to send DicomStateMeta message: {:?}", e);
error_count += 1;
}
}
}
info!(
"✅ Batch sending DicomStateMeta completed: {} successful, {} failed",
success_count, error_count
);
if error_count > 0 {
Err("Some DicomStateMeta messages failed to send".into())
} else {
Ok(())
}
}
async fn send_image_messages(&self, messages: &[DicomImageMeta]) -> Result<(), Box<dyn Error>> {
info!(
"KafkaMessagePublisher send_image_messages: {} to topic {}",
messages.len(),
self.topic
);
let mut wait_message = HashMap::new();
for msg in messages {
match serde_json::to_vec(msg) {
Ok(payload) => {
// tenant_id + patient_id + study_uid + series_uid + sop_uid
let key_source = format!(
"{}_{}_{}_{}_{}",
msg.tenant_id.as_str(),
msg.patient_id.as_str(),
msg.study_uid.as_str(),
msg.series_uid.as_str(),
msg.sop_uid.as_str()
);
let key = format!("{:x}", md5::compute(key_source));
wait_message.insert(key, payload);
}
Err(e) => {
error!("Failed to serialize DicomImageMeta message: {:?}", e);
return Err(Box::new(e));
}
}
}
let futures: Vec<_> = wait_message
.iter()
.map(|(key, payload)| {
let record = FutureRecord::to(&*self.topic)
.key(&key[..])
.payload(&payload[..]);
// INCREASE TIMEOUT TO 10 SECONDS
self.producer
.send(record, Timeout::After(Duration::from_secs(10)))
})
.collect();
let results = join_all(futures).await;
let mut success_count = 0;
let mut error_count = 0;
for result in results {
match result {
Ok(_) => success_count += 1,
Err(e) => {
error!("Failed to send DicomImageMeta message: {:?}", e);
error_count += 1;
}
}
}
info!(
"✅ Batch sending DicomImageMeta completed: {} successful, {} failed",
success_count, error_count
);
if error_count > 0 {
Err("Some DicomImageMeta messages failed to send".into())
} else {
Ok(())
}
}
}
Go to Summary: how-to-build-cloud-dicom