Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions materializationengine/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def create_celery(app=None):
for logger_name in celery_loggers:
logging.getLogger(logger_name).setLevel(log_level)

# Debug: Check if BEAT_SCHEDULES is in app.config
beat_schedules = app.config.get("BEAT_SCHEDULES", [])
celery_logger.debug(f"BEAT_SCHEDULES from app.config: {beat_schedules}")
celery_logger.debug(f"BEAT_SCHEDULES type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}")

celery.conf.update(
{
"task_routes": ("materializationengine.task_router.TaskRouter"),
Expand All @@ -70,11 +75,20 @@ def create_celery(app=None):
"socket_timeout": 20,
"socket_connect_timeout": 20,
}, # timeout (s) for tasks to be sent back to broker queue
"beat_schedules": app.config["BEAT_SCHEDULES"],
"beat_schedules": beat_schedules,
}
)

celery.conf.update(app.config)
# Ensure beat_schedules is set correctly after update (in case app.config overwrote it)
# Use BEAT_SCHEDULES from app.config if beat_schedules is empty or missing
if not celery.conf.get("beat_schedules") and app.config.get("BEAT_SCHEDULES"):
celery.conf.beat_schedules = app.config["BEAT_SCHEDULES"]
celery_logger.debug(f"Restored beat_schedules from BEAT_SCHEDULES: {len(app.config['BEAT_SCHEDULES'])} schedules")

# Debug: Verify beat_schedules is in celery.conf after update
celery_logger.debug(f"beat_schedules in celery.conf after update: {celery.conf.get('beat_schedules', 'NOT FOUND')}")
celery_logger.debug(f"BEAT_SCHEDULES in celery.conf after update: {celery.conf.get('BEAT_SCHEDULES', 'NOT FOUND')}")
TaskBase = celery.Task

class ContextTask(TaskBase):
Expand All @@ -87,6 +101,7 @@ def __call__(self, *args, **kwargs):
celery.Task = ContextTask
if os.environ.get("SLACK_WEBHOOK"):
celery.Task.on_failure = post_to_slack_on_task_failure

return celery


Expand Down Expand Up @@ -129,8 +144,18 @@ def setup_periodic_tasks(sender, **kwargs):
name="Clean up back end results",
)

beat_schedules = celery.conf.get("beat_schedules", [])
celery_logger.info(beat_schedules)
# Try to get beat_schedules from celery.conf, fallback to BEAT_SCHEDULES if not found
beat_schedules = celery.conf.get("beat_schedules")
if not beat_schedules:
# Fallback: try to get from BEAT_SCHEDULES (uppercase) in celery.conf
beat_schedules = celery.conf.get("BEAT_SCHEDULES", [])
if beat_schedules:
celery_logger.debug(f"Found BEAT_SCHEDULES (uppercase), converting to beat_schedules")
celery.conf.beat_schedules = beat_schedules

celery_logger.debug(f"beat_schedules from celery.conf: {beat_schedules}")
celery_logger.debug(f"beat_schedules type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}")

if not beat_schedules:
celery_logger.info("No periodic tasks configured.")
return
Expand Down
10 changes: 9 additions & 1 deletion materializationengine/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,22 @@ def configure_app(app: Flask) -> Flask:
if "MATERIALIZATION_ENGINE_SETTINGS" in os.environ.keys():
app.config.from_envvar("MATERIALIZATION_ENGINE_SETTINGS")
# instance-folders configuration
# Store BEAT_SCHEDULES before loading config file to see if it gets overwritten
beat_schedules_before = app.config.get("BEAT_SCHEDULES", "NOT_SET")
app.config.from_pyfile("config.cfg", silent=True)

beat_schedules_after = app.config.get("BEAT_SCHEDULES", "NOT_SET")

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(app.config["LOGGING_LEVEL"])
app.logger.removeHandler(default_handler)
app.logger.addHandler(handler)
app.logger.setLevel(app.config["LOGGING_LEVEL"])
app.logger.propagate = False

# Log BEAT_SCHEDULES loading status (debug level)
app.logger.debug(f"BEAT_SCHEDULES before config.cfg: {beat_schedules_before}")
app.logger.debug(f"BEAT_SCHEDULES after config.cfg: {beat_schedules_after}")
app.logger.debug(f"BEAT_SCHEDULES type: {type(beat_schedules_after)}, length: {len(beat_schedules_after) if isinstance(beat_schedules_after, (list, dict)) else 'N/A'}")
app.logger.debug(app.config)
app.app_context().push()
return app