π SkysPy uses Celery for distributed background task processing, enabling real-time aircraft tracking, analytics computation, external data synchronization, and notification delivery without blocking the main application.
flowchart TB
subgraph Producers["π€ Task Producers"]
API[π Django API]
WS[π Socket.IO Handlers]
Beat[β±οΈ Celery Beat Scheduler]
end
subgraph Broker["π΄ Redis Broker"]
Redis[(Redis 6.x)]
end
subgraph Queues["π¬ Task Queues"]
Q1[π¨ polling<br/>High Priority]
Q2[π¦ default<br/>General Purpose]
Q3[ποΈ database<br/>DB Operations]
Q4[π notifications<br/>Alert Delivery]
Q5[ποΈ transcription<br/>Audio Processing]
Q6[π’ low_priority<br/>Analytics & Cleanup]
end
subgraph Workers["π· Celery Workers"]
W1[Worker 1<br/>gevent pool]
W2[Worker 2<br/>prefork pool]
W3[Worker N<br/>...]
end
Producers --> Redis
Redis --> Queues
Queues --> Workers
style Q1 fill:#ff6b6b,color:#fff
style Q2 fill:#4ecdc4,color:#fff
style Q3 fill:#45b7d1,color:#fff
style Q4 fill:#f7dc6f,color:#000
style Q5 fill:#bb8fce,color:#fff
style Q6 fill:#85c1e9,color:#fff
π Metric Value Total Tasks 45+ Queue Count 6 Fastest Interval 2 seconds Slowest Interval Weekly
π Task Categories Overview Category Tasks Primary Queue Frequency Range βοΈ Aircraft Tracking 5 polling2s - 5min π Analytics 12 polling / low_priority30s - Daily ποΈ Transcription 4 transcription10s - On-demand π Geographic Data 8 database10min - Daily π Notifications 5 notifications30s - Daily π§Ή Cleanup 8 low_priorityDaily - Weekly π― Cannonball Mode 5 polling / database5s - Daily π‘ External Sync 6 databaseOn-demand - Daily
flowchart LR
subgraph Priority["Task Priority Levels"]
direction TB
P1["π΄ CRITICAL<br/>polling queue<br/>2-60s intervals"]
P2["π‘ NORMAL<br/>default queue<br/>On-demand tasks"]
P3["π’ LOW<br/>low_priority queue<br/>Analytics & cleanup"]
end
P1 --> P2 --> P3
Queue Priority Purpose Concurrency Tasks pollingπ΄ Critical Time-sensitive, high-frequency 50+ Aircraft polling, stats, cannonball defaultπ‘ Normal General-purpose operations 10-20 Cache cleanup, info lookups databaseπ Medium Database-intensive operations 5-10 External DB sync, geodata notificationsπ‘ Normal Notification delivery 10-20 Send notifications, queue processing transcriptionπ΅ Background Long-running audio processing 2-4 Whisper transcription low_priorityπ’ Low Expensive, non-urgent 2-4 Daily stats, vacuum, aggregation
β‘ For high-load deployments, run dedicated workers per queue to isolate time-critical tasks from long-running operations.
π
π poll_aircraft - Core Tracking Loop Polls aircraft positions from the ADS-B receiver and broadcasts real-time updates to connected clients.
sequenceDiagram
participant Beat as β±οΈ Celery Beat
participant Task as π poll_aircraft
participant ADS-B as π‘ ADS-B Receiver
participant Cache as πΎ Redis Cache
participant SIO as π Socket.IO
Beat->>Task: Every 2 seconds
Task->>ADS-B: Fetch positions
ADS-B-->>Task: Aircraft data
Task->>Cache: Update current_aircraft
Task->>SIO: Broadcast updates
Note over SIO: aircraft_update<br/>positions_update<br/>aircraft_new<br/>aircraft_remove Property Value β±οΈ Schedule Every 2 seconds π¬ Queue pollingπ Max Retries 0 β³ Expires 2 seconds π€ Events aircraft_update, positions_update, aircraft_new, aircraft_remove
π update_stats_cache - Statistics Refresh Updates cached statistics for quick API retrieval.
Property Value β±οΈ Schedule Every 60 seconds π¬ Queue pollingπΎ Cache Key aircraft_stats
π‘οΈ update_safety_stats - Safety Metrics Updates cached safety event statistics for real-time safety monitoring.
Property Value β±οΈ Schedule Every 30 seconds π¬ Queue pollingπΎ Cache Key safety_stats
ποΈ cleanup_sessions - Session Management Identifies and marks stale aircraft tracking sessions.
Property Value β±οΈ Schedule Every 5 minutes π¬ Queue database
πΎ store_aircraft_sightings - Batch Storage Batch stores aircraft sightings to the database for historical tracking.
Property Value β±οΈ Schedule On-demand π¬ Queue default
Parameters:
Name Type Description aircraft_datalistList of aircraft position dictionaries
π
flowchart TB
subgraph RealTime["β‘ Real-Time Analytics"]
A1[refresh_acars_stats<br/>Every 60s]
A2[update_antenna_analytics<br/>Every 5min]
A3[update_safety_stats<br/>Every 30s]
end
subgraph NearRealTime["π Near Real-Time"]
B1[refresh_flight_pattern_geographic_stats<br/>Every 2min]
B2[refresh_tracking_quality_stats<br/>Every 2min]
B3[refresh_engagement_stats<br/>Every 2min]
B4[refresh_time_comparison_stats<br/>Every 5min]
end
subgraph Aggregation["π Aggregation"]
C1[aggregate_hourly_antenna_analytics<br/>Hourly]
C2[calculate_daily_stats<br/>Daily]
end
RealTime --> NearRealTime --> Aggregation
π‘ update_antenna_analytics - Antenna Performance Calculates antenna performance metrics including range, RSSI, and coverage analysis.
Property Value β±οΈ Schedule Every 5 minutes π¬ Queue polling
Returns:
JSON
{
"max_range_by_direction": {"0": 125.3, "30": 98.7},
"overall_max_range": 125.3,
"avg_range": 45.2,
"total_positions": 15234,
"unique_aircraft": 127,
"avg_rssi": -12.5,
"coverage_percentage": 91.7,
"range_percentiles": {"p50": 35.2, "p75": 52.1, "p90": 78.4, "p95": 95.2}
}
π
calculate_daily_stats - Daily Aggregates Generates daily statistics for historical analysis and trending.
Property Value β±οΈ Schedule Daily at 1:00 AM UTC π¬ Queue low_priority
Returns:
JSON
{
"date": "2024-01-14",
"total_sightings": 125000,
"unique_aircraft": 1523,
"total_sessions": 892,
"military_sessions": 45,
"avg_distance": 32.5,
"max_distance": 156.2
}
π aggregate_hourly_antenna_analytics Creates hourly aggregate snapshots from 5-minute snapshots for trend analysis.
Property Value β±οΈ Schedule 5 minutes past each hour π¬ Queue low_priority
π§Ή cleanup_antenna_analytics_snapshots Removes old antenna analytics snapshots based on retention policy.
Property Value β±οΈ Schedule Daily at 4:30 AM UTC π¬ Queue low_priorityποΈ Default Retention 7 days
π» refresh_acars_stats - ACARS Metrics Refreshes ACARS/VDL2 message statistics cache.
Property Value β±οΈ Schedule Every 60 seconds π¬ Queue polling
π refresh_time_comparison_stats Calculates week-over-week, seasonal, and time-of-day comparisons.
Property Value β±οΈ Schedule Every 5 minutes π¬ Queue low_priority
πΊοΈ refresh_flight_pattern_geographic_stats Refreshes flight pattern and geographic statistics.
Property Value β±οΈ Schedule Every 2 minutes π¬ Queue low_priority
πΆ refresh_tracking_quality_stats Calculates tracking quality metrics and grade distribution.
Property Value β±οΈ Schedule Every 2 minutes π¬ Queue low_priority
π« refresh_engagement_stats Computes user engagement and favorite tracking statistics.
Property Value β±οΈ Schedule Every 2 minutes π¬ Queue low_priority
β update_favorite_tracking Updates tracking statistics for favorited aircraft.
Property Value β±οΈ Schedule Every 5 minutes π¬ Queue database
π§ cleanup_memory_cache Cleans expired entries from in-memory caches to prevent memory growth.
Property Value β±οΈ Schedule Every 5 minutes π¬ Queue default
π
flowchart LR
Audio[π΅ Audio File] --> Queue[π Transcription Queue]
Queue --> Process[π process_transcription_queue]
Process --> Transcribe[ποΈ transcribe_audio]
Transcribe --> Extract[βοΈ extract_callsigns]
Extract --> DB[(πΎ Database)]
style Audio fill:#bb8fce,color:#fff
style Transcribe fill:#bb8fce,color:#fff
π process_transcription_queue - Queue Processor Processes queued audio files for transcription.
Property Value β±οΈ Schedule Every 10 seconds π¬ Queue transcriptionβ³ Expires 10 seconds
ποΈ transcribe_audio - Whisper Transcription Transcribes a single audio file using Whisper or ATC-Whisper.
Property Value β±οΈ Schedule On-demand π¬ Queue transcriptionπ Max Retries 3 β³ Retry Delay 60 seconds
Parameters:
Name Type Description transmission_idintAudioTransmission database ID
βοΈ extract_callsigns - Callsign Extraction Extracts aircraft callsigns from transcripts using pattern matching.
Property Value β±οΈ Schedule On-demand π¬ Queue transcription
π reprocess_all_transcripts - Bulk Reprocessing Re-runs callsign extraction on all completed transcripts.
Property Value β±οΈ Schedule On-demand π¬ Queue transcription
π
π refresh_all_geodata - Full Refresh Refreshes all geographic data including airports, navaids, and GeoJSON boundaries.
Property Value β±οΈ Schedule Daily at 3:30 AM UTC π¬ Queue databaseπ Max Retries 3 β³ Retry Delay 300 seconds
Returns:
JSON
{
"airports": 15234,
"navaids": 8923,
"geojson": 127
}
π check_and_refresh_geodata - Staleness Check Checks staleness and triggers refresh if needed.
Property Value β±οΈ Schedule Every hour π¬ Queue database
π©οΈ refresh_pireps - Pilot Reports Fetches PIREPs from Aviation Weather Center.
Property Value β±οΈ Schedule Every 10 minutes π¬ Queue databaseπ Max Retries 3
Parameters:
Name Type Default Description bboxstr"24,-130,50,-60"Geographic bounding box hoursint6Hours of PIREPs to fetch
π€οΈ refresh_metars - Weather Reports Fetches METARs from Aviation Weather Center.
Property Value β±οΈ Schedule Every 10 minutes π¬ Queue databaseπ Max Retries 3
π refresh_tafs - Terminal Forecasts Fetches TAFs from Aviation Weather Center.
Property Value β±οΈ Schedule Every 30 minutes π¬ Queue databaseπ Max Retries 3
π§Ή cleanup_old_pireps Removes expired PIREPs from the database.
Property Value β±οΈ Schedule Every hour π¬ Queue databaseποΈ Default Retention 24 hours
π
β οΈ refresh_airspace_advisories - Live Advisories Fetches G-AIRMETs, SIGMETs, and advisories from Aviation Weather Center.
Property Value β±οΈ Schedule Every 5 minutes π¬ Queue databaseπ€ Events airspace_advisory
πΊοΈ refresh_airspace_boundaries - Static Boundaries Refreshes static airspace boundaries from OpenAIP.
Property Value β±οΈ Schedule Daily at 3:00 AM UTC π¬ Queue databaseπ Max Retries 3
π
π refresh_openaip_data - Cache Warming Warms the OpenAIP cache by prefetching data for major US regions.
Property Value β±οΈ Schedule Daily at 5:15 AM UTC π¬ Queue databaseπ Max Retries 3
π prefetch_openaip_airspaces - Region Prefetch Prefetches airspaces for a specific region.
Property Value β±οΈ Schedule On-demand π¬ Queue database
Parameters:
Name Type Default Description latfloatRequired Center latitude lonfloatRequired Center longitude radius_nmfloat200Search radius in nautical miles
π
π refresh_notams - NOTAM Sync Refreshes all NOTAMs from FAA Aviation Weather API.
Property Value β±οΈ Schedule Every 15 minutes π¬ Queue databaseπ Max Retries 3 π€ Events notam_refresh, stats_update
π§Ή cleanup_expired_notams Archives and deletes expired NOTAMs based on retention policy.
Property Value β±οΈ Schedule Daily at 4:15 AM UTC π¬ Queue database
Parameters:
Name Type Default Description archive_daysint7Days after expiration to archive delete_daysint90Days after archival to hard delete
π¨ broadcast_new_tfr - TFR Alerts Broadcasts a new TFR notification via Socket.IO.
Property Value β±οΈ Schedule On-demand π¬ Queue database
π
flowchart TB
subgraph Sources["π‘ Data Sources"]
S1[ADS-B Exchange]
S2[tar1090]
S3[FAA Registry]
S4[OpenSky Network]
S5[HexDB]
S6[Planespotters]
end
subgraph Sync["π Sync Tasks"]
T1[sync_external_databases<br/>Daily 4:00 AM]
T2[update_stale_databases<br/>Every 6 hours]
T3[fetch_aircraft_info<br/>On-demand]
T4[fetch_aircraft_photos<br/>On-demand]
end
subgraph Storage["πΎ Storage"]
DB[(PostgreSQL)]
Cache[(Redis Cache)]
end
Sources --> Sync --> Storage
π sync_external_databases - Full Sync Syncs aircraft databases from external sources (ADS-B Exchange, tar1090, FAA, OpenSky).
Property Value β±οΈ Schedule Daily at 4:00 AM UTC π¬ Queue databaseπ Max Retries 3
π update_stale_databases - Incremental Update Checks and updates databases older than 24 hours.
Property Value β±οΈ Schedule Every 6 hours π¬ Queue databaseπ Max Retries 3
βοΈ fetch_aircraft_info - Info Lookup Fetches aircraft info from multiple sources (in-memory DB, HexDB, adsb.lol).
Property Value β±οΈ Schedule On-demand π¬ Queue default
π· fetch_aircraft_photos - Photo Download Fetches and caches aircraft photos from Planespotters or HexDB.
Property Value β±οΈ Schedule On-demand π¬ Queue default
π refresh_stale_aircraft_info Refreshes aircraft info records older than the specified age.
Property Value β±οΈ Schedule Daily at 5:00 AM UTC π¬ Queue database
πΈ batch_upgrade_aircraft_photos Upgrades photos to higher resolution versions.
Property Value β±οΈ Schedule Daily at 5:30 AM UTC π¬ Queue database
π§Ή cleanup_orphan_aircraft_info Removes AircraftInfo records for aircraft not seen recently.
Property Value β±οΈ Schedule Weekly on Sundays at 6:00 AM UTC π¬ Queue database
π
flowchart LR
subgraph Master["π― Master Task"]
M[run_all_cleanup_tasks<br/>Daily 3:00 AM]
end
subgraph Children["π Cleanup Tasks"]
C1[cleanup_old_sightings<br/>30 days]
C2[cleanup_old_sessions<br/>90 days]
C3[cleanup_old_alert_history<br/>30 days]
C4[cleanup_old_safety_events<br/>90 days]
C5[cleanup_old_acars_messages<br/>7 days]
end
subgraph Maintenance["π§ Maintenance"]
V[vacuum_analyze_tables<br/>Weekly]
end
M --> Children
Children --> Maintenance
π― run_all_cleanup_tasks - Master Cleanup Master cleanup task that orchestrates all data retention cleanups.
Property Value β±οΈ Schedule Daily at 3:00 AM UTC π¬ Queue low_priority
βοΈ cleanup_old_sightings Property Value ποΈ Retention SIGHTING_RETENTION_DAYS (default: 30)
π cleanup_old_sessions Property Value ποΈ Retention SESSION_RETENTION_DAYS (default: 90)
π cleanup_old_alert_history Property Value ποΈ Retention ALERT_HISTORY_DAYS (default: 30)
π‘οΈ cleanup_old_safety_events Property Value ποΈ Retention SAFETY_EVENT_RETENTION_DAYS (default: 90)
π» cleanup_old_acars_messages Property Value ποΈ Retention 7 days (fixed)
π§ vacuum_analyze_tables - PostgreSQL Maintenance Runs PostgreSQL VACUUM ANALYZE on frequently updated tables.
Property Value β±οΈ Schedule Weekly on Sundays at 4:00 AM UTC π¬ Queue low_priority
π
sequenceDiagram
participant Alert as π¨ Alert Trigger
participant Queue as π Notification Queue
participant Task as π send_notification_task
participant Apprise as π€ Apprise
participant Channels as π± Notification Channels
Alert->>Queue: Queue notification
Queue->>Task: Process
Task->>Apprise: Send via Apprise
Apprise->>Channels: Deliver
alt Success
Channels-->>Task: β
Delivered
else Failure
Channels-->>Task: β Failed
Task->>Queue: Retry with backoff
end
π€ send_notification_task - Notification Delivery Sends a notification via Apprise with automatic retry and exponential backoff.
Property Value β±οΈ Schedule On-demand π¬ Queue notificationsπ Max Retries 5 β³ Retry Backoff Exponential (max 1 hour)
Parameters:
Name Type Default Description channel_urlstrRequired Apprise-compatible URL titlestrRequired Notification title bodystrRequired Notification body prioritystr'info'Priority level event_typestr'alert'Event type
π process_notification_queue Processes notifications pending retry.
Property Value β±οΈ Schedule Every 30 seconds π¬ Queue notifications
π§Ή cleanup_old_notification_logs Removes old notification logs.
Property Value β±οΈ Schedule Daily at 3:15 AM UTC π¬ Queue notificationsποΈ Default Retention 30 days
β° cleanup_notification_cooldowns Cleans up notification cooldown entries to prevent memory growth.
Property Value β±οΈ Schedule Every 30 minutes π¬ Queue notifications
π§ͺ test_notification_channel Sends a test notification to verify channel configuration.
Property Value β±οΈ Schedule On-demand π¬ Queue notifications
π π¨ Real-time law enforcement aircraft detection and pattern analysis.
flowchart TB
subgraph Detection["π― Detection Loop"]
A[analyze_aircraft_patterns<br/>Every 5 seconds]
end
subgraph Processing["βοΈ Processing"]
B[Pattern Analysis]
C[Threat Scoring]
D[Alert Generation]
end
subgraph Storage["πΎ Storage"]
E[(CannonballSession)]
F[(CannonballPattern)]
G[(CannonballAlert)]
end
subgraph Broadcast["π‘ Broadcast"]
H[threats_update]
I[new_alert]
end
A --> B --> C --> D
D --> E & F & G
D --> H & I
π analyze_aircraft_patterns - Pattern Detection Analyzes current aircraft for law enforcement patterns (orbiting, surveillance, etc.).
Property Value β±οΈ Schedule Every 5 seconds π¬ Queue pollingβ³ Expires 5 seconds π Max Retries 0 π€ Events threats_update, new_alert
π§Ή cleanup_cannonball_sessions Marks stale Cannonball sessions as inactive.
Property Value β±οΈ Schedule Every 5 minutes π¬ Queue database
ποΈ cleanup_old_patterns Deletes patterns older than retention period.
Property Value β±οΈ Schedule Daily at 3:45 AM UTC π¬ Queue low_priorityποΈ Retention CANNONBALL_PATTERN_RETENTION_DAYS (default: 30)
π aggregate_cannonball_stats Creates hourly statistics aggregates for trend analysis.
Property Value β±οΈ Schedule 10 minutes past each hour π¬ Queue low_priority
π update_user_location Updates user location for threat calculations.
Property Value β±οΈ Schedule On-demand π¬ Queue default
β±οΈ All times are in UTC. The scheduler is configured in skyspy/celery.py.
Task Frequency Queue Notes poll_aircraftπ΄ 2s pollingExpires after 2s analyze_aircraft_patternsπ΄ 5s pollingCannonball mode process_transcription_queueπ 10s transcriptionExpires after 10s update_safety_statsπ‘ 30s pollingprocess_notification_queueπ‘ 30s notificationsupdate_stats_cacheπ’ 60s pollingrefresh_acars_statsπ’ 60s polling
Task Frequency Queue Notes refresh_flight_pattern_geographic_stats2m low_priorityrefresh_tracking_quality_stats2m low_priorityrefresh_engagement_stats2m low_prioritycleanup_cannonball_sessions5m databasecleanup_memory_cache5m defaultcleanup_sessions5m databaserefresh_airspace_advisories5m databaseupdate_antenna_analytics5m pollingupdate_favorite_tracking5m databaserefresh_time_comparison_stats5m low_priorityrefresh_metars10m databaserefresh_pireps10m databaserefresh_notams15m databasecleanup_notification_cooldowns30m notificationsrefresh_tafs30m database
Task Frequency Queue Notes aggregate_hourly_antenna_analytics:05 past hour low_priorityaggregate_cannonball_stats:10 past hour low_prioritycleanup_pireps_hourlyHourly databasecheck_geodata_freshness_hourlyHourly databaseupdate_stale_databases6 hours database
Task Time (UTC) Queue Notes calculate_daily_stats1:00 AM low_priorityrun_all_cleanup_tasks3:00 AM low_priorityMaster cleanup refresh_airspace_boundaries3:00 AM databasecleanup_notification_logs3:15 AM notificationsrefresh_geodata_daily3:30 AM databasecleanup_cannonball_patterns3:45 AM low_prioritysync_external_databases4:00 AM databasecleanup_expired_notams4:15 AM databasecleanup_antenna_analytics4:30 AM low_priorityrefresh_stale_aircraft_info5:00 AM databaserefresh_openaip_data5:15 AM databasebatch_upgrade_aircraft_photos5:30 AM database
Task Schedule (UTC) Queue Notes vacuum_analyze_tablesSunday 4:00 AM low_priorityPostgreSQL maintenance cleanup_orphan_aircraft_infoSunday 6:00 AM database
flowchart LR
subgraph Attempt["π― Task Execution"]
A[Execute Task]
end
subgraph Decision{"Success?"}
B{Check Result}
end
subgraph Success["β
Success"]
C[Complete]
end
subgraph Retry["π Retry Logic"]
D[Calculate Backoff]
E[Add Jitter]
F[Schedule Retry]
end
subgraph Fail["β Max Retries"]
G[Mark Failed]
H[Log to Sentry]
end
A --> B
B -->|Yes| C
B -->|No| D --> E --> F --> A
F -->|Max Reached| G --> H
Parameter Description Default max_retriesMaximum retry attempts 3retry_backoffEnable exponential backoff Falseretry_backoff_maxMaximum backoff delay (seconds) 600retry_jitterAdd randomness to delays Trueautoretry_forException types to auto-retry ()
π‘ Use expires for time-sensitive tasks to prevent queue buildup when the system is under load.
Python
@shared_task(
bind=True,
max_retries=3,
autoretry_for=(Exception,),
retry_backoff=True,
retry_backoff_max=3600,
retry_jitter=True,
)
def example_task(self):
try:
# Task logic
pass
except Exception as e:
raise self.retry(exc=e, countdown=60)
Failed notifications are tracked in NotificationLog with status:
Status Description pendingβ³ Awaiting delivery retryingπ Scheduled for retry sentβ
Successfully delivered failedβ All retries exhausted
Deploy Flower for real-time task monitoring:
Bash
celery -A skyspy flower --port=5555
π
Bash
# Watch active tasks
celery -A skyspy inspect active
# View scheduled tasks
celery -A skyspy inspect scheduled
# Check worker stats
celery -A skyspy inspect stats
# View registered tasks
celery -A skyspy inspect registered
Python
LOGGING = {
'handlers': {
'celery': {
'level': 'INFO',
'class': 'logging.FileHandler',
'filename': '/var/log/skyspy/celery.log',
'formatter': 'verbose',
},
},
'loggers': {
'celery': {
'handlers': ['celery'],
'level': 'INFO',
'propagate': False,
},
'skyspy.tasks': {
'handlers': ['celery'],
'level': 'DEBUG',
'propagate': False,
},
},
}
SkysPy automatically reports task errors to Sentry when configured:
Python
from skyspy.utils.sentry import capture_task_error
try:
# Task logic
pass
except Exception as e:
capture_task_error(e, 'task_name', extra={'key': 'value'})
raise
π―
π Production Configuration (Gevent) Bash
celery -A skyspy worker \
--pool=gevent \
--concurrency=100 \
--loglevel=INFO \
-Q polling,default,database,notifications,transcription,low_priority
π§ Development Configuration (Prefork) Bash
celery -A skyspy worker \
--pool=prefork \
--concurrency=4 \
--loglevel=DEBUG \
-Q polling,default,database,notifications,transcription,low_priority
β‘ For high-load deployments, run dedicated workers per queue to isolate time-critical tasks.
Bash
# High-priority polling worker
celery -A skyspy worker -Q polling -c 50 --pool=gevent -n polling@%h
# Database operations worker
celery -A skyspy worker -Q database -c 10 --pool=prefork -n database@%h
# Low-priority worker
celery -A skyspy worker -Q low_priority -c 4 --pool=prefork -n lowprio@%h
Python
# Disable prefetching for time-sensitive tasks
app.conf.worker_prefetch_multiplier = 1
# Acknowledge after completion (prevents lost tasks)
app.conf.task_acks_late = True
# Re-queue tasks if worker dies
app.conf.task_reject_on_worker_lost = True
π¦ SkysPy includes RPi-specific optimizations in settings_rpi.py.
Setting Standard RPi Reduction stats_cache interval60s 90s 1.5x slower safety_stats interval30s 60s 2x slower acars_stats interval60s 120s 2x slower MAX_SEEN_AIRCRAFT10000 1000 10x smaller SIGHTING_RETENTION_DAYS30 7 4x shorter SESSION_RETENTION_DAYS90 14 6x shorter
ini
# redis.conf - Production settings
maxmemory 512mb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec
Setting Default Description CELERY_BROKER_URLredis://localhost:6379/0Redis broker connection URL CELERY_RESULT_BACKENDredis://localhost:6379/0Task result storage backend CELERY_ACCEPT_CONTENT['json']Accepted serialization formats CELERY_TASK_SERIALIZERjsonTask serialization format CELERY_RESULT_SERIALIZERjsonResult serialization format CELERY_TIMEZONEUTCScheduler timezone CELERY_TASK_TRACK_STARTEDTrueTrack task start times CELERY_TASK_TIME_LIMIT180030-minute task timeout CELERY_BEAT_SCHEDULERDatabaseSchedulerDatabase-backed scheduler
Variable Default RPi Default Description SIGHTING_RETENTION_DAYS30 7 Days to retain sightings SESSION_RETENTION_DAYS90 14 Days to retain sessions ALERT_HISTORY_DAYS30 7 Days to retain alert history SAFETY_EVENT_RETENTION_DAYS90 14 Days to retain safety events ANTENNA_SNAPSHOT_RETENTION_DAYS7 3 Days to retain 5-min snapshots
Variable Default Description TRANSCRIPTION_ENABLEDFalseEnable audio transcription WHISPER_ENABLEDFalseEnable Whisper transcription ATC_WHISPER_ENABLEDFalseEnable ATC-Whisper PHOTO_AUTO_DOWNLOADFalseAuto-download aircraft photos OPENSKY_DB_ENABLEDTrueEnable OpenSky database
YAML
services:
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis_data:/data
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
celery-worker:
build: ./skyspy_django
command: celery -A skyspy worker -l INFO -Q polling,default,database,notifications,transcription,low_priority
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
depends_on:
redis:
condition: service_healthy
celery-beat:
build: ./skyspy_django
command: celery -A skyspy beat -l INFO
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
depends_on:
redis:
condition: service_healthy
β Tasks not running Bash
# Check worker is connected
celery -A skyspy inspect ping
# Verify beat is running
celery -A skyspy inspect scheduled
β Tasks stuck in queue Bash
# Check queue length
redis-cli LLEN celery
# Purge all tasks (use cautiously!)
celery -A skyspy purge
β Memory growth Bash
# Restart workers periodically
celery -A skyspy control shutdown
β Redis connection issues Bash
# Test Redis connectivity
redis-cli ping
# Check Redis memory
redis-cli info memory
SkysPy exposes a Celery health check via the celery_heartbeat cache key:
Python
from django.core.cache import cache
def celery_health_check():
return cache.get('celery_heartbeat', False)
π‘