YADTQ is a distributed task queue system built with Python, using Kafka as the message broker and Redis as the result backend. It distributes tasks across multiple workers, monitors worker health, and recovers from worker failures.
- Distributed task processing with multiple workers
- Automatic worker failure detection and recovery
- Task status tracking and result storage
- Round-robin task distribution
- Worker heartbeat monitoring
- Graceful shutdown handling
- Comprehensive logging
# Ubuntu/Debian
sudo apt-get install redis-server
# macOS
brew install redis
# Start Redis server
redis-server
# Download Kafka
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka
bin/kafka-server-start.sh config/server.properties
- Clone the repository.
- Install dependencies:
pip install --break-system-packages git+https://github.com/dpkp/kafka-python.git
pip install redis numpy colorlog
- Open the Kafka server properties file
sudo nano kafka/config/server.properties
- Add the following lines
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
- Open Redis configuration file
sudo nano /etc/redis/redis.conf
- Add the following lines
bind localhost
sudo systemctl restart kafka
sudo systemctl restart redis
The worker system is built with fault tolerance and automatic recovery:
- Multi-threaded Processing: Each worker runs three main threads:
  - Consumer Thread: Fetches tasks from Kafka
  - Processor Thread: Executes tasks
  - Heartbeat Thread: Sends health signals
- Automatic Recovery: Workers automatically recover from failures with configurable retry attempts
- Partition Assignment: Each worker is assigned to a specific Kafka partition for balanced task distribution
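The three-thread layout described above can be sketched with the standard library alone. This is a minimal illustration, not the project's worker: the Kafka consumer is replaced by an in-memory list of tasks, and the names `run_worker`, `inbox`, and `heartbeat_interval` are hypothetical.

```python
import queue
import threading
import time


def run_worker(tasks, heartbeat_interval=0.05):
    """Run one simulated worker; return (results, number_of_heartbeats)."""
    inbox = queue.Queue()      # hands tasks from the consumer to the processor
    stop = threading.Event()   # tells the heartbeat thread to exit
    results = {}
    heartbeats = []

    def consumer():
        # Stand-in for polling Kafka: enqueue the given tasks, then a sentinel.
        for task in tasks:
            inbox.put(task)
        inbox.put(None)

    def processor():
        # Execute tasks until the sentinel arrives.
        while True:
            task = inbox.get()
            if task is None:
                break
            task_id, func, args = task
            results[task_id] = func(*args)

    def heartbeat():
        # Record a timestamp, then wait one interval; exit once stop is set.
        while True:
            heartbeats.append(time.time())
            if stop.wait(heartbeat_interval):
                break

    threads = [threading.Thread(target=f) for f in (consumer, processor, heartbeat)]
    for t in threads:
        t.start()
    threads[0].join()   # consumer finished feeding tasks
    threads[1].join()   # processor drained the queue
    stop.set()          # shut the heartbeat down last
    threads[2].join()
    return results, len(heartbeats)
```

Stopping the heartbeat thread last means the worker keeps reporting itself as alive for as long as tasks are still being processed.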
The system includes a robust heartbeat monitoring mechanism:
- Health Checking: Workers emit periodic heartbeats to signal their health status
- Failure Detection: Client monitors heartbeats and detects worker failures
- Automatic Recovery: Failed workers are automatically restarted
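As a rough sketch of the failure-detection step, a monitor can compare each worker's last heartbeat timestamp against a timeout. The two constants mirror the values in config.py; treating their product as the timeout is an assumption, and `find_failed_workers` and `last_seen` are hypothetical names.

```python
HEARTBEAT_INTERVAL = 5            # seconds between heartbeats (value from config.py)
MISSED_HEARTBEATS_THRESHOLD = 3   # missed beats tolerated (value from config.py)


def find_failed_workers(last_seen, now):
    """Return the sorted IDs of workers whose last heartbeat is too old.

    last_seen maps worker ID to the timestamp of its latest heartbeat.
    Assumption: a worker counts as failed once it has been silent for more
    than HEARTBEAT_INTERVAL * MISSED_HEARTBEATS_THRESHOLD seconds.
    """
    timeout = HEARTBEAT_INTERVAL * MISSED_HEARTBEATS_THRESHOLD
    return sorted(w for w, ts in last_seen.items() if now - ts > timeout)
```

With these values a worker is flagged only after more than 15 seconds of silence, so a single delayed heartbeat does not trigger a restart.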
- Uses topic partitioning for parallel processing
- Configurable number of partitions
- Automatic topic creation and management
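To illustrate how round-robin distribution over partitions might work, the sketch below cycles through partition numbers. `RoundRobinAssigner` is a hypothetical helper, not a class from this project.

```python
from itertools import cycle


class RoundRobinAssigner:
    """Hand out partition numbers 0..num_partitions-1 in rotation."""

    def __init__(self, num_partitions):
        if num_partitions < 1:
            raise ValueError("num_partitions must be at least 1")
        self._partitions = cycle(range(num_partitions))

    def next_partition(self):
        # Each call returns the next partition, wrapping around at the end.
        return next(self._partitions)
```

The returned number could then be passed as the `partition` argument of kafka-python's `KafkaProducer.send`, so that consecutive tasks land on different partitions.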
- Stores task statuses and results
- Provides fast status lookups
- Maintains task metadata
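One possible shape for the result backend is a hash per task holding its status and result. The sketch below uses a small in-memory stand-in so it runs without a server; the key layout `task:<id>`, the field names, and the helpers `save_task` and `load_task` are assumptions, not the project's actual schema.

```python
import json


class InMemoryStore:
    """Tiny stand-in exposing the two hash methods used below."""

    def __init__(self):
        self._hashes = {}

    def hset(self, name, mapping):
        self._hashes.setdefault(name, {}).update(mapping)

    def hgetall(self, name):
        return dict(self._hashes.get(name, {}))


def save_task(store, task_id, status, result=None):
    # Results are JSON-encoded so that any serializable value fits in a hash field.
    store.hset(f"task:{task_id}", mapping={"status": status, "result": json.dumps(result)})


def load_task(store, task_id):
    # Returns None for unknown task IDs, otherwise the decoded status and result.
    data = store.hgetall(f"task:{task_id}")
    if not data:
        return None
    return {"status": data["status"], "result": json.loads(data["result"])}
```

A `redis.Redis(..., decode_responses=True)` client offers `hset(name, mapping=...)` and `hgetall(name)` too, so it should be usable in place of `InMemoryStore` here.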
Default configuration in config.py:
# Kafka Configuration
KAFKA_HOST = 'localhost'
KAFKA_PORT = 9092
KAFKA_SERVERS = [f'{KAFKA_HOST}:{KAFKA_PORT}']
KAFKA_TASK_TOPIC = 'yadtq_tasks'
KAFKA_HEARTBEAT_TOPIC = "worker-heartbeats"
# Redis Configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
For production deployments:
# Worker Configuration
MAX_WORKER_RETRIES = 3
HEARTBEAT_INTERVAL = 5
MISSED_HEARTBEATS_THRESHOLD = 3
# Task Queue Configuration
TASK_TIMEOUT = 10
AUTO_COMMIT_INTERVAL_MS = 1000
from client import YADTQClient
# Initialize client with 4 worker partitions
client = YADTQClient(num_partitions=4)
# Submit a task
task_id = client.submit_task('addition', args=[1, 2])
# Check task status
status = client.get_task_status(task_id)
# Get result
result = client.get_result(task_id)
# Clean up
client.close()
# Submit multiple tasks
tasks = [
{
'task': 'addition',
'args': [10, 20, 30],
'kwargs': {'round_to': 2}
},
{
'task': 'multiplication',
'args': [2.5, 3.5],
'kwargs': {'round_to': 1}
}
]
task_ids = client.submit_tasks(tasks)
- Basic arithmetic: addition, subtraction, multiplication, division
- Advanced math: power, root
- Matrix operations: matrix_addition, matrix_multiplication
The system provides comprehensive error handling:
- Task Failures: Failed tasks are marked and errors are stored
- Worker Failures: Automatic worker recovery with configurable retries
- Connection Issues: Automatic reconnection attempts for both Kafka and Redis
- Invalid Tasks: Validation and appropriate error responses
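The retry behaviour mentioned for worker recovery and reconnection can be sketched as a small wrapper. This is an illustration under assumptions: `call_with_retries` is a hypothetical helper, and only the constant name and value come from config.py.

```python
import time

MAX_WORKER_RETRIES = 3  # value from config.py


def call_with_retries(func, retries=MAX_WORKER_RETRIES, delay=0.0):
    """Call func(); on an exception, retry up to `retries` more times."""
    last_error = None
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            if attempt < retries:
                time.sleep(delay)  # pause before the next attempt
    # Every attempt failed: surface the last error as the cause.
    raise RuntimeError(f"gave up after {retries} retries") from last_error
```

Chaining the final error with `from last_error` keeps the original cause visible in the traceback, which helps when reading worker logs.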
# Get all tasks
all_tasks = client.get_all_tasks()
# Get tasks by status (TaskStatus must be imported from the project module that defines it)
failed_tasks = client.get_tasks_by_status(TaskStatus.FAILED)
queued_tasks = client.get_tasks_by_status(TaskStatus.QUEUED)
# Check all workers health
worker_status = client.get_all_workers_health()
# Clear all tasks
client.clear_all_tasks()
Common error codes and their solutions:
- KAFKA_CONNECTION_ERROR: Check if Kafka server is running and accessible
- REDIS_CONNECTION_ERROR: Verify Redis connection settings and server status
- WORKER_INITIALIZATION_FAILED: Check logs for specific worker startup errors
- TASK_SUBMISSION_TIMEOUT: Increase timeout value or check network connectivity
- WORKER_RECOVERY_FAILED: Check max retries and worker logs
To install all necessary dependencies and set up the environment, you can use the provided setup.sh script. Follow these steps:
- Download the setup.sh script:
wget https://github.com/Cloud-Computing-Big-Data/RR-Team-45-yadtq-yet-another-distributed-task-queue-.git/setup.sh
- Make the script executable:
chmod +x setup.sh
- Run the script:
./setup.sh
The setup.sh script will handle the following tasks:
- Install required system packages
- Set up a virtual environment
- Install Python dependencies
- Configure Kafka and Redis
The system uses colored logging for better visibility:
# Log levels are color-coded:
# DEBUG: cyan
# INFO: green
# WARNING: yellow
# ERROR: red
# CRITICAL: red with white background
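The colour scheme above can be reproduced with the standard library and ANSI escape codes. This is a stdlib-only sketch for illustration; the project lists colorlog as a dependency and presumably uses it instead, and `ColorFormatter` and `make_logger` are hypothetical names.

```python
import logging

RESET = "\033[0m"
LEVEL_COLORS = {
    logging.DEBUG: "\033[36m",        # cyan
    logging.INFO: "\033[32m",         # green
    logging.WARNING: "\033[33m",      # yellow
    logging.ERROR: "\033[31m",        # red
    logging.CRITICAL: "\033[31;47m",  # red on a white background
}


class ColorFormatter(logging.Formatter):
    """Wrap each formatted record in the ANSI colour for its level."""

    def format(self, record):
        text = super().format(record)
        color = LEVEL_COLORS.get(record.levelno, "")
        return f"{color}{text}{RESET}" if color else text


def make_logger(name="yadtq"):
    # Attach a single coloured stream handler; repeated calls reuse it.
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(ColorFormatter("%(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
    return logger
```

Calling `make_logger().warning("worker 2 missed a heartbeat")` would print the line in yellow on a terminal that supports ANSI colours.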