Celery Patch
Celery Patch Module: Tenant-Aware Task Execution
This module patches Celery to automatically capture and restore tenant context when executing background tasks asynchronously.
Problem
Celery tasks execute in background worker processes without request context:
- Worker processes are separate from Django request handlers
- TenantContext not automatically available in workers
- Tasks execute in wrong/empty tenant context
- Data isolation violations in multi-tenant applications
- Tasks may operate on wrong tenant's data
Solution
This patch extends Celery's Task base class to:
1. Capture current tenant when task is queued
2. Store tenant_id in task headers
3. Restore tenant context when task executes
4. Execute task in correct tenant context
5. Clean up context after execution
Tenant Context Flow:
Request Handler (Tenant A context set):
```
@login_required
def trigger_job(request):
    # In TenantContext for tenant_id='acme'
    task.apply_async(args=(user_id,), tenant_id='acme')
    # Task queued with headers={'tenant_id': 'acme'}
```
Celery Broker (Task stored):
```
{
    'task': 'myapp.tasks.process_data',
    'args': (user_id,),
    'headers': {'tenant_id': 'acme'},
    'kwargs': {}
}
```
Worker Process (Tenant A context restored):
```
def process_data(user_id):
    # TenantContext.use_tenant(acme) called automatically
    # Queries scoped to tenant_id='acme'
    user = User.objects.get(id=user_id)  # From tenant_id='acme'
    # Task completes in correct context
```
Tenant ID Passing Methods:
Method 1: As Keyword Argument
```python
task.apply_async(
    args=(arg1, arg2),
    kwargs={'user_id': 123, 'tenant_id': 'acme'}
)
# tenant_id removed from kwargs, stored in headers
```
Method 2: As Option
```python
task.apply_async(
    args=(arg1, arg2),
    tenant_id='acme'  # Passed as option
)
# tenant_id stored in headers
```
Header-Based Storage
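Using task headers ensures persistence:
- Headers survive task routing through the broker
- Headers survive task retries
- Headers are available in the worker process
- tenant_id is not visible in args/kwargs
- tenant_id is hidden from code that inspects only task arguments (headers still travel in the broker message and are not encrypted by this mechanism)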
Header Storage:
options.setdefault("headers", {})["tenant_id"] = tenant_id
# Creates headers dict if missing
# Stores tenant_id in headers
# Persists through entire task lifecycle
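As a quick illustration of the `setdefault` idiom above, the following sketch uses plain dictionaries rather than a real Celery call. The helper name `store_tenant_header` and the `trace_id` and `countdown` entries are hypothetical; only the `setdefault` line comes from the text above.

```python
def store_tenant_header(options, tenant_id):
    """Store tenant_id under options['headers'], creating the dict if needed."""
    # setdefault returns the existing headers dict, or inserts and returns a new one.
    options.setdefault("headers", {})["tenant_id"] = tenant_id
    return options


# No headers yet: the dict is created.
fresh = store_tenant_header({"countdown": 10}, "acme")

# Headers already present: existing entries are kept alongside tenant_id.
existing = store_tenant_header({"headers": {"trace_id": "abc"}}, "acme")

print(fresh)
print(existing)
```

Because `setdefault` reuses an existing `headers` dictionary, any headers the caller supplied are preserved rather than overwritten.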
Task Execution Flow:
1. Developer queues task with tenant_id
2. TenantAwareTask.apply_async() called
3. Extracts tenant_id from kwargs or options
4. Stores in task headers
5. Task sent to broker
6. Worker receives task
7. TenantAwareTask.__call__() called
8. Extracts tenant_id from headers
9. Fetches Tenant object
10. Sets TenantContext.use_tenant(tenant)
11. Calls parent __call__() to execute task
12. Context auto-cleaned on exit
Result: Task executes in correct tenant context
Error Handling:
No Tenant ID:
- Task executes in default context (no tenant set)
- May cause issues if task expects tenant
- Should always pass tenant_id
Tenant Not Found:
- Tenant.objects.get(tenant_id=tenant_id) raises DoesNotExist
- Task fails
- Can be retried
- Should handle gracefully
Best practice: Always pass tenant_id, handle errors
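One way to make both failure cases explicit is to resolve the tenant in a small helper that raises distinct exceptions. The sketch below is an assumption-laden illustration: `TENANTS` is an in-memory stand-in for the tenant table, and `resolve_tenant`, `TenantNotFound`, and `MissingTenantId` are hypothetical names, not part of django_omnitenant.

```python
class TenantNotFound(LookupError):
    """Raised when a task references a tenant that does not exist."""


class MissingTenantId(ValueError):
    """Raised when a tenant-scoped task arrives without a tenant_id."""


# Stand-in for the tenant table; a real worker would query the database.
TENANTS = {"acme": {"tenant_id": "acme", "name": "Acme Corp"}}


def resolve_tenant(headers, require_tenant=True):
    """Return the tenant record named in headers, or None if none is required."""
    tenant_id = (headers or {}).get("tenant_id")
    if tenant_id is None:
        if require_tenant:
            raise MissingTenantId("task was queued without a tenant_id header")
        return None
    try:
        return TENANTS[tenant_id]
    except KeyError:
        raise TenantNotFound(f"unknown tenant_id: {tenant_id!r}") from None


print(resolve_tenant({"tenant_id": "acme"})["name"])
print(resolve_tenant({}, require_tenant=False))
```

Separate exception types let calling code decide per case, for example retrying on a missing tenant but failing fast on a missing tenant_id.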
Performance
- Header storage: Minimal overhead
- Tenant lookup: Single database query
- Context switching: Negligible overhead
- Works at worker scale
Compatibility
- Works with all Celery task types
- Compatible with task routing
- Compatible with task retries
- Compatible with task chains/groups
- Transparent to existing task code
Configuration
Enable via Django settings:
OMNITENANT_CONFIG = {
    'PATCHES': {
        'celery': True,  # Enable Celery patch
    }
}
Or set Celery Task class directly:
from django_omnitenant.patches.celery import TenantAwareTask
app.Task = TenantAwareTask
Usage
The patch automatically applies to all tasks:
from celery import shared_task

@shared_task
def my_task(user_id):
    # Automatically executes in correct tenant context
    user = User.objects.get(id=user_id)
    process_user(user)

# Queue task in request handler
from django_omnitenant.tenant_context import TenantContext

def request_handler(request):
    tenant = TenantContext.get_tenant()
    my_task.apply_async(
        args=(request.user.id,),
        tenant_id=tenant.tenant_id
    )
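Since a task queued without tenant_id runs with no tenant set, call sites may want a thin wrapper that refuses to queue in that case. The following is a minimal sketch: `queue_for_tenant` is a hypothetical helper, and `RecordingTask` is a test double that records the call instead of contacting a broker.

```python
class RecordingTask:
    """Test double that records apply_async calls instead of using a broker."""

    def __init__(self):
        self.calls = []

    def apply_async(self, args=None, kwargs=None, **options):
        self.calls.append({"args": args, "kwargs": kwargs, "options": options})


def queue_for_tenant(task, tenant_id, *args, **kwargs):
    """Queue `task` for an explicit tenant, refusing to queue without one."""
    if not tenant_id:
        raise ValueError("refusing to queue a tenant-scoped task without tenant_id")
    # tenant_id is passed as an option, matching the usage shown above.
    return task.apply_async(args=args, kwargs=kwargs, tenant_id=tenant_id)


task = RecordingTask()
queue_for_tenant(task, "acme", 123)
print(task.calls[0])
```

With a real task object, the same wrapper would forward to its `apply_async` unchanged.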
TenantAwareTask
Bases: Task
Celery Task subclass that automatically manages tenant context.
This task class ensures that background tasks execute with the correct tenant context, enabling proper multi-tenant data isolation.
Design
- Captures tenant_id when task is queued (apply_async)
- Stores tenant_id in task headers for persistence
- Restores tenant context when task executes (__call__)
- Cleans up context automatically after execution
- Transparent to developer - no changes needed to task code
Features
- Automatic tenant capture on queueing
- Automatic tenant restoration on execution
- Flexible tenant_id passing (kwarg or option)
- Header-based storage (survives retries)
- Works with all Celery features
- No per-task configuration needed once the patch is enabled
Usage
All tasks automatically get this behavior:
@shared_task
def my_task(user_id):
    # Executes in correct tenant context
    user = User.objects.get(id=user_id)

# Queue with tenant_id
my_task.apply_async(
    args=(user_id,),
    tenant_id=tenant.tenant_id
)
Attributes:
| Name | Type | Description |
|---|---|---|
| abstract | bool | True - this is the base class for all tasks |
Source code in django_omnitenant/patches/celery.py
apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
Queue task with automatic tenant_id capture and header storage.
This method intercepts task queueing to capture the current tenant and store it in task headers for later restoration in the worker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | tuple | Positional arguments for task | None |
| kwargs | dict | Keyword arguments for task. May contain 'tenant_id', which is extracted | None |
| task_id | str | Optional explicit task ID | None |
| producer | | Celery producer for task routing | None |
| link | | Callback tasks on success | None |
| link_error | | Callback tasks on error | None |
| shadow | str | Shadow task name for monitoring | None |
| **options | | Additional Celery options. May contain 'tenant_id', which is extracted | {} |
Returns:
| Type | Description |
|---|---|
| AsyncResult | Celery AsyncResult for task tracking |
Process
- Extract tenant_id from kwargs or options
- Remove tenant_id from visible task arguments
- Store tenant_id in task headers
- Call parent apply_async with modified options
- Task queued with tenant context preserved
Tenant ID Extraction
Supports two passing methods:
Method 1 - Keyword Argument:
task.apply_async(
    args=(user_id,),
    kwargs={'tenant_id': 'acme'}
)
# Extracted from kwargs, removed
Method 2 - Option:
task.apply_async(
    args=(user_id,),
    tenant_id='acme'
)
# Extracted from options, removed
Header Storage
Tenant_id stored in task headers:
options.setdefault("headers", {})["tenant_id"] = tenant_id
# Creates headers if missing
# Stores tenant_id in headers
# Persists through broker and retries
Why headers:
- Survive task routing
- Survive task retries
- Not visible in args/kwargs
- Accessible in worker via self.request.headers
- Travel inside the task message itself, so no side channel is needed (headers are not encrypted by this mechanism)
Side Effects
- tenant_id removed from kwargs if present
- tenant_id removed from options if present
- headers dict created in options if missing
- Modified options passed to parent
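The extraction step can be sketched as a standalone function. This version is a non-mutating variant that works on copies, so the caller's dictionaries are left untouched, which differs from the in-place side effects listed above. The name `split_tenant_id` is hypothetical, and preferring the kwargs value when both are supplied is an assumption rather than documented behavior.

```python
def split_tenant_id(kwargs=None, options=None):
    """Return (tenant_id, kwargs, options) with tenant_id moved into headers.

    Works on copies so the caller's dictionaries are not modified.
    """
    kwargs = dict(kwargs or {})
    options = dict(options or {})

    # Remove tenant_id from both places; prefer the kwargs value (assumption).
    from_kwargs = kwargs.pop("tenant_id", None)
    from_options = options.pop("tenant_id", None)
    tenant_id = from_kwargs if from_kwargs is not None else from_options

    if tenant_id is not None:
        # Copy any existing headers before adding tenant_id.
        headers = dict(options.get("headers") or {})
        headers["tenant_id"] = tenant_id
        options["headers"] = headers

    return tenant_id, kwargs, options


caller_kwargs = {"tenant_id": "acme", "priority": "high"}
tenant_id, kwargs, options = split_tenant_id(caller_kwargs, {"countdown": 5})
print(tenant_id, kwargs, options)
```

Copying trades a little allocation for predictability: a caller that reuses the same kwargs dictionary for several tasks does not silently lose its tenant_id entry.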
Examples:
Basic queueing:
```python
task.apply_async(
    args=(user_id,),
    tenant_id='acme'
)
# Task queued, tenant stored in headers
```
With multiple arguments:
```python
task.apply_async(
    args=(user_id, data),
    kwargs={'tenant_id': 'acme', 'priority': 'high'}
)
# tenant_id extracted and stored
# priority remains in kwargs
# Both available to task
```
With explicit task ID:
```python
task.apply_async(
    args=(user_id,),
    task_id='custom-id-123',
    tenant_id='acme'
)
# task_id used for tracking
# tenant_id stored in headers
```
Source code in django_omnitenant/patches/celery.py
__call__(*args, **kwargs)
Execute task with automatic tenant context restoration.
This method intercepts task execution in the worker to restore the tenant context that was captured during queueing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *args | | Positional arguments passed to task | () |
| **kwargs | | Keyword arguments passed to task | {} |
Returns:
| Type | Description |
|---|---|
| object | Return value of actual task execution |
Process
- Extract tenant_id from task headers
- If tenant_id exists:
  a. Fetch Tenant object from database
  b. Enter TenantContext.use_tenant(tenant)
  c. Call parent __call__ (task execution)
  d. Context auto-cleaned on exit
- If no tenant_id, execute task normally
Tenant Context Restoration
Worker receives task with stored tenant_id:
# self.request.headers = {'tenant_id': 'acme'}
headers = self.request.headers or {}
tenant_id = headers.get("tenant_id")  # Gets 'acme'
# Fetch tenant object
tenant = Tenant.objects.get(tenant_id=tenant_id)
# Enter tenant context
with TenantContext.use_tenant(tenant):
    # Task executes in tenant context
    # All database queries scoped to tenant
    # TenantContext.get_tenant() returns tenant
    result = super().__call__(*args, **kwargs)
# Context automatically exited, cleaned up
Task Execution
Task code now has tenant context:
def my_task(user_id):
    # TenantContext is set to correct tenant
    # Database queries scoped to tenant
    user = User.objects.get(id=user_id)
    # Gets user from tenant database/schema
    return process_user(user)
Error Handling:
Tenant Not Found:
```python
tenant = Tenant.objects.get(tenant_id=tenant_id) # May raise DoesNotExist
# Task fails if tenant doesn't exist
# Can be retried if tenant is created later
```
No Tenant ID:
```python
# If headers missing or tenant_id not in headers
# Task executes without tenant context
# May cause issues if task expects tenant
```
Examples:
Task with tenant context:
```python
def process_user(user_id):
    # TenantContext set to correct tenant
    user = User.objects.get(id=user_id)
    # Gets from tenant database
    return user.email

# Worker calls:
# __call__(user_id=123)
# - Extracts tenant_id='acme' from headers
# - Fetches Tenant(tenant_id='acme')
# - Calls: with TenantContext.use_tenant(tenant): super().__call__(user_id=123)
# - Task executes in tenant context
# - Returns user.email
```
Task without tenant (fallback):
```python
def background_job():
    # No tenant context set
    # Executes in default context
    # May cause issues
    pass

# Worker calls:
# __call__()
# - No headers or tenant_id not in headers
# - tenant_id remains None
# - Calls: super().__call__()
# - Executes normally without context
```
Source code in django_omnitenant/patches/celery.py