Background Tasks

Powering Real-Time Aviation Intelligence

SkysPy uses Celery for distributed background task processing, enabling real-time aircraft tracking, analytics computation, external data synchronization, and notification delivery without blocking the main application.


πŸ—οΈ Architecture Overview

flowchart TB
    subgraph Producers["📤 Task Producers"]
        API[🌐 Django API]
        WS[🔌 Socket.IO Handlers]
        Beat[⏱️ Celery Beat Scheduler]
    end

    subgraph Broker["🔴 Redis Broker"]
        Redis[(Redis 6.x)]
    end

    subgraph Queues["📬 Task Queues"]
        Q1[🚨 polling<br/>High Priority]
        Q2[📦 default<br/>General Purpose]
        Q3[🗄️ database<br/>DB Operations]
        Q4[🔔 notifications<br/>Alert Delivery]
        Q5[🎙️ transcription<br/>Audio Processing]
        Q6[🐒 low_priority<br/>Analytics & Cleanup]
    end

    subgraph Workers["👷 Celery Workers"]
        W1[Worker 1<br/>gevent pool]
        W2[Worker 2<br/>prefork pool]
        W3[Worker N<br/>...]
    end

    Producers --> Redis
    Redis --> Queues
    Queues --> Workers

    style Q1 fill:#ff6b6b,color:#fff
    style Q2 fill:#4ecdc4,color:#fff
    style Q3 fill:#45b7d1,color:#fff
    style Q4 fill:#f7dc6f,color:#000
    style Q5 fill:#bb8fce,color:#fff
    style Q6 fill:#85c1e9,color:#fff

🎯 Quick Reference

📊 At a Glance

| Metric | Value |
|---|---|
| Total Tasks | 45+ |
| Queue Count | 6 |
| Fastest Interval | 2 seconds |
| Slowest Interval | Weekly |

📋 Task Categories Overview

| Category | Tasks | Primary Queue | Frequency Range |
|---|---|---|---|
| ✈️ Aircraft Tracking | 5 | polling | 2s - 5min |
| 📊 Analytics | 12 | polling / low_priority | 30s - Daily |
| 🎙️ Transcription | 4 | transcription | 10s - On-demand |
| 🌍 Geographic Data | 8 | database | 10min - Daily |
| 🔔 Notifications | 5 | notifications | 30s - Daily |
| 🧹 Cleanup | 8 | low_priority | Daily - Weekly |
| 🎯 Cannonball Mode | 5 | polling / database | 5s - Daily |
| 📡 External Sync | 6 | database | On-demand - Daily |

📬 Queue System

flowchart LR
    subgraph Priority["Task Priority Levels"]
        direction TB
        P1["🔴 CRITICAL<br/>polling queue<br/>2-60s intervals"]
        P2["🟑 NORMAL<br/>default queue<br/>On-demand tasks"]
        P3["🟒 LOW<br/>low_priority queue<br/>Analytics & cleanup"]
    end

    P1 --> P2 --> P3

🚨 Queue Configuration

| Queue | Priority | Purpose | Concurrency | Tasks |
|---|---|---|---|---|
| polling | 🔴 Critical | Time-sensitive, high-frequency | 50+ | Aircraft polling, stats, cannonball |
| default | 🟡 Normal | General-purpose operations | 10-20 | Cache cleanup, info lookups |
| database | 🟠 Medium | Database-intensive operations | 5-10 | External DB sync, geodata |
| notifications | 🟡 Normal | Notification delivery | 10-20 | Send notifications, queue processing |
| transcription | 🔵 Background | Long-running audio processing | 2-4 | Whisper transcription |
| low_priority | 🟢 Low | Expensive, non-urgent | 2-4 | Daily stats, vacuum, aggregation |

⚡ Performance Tip

For high-load deployments, run dedicated workers per queue to isolate time-critical tasks from long-running operations.
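
Queue assignments like the ones in the table above are usually expressed through Celery's task_routes setting. The sketch below illustrates the idea and is not SkysPy's actual configuration. It makes three assumptions: the Django settings use the CELERY_ namespace (as the configuration reference suggests), the default queue is named "default", and task names follow the module paths listed on this page. The resolve_queue helper is hypothetical. It only mirrors Celery's lookup order (exact names first, then glob patterns) so the mapping can be checked without a broker.

```python
from fnmatch import fnmatchcase

# Hypothetical settings, assuming app.config_from_object(..., namespace="CELERY")
CELERY_TASK_DEFAULT_QUEUE = "default"
CELERY_TASK_ROUTES = {
    # Exact task names: time-critical work goes to the polling queue
    "skyspy.tasks.aircraft.poll_aircraft": {"queue": "polling"},
    "skyspy.tasks.aircraft.update_stats_cache": {"queue": "polling"},
    "skyspy.tasks.cannonball.analyze_aircraft_patterns": {"queue": "polling"},
    # Glob patterns: route whole modules at once
    "skyspy.tasks.transcription.*": {"queue": "transcription"},
    "skyspy.tasks.notifications.*": {"queue": "notifications"},
    "skyspy.tasks.geodata.*": {"queue": "database"},
    "skyspy.tasks.cleanup.*": {"queue": "low_priority"},
}


def resolve_queue(task_name: str) -> str:
    """Return the queue a task would be sent to under the routes above."""
    # Exact matches take precedence over patterns
    route = CELERY_TASK_ROUTES.get(task_name)
    if route is not None:
        return route["queue"]
    # Then glob patterns, in definition order
    for pattern, pattern_route in CELERY_TASK_ROUTES.items():
        if "*" in pattern and fnmatchcase(task_name, pattern):
            return pattern_route["queue"]
    # Unrouted tasks fall back to the default queue
    return CELERY_TASK_DEFAULT_QUEUE
```

A worker started with -Q polling consumes only what these routes send to polling. This separation is what lets a dedicated worker keep poll_aircraft on schedule.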


✈️ Aircraft Tasks

Location: skyspy/tasks/aircraft.py

🔄 poll_aircraft - Core Tracking Loop

Polls aircraft positions from the ADS-B receiver and broadcasts real-time updates to connected clients.

sequenceDiagram
    participant Beat as ⏱️ Celery Beat
    participant Task as 🔄 poll_aircraft
    participant ADSB as 📡 ADS-B Receiver
    participant Cache as 💾 Redis Cache
    participant SIO as 🔌 Socket.IO

    Beat->>Task: Every 2 seconds
    Task->>ADSB: Fetch positions
    ADSB-->>Task: Aircraft data
    Task->>Cache: Update current_aircraft
    Task->>SIO: Broadcast updates
    Note over SIO: aircraft_update<br/>positions_update<br/>aircraft_new<br/>aircraft_remove

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 2 seconds |
| 📬 Queue | polling |
| 🔄 Max Retries | 0 |
| ⏳ Expires | 2 seconds |
| 📤 Events | aircraft_update, positions_update, aircraft_new, aircraft_remove |
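
The four events can be derived by comparing consecutive polls. The following is a minimal sketch, not the poll_aircraft implementation. It assumes each poll yields a dict keyed by ICAO hex code, and the diff_snapshots name is hypothetical. The sketch treats an aircraft that is missing from a single poll as removed. A production tracker would normally wait for a timeout before emitting aircraft_remove.

```python
from typing import Dict, List, Tuple

# Hypothetical snapshot shape: {icao_hex: {"lat": ..., "lon": ..., "alt": ...}}
Snapshot = Dict[str, dict]


def diff_snapshots(previous: Snapshot, current: Snapshot) -> Tuple[List[str], List[str], List[str]]:
    """Return (new, removed, changed) ICAO hex codes between two polls."""
    new = sorted(current.keys() - previous.keys())
    removed = sorted(previous.keys() - current.keys())
    # Only aircraft whose reported state changed need an update event
    changed = sorted(
        hex_code
        for hex_code in current.keys() & previous.keys()
        if current[hex_code] != previous[hex_code]
    )
    return new, removed, changed
```

In this reading, new maps to aircraft_new, removed to aircraft_remove, and changed to aircraft_update. The positions_update event could carry the whole current snapshot.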

📊 update_stats_cache - Statistics Refresh

Updates cached statistics for quick API retrieval.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 60 seconds |
| 📬 Queue | polling |
| 💾 Cache Key | aircraft_stats |
πŸ›‘οΈ update_safety_stats - Safety Metrics

Updates cached safety event statistics for real-time safety monitoring.

PropertyValue
⏱️ ScheduleEvery 30 seconds
πŸ“¬ Queuepolling
πŸ’Ύ Cache Keysafety_stats
πŸ—„οΈ cleanup_sessions - Session Management

Identifies and marks stale aircraft tracking sessions.

PropertyValue
⏱️ ScheduleEvery 5 minutes
πŸ“¬ Queuedatabase

💾 store_aircraft_sightings - Batch Storage

Stores aircraft sightings in the database in batches for historical tracking.

| Property | Value |
|---|---|
| ⏱️ Schedule | On-demand |
| 📬 Queue | default |

Parameters:

| Name | Type | Description |
|---|---|---|
| aircraft_data | list | List of aircraft position dictionaries |

📊 Analytics Tasks

Location: skyspy/tasks/analytics.py

flowchart TB
    subgraph RealTime["⚑ Real-Time Analytics"]
        A1[refresh_acars_stats<br/>Every 60s]
        A2[update_antenna_analytics<br/>Every 5min]
        A3[update_safety_stats<br/>Every 30s]
    end

    subgraph NearRealTime["🔄 Near Real-Time"]
        B1[refresh_flight_pattern_geographic_stats<br/>Every 2min]
        B2[refresh_tracking_quality_stats<br/>Every 2min]
        B3[refresh_engagement_stats<br/>Every 2min]
        B4[refresh_time_comparison_stats<br/>Every 5min]
    end

    subgraph Aggregation["📈 Aggregation"]
        C1[aggregate_hourly_antenna_analytics<br/>Hourly]
        C2[calculate_daily_stats<br/>Daily]
    end

    RealTime --> NearRealTime --> Aggregation

📡 update_antenna_analytics - Antenna Performance

Calculates antenna performance metrics, including range, RSSI, and coverage analysis.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 5 minutes |
| 📬 Queue | polling |

Returns:

{
    "max_range_by_direction": {"0": 125.3, "30": 98.7},
    "overall_max_range": 125.3,
    "avg_range": 45.2,
    "total_positions": 15234,
    "unique_aircraft": 127,
    "avg_rssi": -12.5,
    "coverage_percentage": 91.7,
    "range_percentiles": {"p50": 35.2, "p75": 52.1, "p90": 78.4, "p95": 95.2}
}
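
The range fields above can be computed from raw positions using great-circle geometry. This sketch rests on three assumptions. It models the Earth as a sphere (haversine). It uses 30-degree bearing sectors, inferred from the "0" and "30" keys in the example output. It computes nearest-rank percentiles. range_stats and its helpers are hypothetical and are not SkysPy's code.

```python
import math
from typing import Dict, Iterable, List, Tuple

EARTH_RADIUS_NM = 3440.065  # mean Earth radius in nautical miles


def distance_and_bearing(lat1: float, lon1: float, lat2: float, lon2: float) -> Tuple[float, float]:
    """Haversine distance (nm) and initial bearing (degrees) from point 1 to point 2."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    distance = 2 * EARTH_RADIUS_NM * math.asin(math.sqrt(a))
    y = math.sin(dlam) * math.cos(phi2)
    x = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(dlam)
    bearing = (math.degrees(math.atan2(y, x)) + 360.0) % 360.0
    return distance, bearing


def nearest_rank(sorted_values: List[float], pct: float) -> float:
    """Nearest-rank percentile of a non-empty, ascending list."""
    rank = max(1, math.ceil(pct / 100 * len(sorted_values)))
    return sorted_values[rank - 1]


def range_stats(receiver: Tuple[float, float],
                positions: Iterable[Tuple[float, float]],
                sector_deg: int = 30) -> dict:
    """Max range per bearing sector, overall max range, and range percentiles."""
    max_by_sector: Dict[str, float] = {}
    distances: List[float] = []
    for lat, lon in positions:
        dist, brg = distance_and_bearing(receiver[0], receiver[1], lat, lon)
        distances.append(dist)
        # Sector label is the sector's starting bearing: "0", "30", "60", ...
        sector = str(int(brg // sector_deg) * sector_deg)
        max_by_sector[sector] = max(max_by_sector.get(sector, 0.0), dist)
    if not distances:
        return {"max_range_by_direction": {}, "overall_max_range": 0.0, "range_percentiles": {}}
    distances.sort()
    return {
        "max_range_by_direction": {k: round(v, 1) for k, v in max_by_sector.items()},
        "overall_max_range": round(distances[-1], 1),
        "range_percentiles": {f"p{p}": round(nearest_rank(distances, p), 1) for p in (50, 75, 90, 95)},
    }
```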

📅 calculate_daily_stats - Daily Aggregates

Generates daily statistics for historical analysis and trending.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 1:00 AM UTC |
| 📬 Queue | low_priority |

Returns:

{
    "date": "2024-01-14",
    "total_sightings": 125000,
    "unique_aircraft": 1523,
    "total_sessions": 892,
    "military_sessions": 45,
    "avg_distance": 32.5,
    "max_distance": 156.2
}
πŸ• aggregate_hourly_antenna_analytics

Creates hourly aggregate snapshots from 5-minute snapshots for trend analysis.

PropertyValue
⏱️ Schedule5 minutes past each hour
πŸ“¬ Queuelow_priority

🧹 cleanup_antenna_analytics_snapshots

Removes old antenna analytics snapshots according to the retention policy.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 4:30 AM UTC |
| 📬 Queue | low_priority |
| 🗑️ Default Retention | 7 days |

📻 refresh_acars_stats - ACARS Metrics

Refreshes the ACARS/VDL2 message statistics cache.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 60 seconds |
| 📬 Queue | polling |

📈 refresh_time_comparison_stats

Calculates week-over-week, seasonal, and time-of-day comparisons.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 5 minutes |
| 📬 Queue | low_priority |
πŸ—ΊοΈ refresh_flight_pattern_geographic_stats

Refreshes flight pattern and geographic statistics.

PropertyValue
⏱️ ScheduleEvery 2 minutes
πŸ“¬ Queuelow_priority

📶 refresh_tracking_quality_stats

Calculates tracking quality metrics and grade distribution.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 2 minutes |
| 📬 Queue | low_priority |

💫 refresh_engagement_stats

Computes user engagement and favorite tracking statistics.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 2 minutes |
| 📬 Queue | low_priority |

⭐ update_favorite_tracking

Updates tracking statistics for favorited aircraft.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 5 minutes |
| 📬 Queue | database |

🧠 cleanup_memory_cache

Cleans expired entries from in-memory caches to prevent memory growth.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 5 minutes |
| 📬 Queue | default |

πŸŽ™οΈ Transcription Tasks

Location: skyspy/tasks/transcription.py

flowchart LR
    Audio[🎡 Audio File] --> Queue[📋 Transcription Queue]
    Queue --> Process[🔄 process_transcription_queue]
    Process --> Transcribe[🎙️ transcribe_audio]
    Transcribe --> Extract[✈️ extract_callsigns]
    Extract --> DB[(💾 Database)]

    style Audio fill:#bb8fce,color:#fff
    style Transcribe fill:#bb8fce,color:#fff

🔄 process_transcription_queue - Queue Processor

Processes queued audio files for transcription.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 10 seconds |
| 📬 Queue | transcription |
| ⏳ Expires | 10 seconds |
πŸŽ™οΈ transcribe_audio - Whisper Transcription

Transcribes a single audio file using Whisper or ATC-Whisper.

PropertyValue
⏱️ ScheduleOn-demand
πŸ“¬ Queuetranscription
πŸ”„ Max Retries3
⏳ Retry Delay60 seconds

Parameters:

NameTypeDescription
transmission_idintAudioTransmission database ID

✈️ extract_callsigns - Callsign Extraction

Extracts aircraft callsigns from transcripts using pattern matching.

| Property | Value |
|---|---|
| ⏱️ Schedule | On-demand |
| 📬 Queue | transcription |
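
A minimal sketch of pattern-based callsign extraction. It assumes transcripts contain an airline telephony designator followed by one to four flight-number digits, which may be written or spoken ("niner" is the ATC pronunciation of 9). The AIRLINE_TELEPHONY table and the find_callsigns function are illustrative assumptions, not SkysPy's patterns. A real deployment needs a much larger telephony table and handling for general-aviation registrations.

```python
import re
from typing import List

# Hypothetical, deliberately tiny telephony -> ICAO designator table
AIRLINE_TELEPHONY = {"united": "UAL", "delta": "DAL", "american": "AAL", "speedbird": "BAW"}

SPOKEN_DIGITS = {"zero": "0", "one": "1", "two": "2", "three": "3", "four": "4",
                 "five": "5", "six": "6", "seven": "7", "eight": "8", "niner": "9", "nine": "9"}

# One digit, written or spoken ("niner" comes before "nine" so it matches whole)
DIGIT = r"(?:\d|" + "|".join(SPOKEN_DIGITS) + r")"

CALLSIGN_RE = re.compile(
    r"\b(" + "|".join(AIRLINE_TELEPHONY) + r")\s+"        # airline telephony word
    r"(" + DIGIT + r"(?:[\s-]*" + DIGIT + r"){0,3})\b",  # 1-4 flight-number digits
    re.IGNORECASE,
)


def find_callsigns(transcript: str) -> List[str]:
    """Return ICAO-style callsigns (e.g. 'UAL123') in order of first mention."""
    found: List[str] = []
    for airline, number in CALLSIGN_RE.findall(transcript):
        tokens = re.findall(DIGIT, number, re.IGNORECASE)
        digits = "".join(SPOKEN_DIGITS.get(tok.lower(), tok) for tok in tokens)
        callsign = AIRLINE_TELEPHONY[airline.lower()] + digits
        if callsign not in found:
            found.append(callsign)
    return found
```

Usage: find_callsigns("United 123 climb and maintain flight level 240, Delta four five six contact departure") yields UAL123 and DAL456. "flight level 240" is ignored because no airline word precedes it.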

🔄 reprocess_all_transcripts - Bulk Reprocessing

Re-runs callsign extraction on all completed transcripts.

| Property | Value |
|---|---|
| ⏱️ Schedule | On-demand |
| 📬 Queue | transcription |

🌍 Geographic Data Tasks

Location: skyspy/tasks/geodata.py

🌐 refresh_all_geodata - Full Refresh

Refreshes all geographic data including airports, navaids, and GeoJSON boundaries.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 3:30 AM UTC |
| 📬 Queue | database |
| 🔄 Max Retries | 3 |
| ⏳ Retry Delay | 300 seconds |

Returns:

{
    "airports": 15234,
    "navaids": 8923,
    "geojson": 127
}
πŸ” check_and_refresh_geodata - Staleness Check

Checks staleness and triggers refresh if needed.

PropertyValue
⏱️ ScheduleEvery hour
πŸ“¬ Queuedatabase
πŸ›©οΈ refresh_pireps - Pilot Reports

Fetches PIREPs from Aviation Weather Center.

PropertyValue
⏱️ ScheduleEvery 10 minutes
πŸ“¬ Queuedatabase
πŸ”„ Max Retries3

Parameters:

NameTypeDefaultDescription
bboxstr"24,-130,50,-60"Geographic bounding box
hoursint6Hours of PIREPs to fetch
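
The default bbox most naturally reads as min_lat,min_lon,max_lat,max_lon, which covers the contiguous United States. That ordering is an assumption here. The sketch below shows a hypothetical parse_bbox helper that validates the string before it is used in a request.

```python
from typing import NamedTuple


class BBox(NamedTuple):
    min_lat: float
    min_lon: float
    max_lat: float
    max_lon: float

    def contains(self, lat: float, lon: float) -> bool:
        return self.min_lat <= lat <= self.max_lat and self.min_lon <= lon <= self.max_lon


def parse_bbox(value: str) -> BBox:
    """Parse 'min_lat,min_lon,max_lat,max_lon' (assumed order) and validate it."""
    parts = [float(p) for p in value.split(",")]
    if len(parts) != 4:
        raise ValueError(f"expected 4 comma-separated numbers, got {len(parts)}")
    box = BBox(*parts)
    if not -90.0 <= box.min_lat < box.max_lat <= 90.0:
        raise ValueError("latitudes must satisfy -90 <= min_lat < max_lat <= 90")
    # This simple check also rejects boxes that cross the antimeridian
    if not -180.0 <= box.min_lon < box.max_lon <= 180.0:
        raise ValueError("longitudes must satisfy -180 <= min_lon < max_lon <= 180")
    return box
```

Failing fast on a swapped pair (for example "50,-130,24,-60") avoids a silent empty result from the upstream API.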
🌀️ refresh_metars - Weather Reports

Fetches METARs from Aviation Weather Center.

PropertyValue
⏱️ ScheduleEvery 10 minutes
πŸ“¬ Queuedatabase
πŸ”„ Max Retries3

📋 refresh_tafs - Terminal Forecasts

Fetches TAFs from the Aviation Weather Center.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 30 minutes |
| 📬 Queue | database |
| 🔄 Max Retries | 3 |

🧹 cleanup_old_pireps

Removes expired PIREPs from the database.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every hour |
| 📬 Queue | database |
| 🗑️ Default Retention | 24 hours |

🔲 Airspace Tasks

Location: skyspy/tasks/airspace.py

⚠️ refresh_airspace_advisories - Live Advisories

Fetches G-AIRMETs, SIGMETs, and advisories from Aviation Weather Center.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 5 minutes |
| 📬 Queue | database |
| 📤 Events | airspace_advisory |
πŸ—ΊοΈ refresh_airspace_boundaries - Static Boundaries

Refreshes static airspace boundaries from OpenAIP.

PropertyValue
⏱️ ScheduleDaily at 3:00 AM UTC
πŸ“¬ Queuedatabase
πŸ”„ Max Retries3

πŸ—‚οΈ OpenAIP Tasks

Location: skyspy/tasks/openaip.py

🌐 refresh_openaip_data - Cache Warming

Warms the OpenAIP cache by prefetching data for major US regions.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 5:15 AM UTC |
| 📬 Queue | database |
| 🔄 Max Retries | 3 |
πŸ“ prefetch_openaip_airspaces - Region Prefetch

Prefetches airspaces for a specific region.

PropertyValue
⏱️ ScheduleOn-demand
πŸ“¬ Queuedatabase

Parameters:

NameTypeDefaultDescription
latfloatRequiredCenter latitude
lonfloatRequiredCenter longitude
radius_nmfloat200Search radius in nautical miles
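
One plausible way to turn lat, lon and radius_nm into a query area is to compute an enclosing bounding box. This sketch assumes that one degree of latitude equals 60 nautical miles and scales longitude by cos(latitude). It clamps at the poles and the antimeridian instead of wrapping. radius_to_bbox is a hypothetical helper, not part of the OpenAIP API.

```python
import math
from typing import Tuple

NM_PER_DEGREE_LAT = 60.0  # one minute of latitude is one nautical mile


def radius_to_bbox(lat: float, lon: float, radius_nm: float = 200.0) -> Tuple[float, float, float, float]:
    """Approximate (min_lat, min_lon, max_lat, max_lon) enclosing a circle around (lat, lon)."""
    dlat = radius_nm / NM_PER_DEGREE_LAT
    # Meridians converge toward the poles, so each nm spans more degrees of longitude
    cos_lat = max(math.cos(math.radians(lat)), 1e-6)
    dlon = radius_nm / (NM_PER_DEGREE_LAT * cos_lat)
    # Clamp instead of wrapping; adequate for regions such as the contiguous US
    return (
        max(lat - dlat, -90.0),
        max(lon - dlon, -180.0),
        min(lat + dlat, 90.0),
        min(lon + dlon, 180.0),
    )
```

With the default 200 nm radius, the box extends about 3.3 degrees of latitude on each side of the center.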

📜 NOTAM Tasks

Location: skyspy/tasks/notams.py

🔄 refresh_notams - NOTAM Sync

Refreshes all NOTAMs from the FAA Aviation Weather API.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 15 minutes |
| 📬 Queue | database |
| 🔄 Max Retries | 3 |
| 📤 Events | notam_refresh, stats_update |

🧹 cleanup_expired_notams

Archives and deletes expired NOTAMs according to the retention policy.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 4:15 AM UTC |
| 📬 Queue | database |

Parameters:

| Name | Type | Default | Description |
|---|---|---|---|
| archive_days | int | 7 | Days after expiration to archive |
| delete_days | int | 90 | Days after archival to hard delete |
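
Taken together, the two parameters describe a two-stage policy. The minimal sketch below makes that decision for a single NOTAM. It assumes archive_days counts from expiration and delete_days from archival, as the table states. notam_action is hypothetical. A real task would apply the same cutoffs with bulk database queries rather than record by record.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def notam_action(expires_at: datetime, archived_at: Optional[datetime] = None,
                 now: Optional[datetime] = None,
                 archive_days: int = 7, delete_days: int = 90) -> str:
    """Return 'keep', 'archive' or 'delete' for one NOTAM under a two-stage policy."""
    now = now or datetime.now(timezone.utc)
    if archived_at is not None:
        # Stage 2: archived NOTAMs are hard-deleted delete_days after archival
        if now - archived_at >= timedelta(days=delete_days):
            return "delete"
        return "keep"
    # Stage 1: expired NOTAMs are archived archive_days after expiration
    if now - expires_at >= timedelta(days=archive_days):
        return "archive"
    return "keep"
```

Under the defaults, an expired NOTAM therefore survives in some form for roughly 97 days: 7 days before archival and 90 days in the archive.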

🚨 broadcast_new_tfr - TFR Alerts

Broadcasts a new TFR notification via Socket.IO.

| Property | Value |
|---|---|
| ⏱️ Schedule | On-demand |
| 📬 Queue | database |

πŸ—„οΈ External Database Tasks

Location: skyspy/tasks/external_db.py

flowchart TB
    subgraph Sources["📡 Data Sources"]
        S1[ADS-B Exchange]
        S2[tar1090]
        S3[FAA Registry]
        S4[OpenSky Network]
        S5[HexDB]
        S6[Planespotters]
    end

    subgraph Sync["🔄 Sync Tasks"]
        T1[sync_external_databases<br/>Daily 4:00 AM]
        T2[update_stale_databases<br/>Every 6 hours]
        T3[fetch_aircraft_info<br/>On-demand]
        T4[fetch_aircraft_photos<br/>On-demand]
    end

    subgraph Storage["💾 Storage"]
        DB[(PostgreSQL)]
        Cache[(Redis Cache)]
    end

    Sources --> Sync --> Storage

🔄 sync_external_databases - Full Sync

Syncs aircraft databases from external sources (ADS-B Exchange, tar1090, FAA, OpenSky).

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 4:00 AM UTC |
| 📬 Queue | database |
| 🔄 Max Retries | 3 |
πŸ• update_stale_databases - Incremental Update

Checks and updates databases older than 24 hours.

PropertyValue
⏱️ ScheduleEvery 6 hours
πŸ“¬ Queuedatabase
πŸ”„ Max Retries3

✈️ fetch_aircraft_info - Info Lookup

Fetches aircraft info from multiple sources (in-memory DB, HexDB, adsb.lol).

| Property | Value |
|---|---|
| ⏱️ Schedule | On-demand |
| 📬 Queue | default |

📷 fetch_aircraft_photos - Photo Download

Fetches and caches aircraft photos from Planespotters or HexDB.

| Property | Value |
|---|---|
| ⏱️ Schedule | On-demand |
| 📬 Queue | default |

🔃 refresh_stale_aircraft_info

Refreshes aircraft info records older than the specified age.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 5:00 AM UTC |
| 📬 Queue | database |

📸 batch_upgrade_aircraft_photos

Upgrades photos to higher-resolution versions.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 5:30 AM UTC |
| 📬 Queue | database |

🧹 cleanup_orphan_aircraft_info

Removes AircraftInfo records for aircraft not seen recently.

| Property | Value |
|---|---|
| ⏱️ Schedule | Weekly on Sundays at 6:00 AM UTC |
| 📬 Queue | database |

🧹 Cleanup Tasks

Location: skyspy/tasks/cleanup.py

flowchart LR
    subgraph Master["🎯 Master Task"]
        M[run_all_cleanup_tasks<br/>Daily 3:00 AM]
    end

    subgraph Children["📋 Cleanup Tasks"]
        C1[cleanup_old_sightings<br/>30 days]
        C2[cleanup_old_sessions<br/>90 days]
        C3[cleanup_old_alert_history<br/>30 days]
        C4[cleanup_old_safety_events<br/>90 days]
        C5[cleanup_old_acars_messages<br/>7 days]
    end

    subgraph Maintenance["🔧 Maintenance"]
        V[vacuum_analyze_tables<br/>Weekly]
    end

    M --> Children
    Children --> Maintenance

🎯 run_all_cleanup_tasks - Master Cleanup

Master cleanup task that orchestrates all data retention cleanups.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 3:00 AM UTC |
| 📬 Queue | low_priority |

✈️ cleanup_old_sightings

| Property | Value |
|---|---|
| 🗑️ Retention | SIGHTING_RETENTION_DAYS (default: 30) |

📋 cleanup_old_sessions

| Property | Value |
|---|---|
| 🗑️ Retention | SESSION_RETENTION_DAYS (default: 90) |

🔔 cleanup_old_alert_history

| Property | Value |
|---|---|
| 🗑️ Retention | ALERT_HISTORY_DAYS (default: 30) |

🛡️ cleanup_old_safety_events

| Property | Value |
|---|---|
| 🗑️ Retention | SAFETY_EVENT_RETENTION_DAYS (default: 90) |

📻 cleanup_old_acars_messages

| Property | Value |
|---|---|
| 🗑️ Retention | 7 days (fixed) |

🔧 vacuum_analyze_tables - PostgreSQL Maintenance

Runs PostgreSQL VACUUM ANALYZE on frequently updated tables.

| Property | Value |
|---|---|
| ⏱️ Schedule | Weekly on Sundays at 4:00 AM UTC |
| 📬 Queue | low_priority |

🔔 Notification Tasks

Location: skyspy/tasks/notifications.py

sequenceDiagram
    participant Alert as 🚨 Alert Trigger
    participant Queue as 📋 Notification Queue
    participant Task as 🔔 send_notification_task
    participant Apprise as 📤 Apprise
    participant Channels as 📱 Notification Channels

    Alert->>Queue: Queue notification
    Queue->>Task: Process
    Task->>Apprise: Send via Apprise
    Apprise->>Channels: Deliver

    alt Success
        Channels-->>Task: ✅ Delivered
    else Failure
        Channels-->>Task: ❌ Failed
        Task->>Queue: Retry with backoff
    end

📤 send_notification_task - Notification Delivery

Sends a notification via Apprise, with automatic retries and exponential backoff.

| Property | Value |
|---|---|
| ⏱️ Schedule | On-demand |
| 📬 Queue | notifications |
| 🔄 Max Retries | 5 |
| ⏳ Retry Backoff | Exponential (max 1 hour) |

Parameters:

| Name | Type | Default | Description |
|---|---|---|---|
| channel_url | str | Required | Apprise-compatible URL |
| title | str | Required | Notification title |
| body | str | Required | Notification body |
| priority | str | 'info' | Priority level |
| event_type | str | 'alert' | Event type |

🔄 process_notification_queue

Processes notifications that are pending retry.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 30 seconds |
| 📬 Queue | notifications |

🧹 cleanup_old_notification_logs

Removes old notification logs.

| Property | Value |
|---|---|
| ⏱️ Schedule | Daily at 3:15 AM UTC |
| 📬 Queue | notifications |
| 🗑️ Default Retention | 30 days |

⏰ cleanup_notification_cooldowns

Cleans up notification cooldown entries to prevent memory growth.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 30 minutes |
| 📬 Queue | notifications |
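
A cooldown map keyed by channel and event grows without bound unless expired entries are pruned, and this periodic task exists to handle that. The sketch below is a minimal in-memory version. CooldownTracker is hypothetical, and SkysPy may keep its cooldowns in the cache instead. The injectable clock makes the class easy to test.

```python
import time
from typing import Callable, Dict, Tuple


class CooldownTracker:
    """Suppress repeat notifications per (channel, event) within a cooldown window."""

    def __init__(self, cooldown_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.cooldown = cooldown_seconds
        self.clock = clock
        self._last_sent: Dict[Tuple[str, str], float] = {}

    def should_send(self, channel: str, event_key: str) -> bool:
        """Return True (and start a new window) unless the key is still cooling down."""
        now = self.clock()
        key = (channel, event_key)
        last = self._last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            return False
        self._last_sent[key] = now
        return True

    def prune(self) -> int:
        """Drop entries whose cooldown has elapsed; return how many were removed."""
        now = self.clock()
        expired = [k for k, t in self._last_sent.items() if now - t >= self.cooldown]
        for k in expired:
            del self._last_sent[k]
        return len(expired)
```

A periodic job calling something like prune() keeps the map proportional to recently active alerts rather than to every alert ever sent.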

🧪 test_notification_channel

Sends a test notification to verify channel configuration.

| Property | Value |
|---|---|
| ⏱️ Schedule | On-demand |
| 📬 Queue | notifications |

🎯 Cannonball Mode Tasks

Location: skyspy/tasks/cannonball.py

🚨 Real-time law enforcement aircraft detection and pattern analysis.

flowchart TB
    subgraph Detection["🎯 Detection Loop"]
        A[analyze_aircraft_patterns<br/>Every 5 seconds]
    end

    subgraph Processing["⚙️ Processing"]
        B[Pattern Analysis]
        C[Threat Scoring]
        D[Alert Generation]
    end

    subgraph Storage["💾 Storage"]
        E[(CannonballSession)]
        F[(CannonballPattern)]
        G[(CannonballAlert)]
    end

    subgraph Broadcast["📡 Broadcast"]
        H[threats_update]
        I[new_alert]
    end

    A --> B --> C --> D
    D --> E & F & G
    D --> H & I
πŸ” analyze_aircraft_patterns - Pattern Detection

Analyzes current aircraft for law enforcement patterns (orbiting, surveillance, etc.).

PropertyValue
⏱️ ScheduleEvery 5 seconds
πŸ“¬ Queuepolling
⏳ Expires5 seconds
πŸ”„ Max Retries0
πŸ“€ Eventsthreats_update, new_alert
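
One common heuristic for orbit detection is cumulative heading change: an aircraft that has turned through 360 degrees in one direction has flown a full circle. This sketch illustrates that heuristic only and is not SkysPy's analyzer, which may also weigh radius, duration and altitude. The helper names are hypothetical. The method assumes headings are sampled often enough that consecutive samples differ by less than 180 degrees.

```python
from typing import Sequence


def heading_change(prev: float, curr: float) -> float:
    """Smallest signed turn from prev to curr, in degrees within [-180, 180)."""
    return (curr - prev + 180.0) % 360.0 - 180.0


def net_turns(track_headings: Sequence[float]) -> float:
    """Net number of full circles flown (positive for right turns, negative for left)."""
    total = sum(heading_change(a, b) for a, b in zip(track_headings, track_headings[1:]))
    return total / 360.0


def looks_like_orbit(track_headings: Sequence[float], min_turns: float = 1.0) -> bool:
    # Sustained turning in one direction adds up; zig-zags and straight legs cancel out
    return abs(net_turns(track_headings)) >= min_turns
```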

🧹 cleanup_cannonball_sessions

Marks stale Cannonball sessions as inactive.

| Property | Value |
|---|---|
| ⏱️ Schedule | Every 5 minutes |
| 📬 Queue | database |
πŸ—‘οΈ cleanup_old_patterns

Deletes patterns older than retention period.

PropertyValue
⏱️ ScheduleDaily at 3:45 AM UTC
πŸ“¬ Queuelow_priority
πŸ—‘οΈ RetentionCANNONBALL_PATTERN_RETENTION_DAYS (default: 30)

📊 aggregate_cannonball_stats

Creates hourly statistics aggregates for trend analysis.

| Property | Value |
|---|---|
| ⏱️ Schedule | 10 minutes past each hour |
| 📬 Queue | low_priority |
πŸ“ update_user_location

Updates user location for threat calculations.

PropertyValue
⏱️ ScheduleOn-demand
πŸ“¬ Queuedefault

πŸ“… Complete Schedule Reference

⏱️ Celery Beat Schedule

All times are in UTC. The scheduler is configured in skyspy/celery.py.

⚡ High-Frequency Tasks (< 1 minute)

| Task | Frequency | Queue | Notes |
|---|---|---|---|
| poll_aircraft | 🔴 2s | polling | Expires after 2s |
| analyze_aircraft_patterns | 🔴 5s | polling | Cannonball mode; expires after 5s |
| process_transcription_queue | 🟠 10s | transcription | Expires after 10s |
| update_safety_stats | 🟡 30s | polling | |
| process_notification_queue | 🟡 30s | notifications | |
| update_stats_cache | 🟢 60s | polling | |
| refresh_acars_stats | 🟢 60s | polling | |

🔄 Medium-Frequency Tasks (1-30 minutes)

| Task | Frequency | Queue | Notes |
|---|---|---|---|
| refresh_flight_pattern_geographic_stats | 2m | low_priority | |
| refresh_tracking_quality_stats | 2m | low_priority | |
| refresh_engagement_stats | 2m | low_priority | |
| cleanup_cannonball_sessions | 5m | database | |
| cleanup_memory_cache | 5m | default | |
| cleanup_sessions | 5m | database | |
| refresh_airspace_advisories | 5m | database | |
| update_antenna_analytics | 5m | polling | |
| update_favorite_tracking | 5m | database | |
| refresh_time_comparison_stats | 5m | low_priority | |
| refresh_metars | 10m | database | |
| refresh_pireps | 10m | database | |
| refresh_notams | 15m | database | |
| cleanup_notification_cooldowns | 30m | notifications | |
| refresh_tafs | 30m | database | |

πŸ• Low-Frequency Tasks (Hourly+)

TaskFrequencyQueueNotes
aggregate_hourly_antenna_analytics:05 past hourlow_priority
aggregate_cannonball_stats:10 past hourlow_priority
cleanup_pireps_hourlyHourlydatabase
check_geodata_freshness_hourlyHourlydatabase
update_stale_databases6 hoursdatabase

📆 Daily Tasks

| Task | Time (UTC) | Queue | Notes |
|---|---|---|---|
| calculate_daily_stats | 1:00 AM | low_priority | |
| run_all_cleanup_tasks | 3:00 AM | low_priority | Master cleanup |
| refresh_airspace_boundaries | 3:00 AM | database | |
| cleanup_notification_logs | 3:15 AM | notifications | |
| refresh_geodata_daily | 3:30 AM | database | |
| cleanup_cannonball_patterns | 3:45 AM | low_priority | |
| sync_external_databases | 4:00 AM | database | |
| cleanup_expired_notams | 4:15 AM | database | |
| cleanup_antenna_analytics | 4:30 AM | low_priority | |
| refresh_stale_aircraft_info | 5:00 AM | database | |
| refresh_openaip_data | 5:15 AM | database | |
| batch_upgrade_aircraft_photos | 5:30 AM | database | |

📅 Weekly Tasks

| Task | Schedule (UTC) | Queue | Notes |
|---|---|---|---|
| vacuum_analyze_tables | Sunday 4:00 AM | low_priority | PostgreSQL maintenance |
| cleanup_orphan_aircraft_info | Sunday 6:00 AM | database | |

🔄 Retry Policy

flowchart LR
    subgraph Attempt["🎯 Task Execution"]
        A[Execute Task]
    end

    subgraph Decision["Success?"]
        B{Check Result}
        R{Retries left?}
    end

    subgraph Success["✅ Success"]
        C[Complete]
    end

    subgraph Retry["🔄 Retry Logic"]
        D[Calculate Backoff]
        E[Add Jitter]
        F[Schedule Retry]
    end

    subgraph Fail["❌ Max Retries"]
        G[Mark Failed]
        H[Log to Sentry]
    end

    A --> B
    B -->|Yes| C
    B -->|No| R
    R -->|Yes| D --> E --> F --> A
    R -->|No| G --> H

βš™οΈ Retry Configuration

ParameterDescriptionDefault
max_retriesMaximum retry attempts3
retry_backoffEnable exponential backoffFalse
retry_backoff_maxMaximum backoff delay (seconds)600
retry_jitterAdd randomness to delaysTrue
autoretry_forException types to auto-retry()
πŸ’‘

Best Practice

Use expires for time-sensitive tasks to prevent queue buildup when the system is under load.

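from celery import shared_task


@shared_task(
    bind=True,
    max_retries=3,
    # Retry only on transient errors; unexpected exceptions should surface immediately
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,       # exponential delays: 1s, 2s, 4s, ...
    retry_backoff_max=3600,   # never wait more than an hour between attempts
    retry_jitter=True,        # randomize each delay to avoid synchronized retries
)
def example_task(self):
    # Task logic goes here. Exceptions listed in autoretry_for are retried
    # automatically; also calling self.retry(countdown=60) by hand would
    # replace the exponential backoff with a fixed delay.
    ...


# Enqueue with an expiry so a stale run is discarded instead of executed late:
# example_task.apply_async(expires=10)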
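
With retry_backoff enabled, Celery documents the delay before retry n (counting from 0) as factor * 2**n seconds, capped at retry_backoff_max. With retry_jitter, it picks a random delay between zero and that value. The sketch below reproduces that arithmetic so retry timelines can be estimated offline. backoff_delay and worst_case_schedule are standalone re-implementations for illustration, not Celery APIs.

```python
import random
from typing import List, Optional


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  jitter: bool = True, rng: Optional[random.Random] = None) -> int:
    """Delay in seconds before retry number `retries` (0 = first retry)."""
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        rng = rng if rng is not None else random.Random()
        countdown = rng.randrange(countdown + 1)  # uniform in [0, countdown]
    return max(0, countdown)


def worst_case_schedule(max_retries: int, factor: int = 1, maximum: int = 600) -> List[int]:
    """Upper-bound delay for each retry, i.e. the schedule with jitter disabled."""
    return [backoff_delay(n, factor, maximum, jitter=False) for n in range(max_retries)]
```

For example, if send_notification_task uses retry_backoff=True (factor 1), its five retries wait at most 1 + 2 + 4 + 8 + 16 = 31 seconds in total. Its one-hour cap would then only come into play with a larger factor or many more retries.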

📊 Dead Letter Handling

Failed notifications are tracked in NotificationLog with one of these statuses:

| Status | Description |
|---|---|
| pending | ⏳ Awaiting delivery |
| retrying | 🔄 Scheduled for retry |
| sent | ✅ Successfully delivered |
| failed | ❌ All retries exhausted |
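
The status lifecycle can be expressed as a small state machine. This sketch makes two assumptions. A failed delivery moves a log to retrying until the retry budget is spent (five, matching send_notification_task). Sent and failed are terminal. The Status enum and next_status function are hypothetical and are not the NotificationLog model.

```python
from enum import Enum


class Status(str, Enum):
    PENDING = "pending"
    RETRYING = "retrying"
    SENT = "sent"
    FAILED = "failed"


TERMINAL = {Status.SENT, Status.FAILED}


def next_status(current: Status, delivered: bool, retries_used: int, max_retries: int = 5) -> Status:
    """Status after one delivery attempt; sent and failed are terminal (assumed)."""
    if current in TERMINAL:
        raise ValueError(f"{current.value!r} is a terminal status")
    if delivered:
        return Status.SENT
    # Retry budget exhausted: the record becomes a dead letter
    if retries_used >= max_retries:
        return Status.FAILED
    return Status.RETRYING
```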

📊 Monitoring Dashboard

🌸 Flower Dashboard

Deploy Flower for real-time task monitoring:

celery -A skyspy flower --port=5555
🌐 Access the dashboard at http://localhost:5555

🖥️ Command-Line Monitoring

# Watch active tasks
celery -A skyspy inspect active

# View scheduled tasks
celery -A skyspy inspect scheduled

# Check worker stats
celery -A skyspy inspect stats

# View registered tasks
celery -A skyspy inspect registered

πŸ“ Logging Configuration

LOGGING = {
    'handlers': {
        'celery': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': '/var/log/skyspy/celery.log',
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery'],
            'level': 'INFO',
            'propagate': False,
        },
        'skyspy.tasks': {
            'handlers': ['celery'],
            'level': 'DEBUG',
            'propagate': False,
        },
    },
}

πŸ” Sentry Integration

SkysPy automatically reports task errors to Sentry when configured:

from skyspy.utils.sentry import capture_task_error

try:
    # Task logic
    pass
except Exception as e:
    capture_task_error(e, 'task_name', extra={'key': 'value'})
    raise

⚑ Performance Tuning

🎯 Optimization Guide

🏭 Worker Configuration

🚀 Production Configuration (Gevent)
celery -A skyspy worker \
    --pool=gevent \
    --concurrency=100 \
    --loglevel=INFO \
    -Q polling,default,database,notifications,transcription,low_priority
🔧 Development Configuration (Prefork)
celery -A skyspy worker \
    --pool=prefork \
    --concurrency=4 \
    --loglevel=DEBUG \
    -Q polling,default,database,notifications,transcription,low_priority

πŸŽ›οΈ Queue-Specific Workers

⚑

High-Load Tip

For high-load deployments, run dedicated workers per queue to isolate time-critical tasks.

# High-priority polling worker
celery -A skyspy worker -Q polling -c 50 --pool=gevent -n polling@%h

# Database operations worker
celery -A skyspy worker -Q database -c 10 --pool=prefork -n database@%h

# Low-priority worker
celery -A skyspy worker -Q low_priority -c 4 --pool=prefork -n lowprio@%h

🧠 Memory Optimization

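from skyspy.celery import app  # assumed: the Celery app instance defined in skyspy/celery.py

# Reserve at most one message per worker process, so a slow task cannot hold
# time-sensitive messages in its prefetch buffer (this limits prefetching;
# it does not disable it)
app.conf.worker_prefetch_multiplier = 1

# Acknowledge after completion (prevents lost tasks); an unacknowledged task is
# redelivered if the worker dies mid-run, so tasks should be idempotent
app.conf.task_acks_late = True

# Re-queue tasks if the worker process is killed (takes effect with task_acks_late)
app.conf.task_reject_on_worker_lost = True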

πŸ“ Raspberry Pi Optimization

πŸ“¦

Resource-Constrained Devices

SkysPy includes RPi-specific optimizations in settings_rpi.py.

SettingStandardRPiReduction
stats_cache interval60s90s1.5x slower
safety_stats interval30s60s2x slower
acars_stats interval60s120s2x slower
MAX_SEEN_AIRCRAFT10000100010x smaller
SIGHTING_RETENTION_DAYS3074x shorter
SESSION_RETENTION_DAYS90146x shorter

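🔴 Redis Configuration

# redis.conf - Production settings
maxmemory 512mb
# Caution: allkeys-lru can evict queued task messages when this instance is
# also the Celery broker; noeviction is the safer policy for a broker-only Redis
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec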

🔧 Configuration Reference

📋 Django Settings

| Setting | Default | Description |
|---|---|---|
| CELERY_BROKER_URL | redis://localhost:6379/0 | Redis broker connection URL |
| CELERY_RESULT_BACKEND | redis://localhost:6379/0 | Task result storage backend |
| CELERY_ACCEPT_CONTENT | ['json'] | Accepted serialization formats |
| CELERY_TASK_SERIALIZER | json | Task serialization format |
| CELERY_RESULT_SERIALIZER | json | Result serialization format |
| CELERY_TIMEZONE | UTC | Scheduler timezone |
| CELERY_TASK_TRACK_STARTED | True | Track task start times |
| CELERY_TASK_TIME_LIMIT | 1800 | Hard task time limit in seconds (30 minutes) |
| CELERY_BEAT_SCHEDULER | DatabaseScheduler | Database-backed scheduler |

πŸ—“οΈ Retention Configuration

VariableDefaultRPi DefaultDescription
SIGHTING_RETENTION_DAYS307Days to retain sightings
SESSION_RETENTION_DAYS9014Days to retain sessions
ALERT_HISTORY_DAYS307Days to retain alert history
SAFETY_EVENT_RETENTION_DAYS9014Days to retain safety events
ANTENNA_SNAPSHOT_RETENTION_DAYS73Days to retain 5-min snapshots

πŸŽ›οΈ Feature Flags

VariableDefaultDescription
TRANSCRIPTION_ENABLEDFalseEnable audio transcription
WHISPER_ENABLEDFalseEnable Whisper transcription
ATC_WHISPER_ENABLEDFalseEnable ATC-Whisper
PHOTO_AUTO_DOWNLOADFalseAuto-download aircraft photos
OPENSKY_DB_ENABLEDTrueEnable OpenSky database

🐳 Docker Deployment

📦 Docker Compose Configuration

services:
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  celery-worker:
    build: ./skyspy_django
    command: celery -A skyspy worker -l INFO -Q polling,default,database,notifications,transcription,low_priority
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      redis:
        condition: service_healthy

  celery-beat:
    build: ./skyspy_django
    command: celery -A skyspy beat -l INFO
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      redis:
        condition: service_healthy

volumes:
  redis_data:

🔥 Troubleshooting

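❓ Tasks not running
# Check that workers are connected and responding
celery -A skyspy inspect ping

# Verify beat is dispatching: its log should show "Scheduler: Sending due task" lines
# (inspect scheduled only lists ETA/countdown tasks already reserved by workers)
docker compose logs --tail=50 celery-beat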
❓ Tasks stuck in queue
# Check queue length (with the Redis broker, each queue is a list named after the queue)
redis-cli LLEN polling
redis-cli LLEN low_priority

# Purge all tasks (use cautiously!)
celery -A skyspy purge
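❓ Memory growth
# Recycle prefork child processes after a fixed number of tasks (1000 is an example value)
celery -A skyspy worker --pool=prefork --max-tasks-per-child=1000

# Or shut workers down gracefully; a process manager or restart policy must bring them back
celery -A skyspy control shutdown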
❓ Redis connection issues
# Test Redis connectivity
redis-cli ping

# Check Redis memory
redis-cli info memory

πŸ₯ Health Checks

SkysPy exposes a Celery health check via the celery_heartbeat cache key:

from django.core.cache import cache

def celery_health_check():
    return cache.get('celery_heartbeat', False)
💡 The heartbeat key is refreshed every 60 seconds by update_stats_cache.
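
A flag that never expires cannot distinguish a stalled worker from a healthy one. The sketch below is a stricter check under two assumptions: the heartbeat is stored as a Unix timestamp (the check above treats it as a truthy flag), and it counts as stale after two missed intervals. On the writer side, this would mean something like setting the key to time.time() on each run, which is also an assumption. celery_is_healthy is hypothetical. Django's cache.get can be passed as cache_get unchanged.

```python
import time
from typing import Any, Callable, Optional

HEARTBEAT_INTERVAL = 60.0  # seconds, matching the update_stats_cache schedule


def celery_is_healthy(cache_get: Callable[[str], Any], now: Optional[float] = None,
                      interval: float = HEARTBEAT_INTERVAL, missed_allowed: float = 2.0) -> bool:
    """True if the heartbeat timestamp is newer than missed_allowed intervals."""
    last_beat = cache_get("celery_heartbeat")
    # Assumed format: a Unix timestamp; a missing key or a bare flag is not trusted
    if isinstance(last_beat, bool) or not isinstance(last_beat, (int, float)):
        return False
    now = time.time() if now is None else now
    return now - last_beat < interval * missed_allowed
```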


📚 Additional Resources