Files
timelaps/camlaps/services/timelapse_worker.py
ack 4a10958445
All checks were successful
Deploy timelaps / deploy (push) Successful in 5s
добавил обработчик задач
2026-04-19 19:41:33 +03:00

213 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import shutil
import subprocess
from datetime import date, datetime, time, timedelta
from pathlib import Path
from django.conf import settings
from django.utils import timezone
from ..models import TimelapseJob
# Module-wide logger; every worker step in this file reports through the 'camlaps' logger.
logger = logging.getLogger('camlaps')
def _iter_dates(date_from: date, date_to: date):
current = date_from
while current <= date_to:
yield current
current += timedelta(days=1)
def _safe_camera_root(job: TimelapseJob) -> Path:
    """Return the resolved snapshot directory of the job's camera.

    The camera's ``storage_path`` is joined onto ``settings.STORAGE_PATH`` and
    resolved. The result is accepted only if it is the storage root itself or
    lies somewhere beneath it.

    Raises:
        RuntimeError: if the resolved camera path is outside the storage root.
    """
    base = Path(settings.STORAGE_PATH).resolve()
    resolved = base.joinpath(job.camera.storage_path).resolve()

    inside_storage = resolved == base or base in resolved.parents
    if inside_storage:
        return resolved
    raise RuntimeError('Некорректный путь камеры.')
def _is_day_time(snapshot_time: time, day_start: time, day_end: time) -> bool:
return day_start <= snapshot_time <= day_end
def _select_frames(job: TimelapseJob, camera_root: Path) -> list[Path]:
    """Choose the snapshot files that make up the timelapse, in time order.

    For every day between ``job.date_from`` and ``job.date_to`` the directory
    ``camera_root/<YYYY-MM-DD>`` is scanned for ``*.jpg`` files whose stem
    parses as ``HH-MM-SS``. ``lastsnap.jpg`` and files with other names are
    ignored. Unless ``job.include_night`` is set, frames outside the job's
    day window are dropped. From the remaining frames one is kept whenever at
    least ``job.sampling_interval_minutes`` have passed since the previously
    kept frame; the very first eligible frame is always kept.

    Returns:
        The selected frame paths, ordered chronologically.
    """
    logger.info('worker:select_frames:start job_id=%s', job.id)
    selected: list[Path] = []
    interval_seconds = int(job.sampling_interval_minutes) * 60
    last_selected_dt: datetime | None = None

    for day in _iter_dates(job.date_from, job.date_to):
        day_dir = camera_root / day.isoformat()
        # is_dir() is already False for a missing path, so no separate exists() check.
        if not day_dir.is_dir():
            continue

        timed_frames: list[tuple[time, str, Path]] = []
        for img_path in day_dir.glob('*.jpg'):
            if img_path.name.lower() == 'lastsnap.jpg':
                continue
            try:
                snap_time = datetime.strptime(img_path.stem, '%H-%M-%S').time()
            except ValueError:
                # Not a timestamped snapshot name.
                continue
            if not job.include_night and not _is_day_time(
                snap_time, job.day_start_time, job.day_end_time
            ):
                continue
            timed_frames.append((snap_time, img_path.name, img_path))

        # Order by the parsed time, not by filename: strptime also accepts
        # non-zero-padded stems such as "9-00-00", which sort after "10-00-00"
        # lexicographically and would then be rejected by the interval check.
        # The filename is a tie-breaker to keep the result deterministic.
        timed_frames.sort(key=lambda entry: entry[:2])

        for snap_time, _name, img_path in timed_frames:
            current_dt = datetime.combine(day, snap_time)
            if (
                last_selected_dt is None
                or (current_dt - last_selected_dt).total_seconds() >= interval_seconds
            ):
                selected.append(img_path)
                last_selected_dt = current_dt

    logger.info('worker:select_frames:done job_id=%s selected=%s', job.id, len(selected))
    return selected
def _copy_frames_to_temp(job: TimelapseJob, frame_paths: list[Path], temp_dir: Path):
    """Copy the selected frames into ``temp_dir`` under sequential names.

    Frames are written as ``000001.jpg``, ``000002.jpg``, ... in the order
    given. After each copy the job row is updated with the number of frames
    processed and a progress value that ends at 70 % when the last frame has
    been copied.

    Args:
        job: job whose database row receives the progress updates.
        frame_paths: source images, already in the desired playback order.
        temp_dir: working directory; created if it does not exist.
    """
    logger.info('worker:copy_frames:start job_id=%s count=%s', job.id, len(frame_paths))
    temp_dir.mkdir(parents=True, exist_ok=True)
    total = len(frame_paths)
    # process_job reports 5 % right before this stage starts. Without a floor
    # the first frames of a large job would compute 0 % and the displayed
    # progress would jump backwards.
    floor_percent = 5
    ceiling_percent = 70
    for index, src in enumerate(frame_paths, start=1):
        dst = temp_dir / f'{index:06d}.jpg'
        shutil.copy2(src, dst)
        scaled = int((index / total) * ceiling_percent)
        progress = max(floor_percent, min(ceiling_percent, scaled))
        TimelapseJob.objects.filter(pk=job.pk).update(
            frames_processed=index,
            progress_percent=progress,
        )
    logger.info('worker:copy_frames:done job_id=%s', job.id)
def _build_output_path(job: TimelapseJob) -> tuple[Path, str]:
    """Work out where the rendered video for ``job`` is stored.

    The export directory (``settings.TIMELAPS_EXPORT_DIR``) is created when
    missing. The file name combines the camera slug, the first and last day of
    the job as ``YYYYMMDD`` and the job id.

    Returns:
        A pair ``(absolute output path, 'timelapses/<filename>')``.
    """
    target_dir = Path(settings.TIMELAPS_EXPORT_DIR)
    target_dir.mkdir(parents=True, exist_ok=True)

    first_day = job.date_from.strftime("%Y%m%d")
    last_day = job.date_to.strftime("%Y%m%d")
    filename = f'{job.camera.slug}_{first_day}_{last_day}_{job.id}.mp4'

    return target_dir / filename, f'timelapses/{filename}'
class _FfmpegError(subprocess.CalledProcessError):
    """``CalledProcessError`` whose text also includes what ffmpeg printed to stderr."""

    def __str__(self) -> str:
        summary = super().__str__()
        details = (self.stderr or '').strip()
        return f'{summary} {details}' if details else summary


def _run_ffmpeg(job: TimelapseJob, temp_dir: Path, output_path: Path):
    """Encode the numbered frames in ``temp_dir`` into an H.264 video.

    Runs ``ffmpeg`` on ``temp_dir/%06d.jpg`` at ``job.fps`` frames per second
    and writes the result to ``output_path``, overwriting an existing file.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
            The raised instance is a subclass whose ``str()`` additionally
            contains ffmpeg's stderr, so the reason for the failure is
            visible to whoever records the exception text.
    """
    logger.info('worker:ffmpeg:start job_id=%s', job.id)
    cmd = [
        'ffmpeg',
        '-y',
        '-hide_banner',
        '-loglevel',
        'error',
        '-framerate',
        str(job.fps),
        '-i',
        str(temp_dir / '%06d.jpg'),
        '-c:v',
        'libx264',
        '-preset',
        'veryfast',
        '-pix_fmt',
        'yuv420p',
        str(output_path),
    ]
    try:
        # Capture stderr (ffmpeg's diagnostics channel) instead of letting it
        # go to the worker's own stderr, where it is detached from the job.
        subprocess.run(
            cmd,
            check=True,
            stderr=subprocess.PIPE,
            text=True,
            errors='replace',
        )
    except subprocess.CalledProcessError as exc:
        logger.error(
            'worker:ffmpeg:failed job_id=%s returncode=%s stderr=%s',
            job.id,
            exc.returncode,
            (exc.stderr or '').strip(),
        )
        # Same data, same exception family; only the message is richer.
        raise _FfmpegError(exc.returncode, exc.cmd, exc.output, exc.stderr) from None
    logger.info('worker:ffmpeg:done job_id=%s', job.id)
def claim_next_job() -> TimelapseJob | None:
    """Take ownership of the oldest planned job, if there is one.

    The oldest job in status PLANNED (by ``created_at``) is looked up and then
    switched to RUNNING with an update that still requires the PLANNED status.
    If that update touches no row, the job was taken in the meantime and
    nothing is claimed.

    Returns:
        The claimed job, reloaded with its camera, or ``None`` when there is
        no planned job or the claim was lost.
    """
    logger.info('worker:claim_next:start')

    planned_jobs = TimelapseJob.objects.filter(status=TimelapseJob.Status.PLANNED)
    oldest = planned_jobs.order_by('created_at').first()
    if oldest is None:
        logger.info('worker:claim_next:done no_jobs=true')
        return None

    # Reset all run-related fields together with the status switch.
    claim_fields = {
        'status': TimelapseJob.Status.RUNNING,
        'started_at': timezone.now(),
        'progress_percent': 1,
        'frames_processed': 0,
        'frames_total': None,
        'error_message': '',
    }
    # The PLANNED filter is kept on the update so only one claimant can win.
    claimed_rows = planned_jobs.filter(pk=oldest.pk).update(**claim_fields)
    if claimed_rows == 0:
        logger.info('worker:claim_next:done race_lost=true')
        return None

    job = TimelapseJob.objects.select_related('camera').get(pk=oldest.pk)
    logger.info('worker:claim_next:done job_id=%s', job.id)
    return job
def process_job(job: TimelapseJob) -> bool:
    """Render the timelapse for an already claimed job.

    Steps: resolve the camera directory, select frames, copy them to a
    per-job temporary directory, run ffmpeg, and mark the job as SUCCESS.
    Any exception is logged, stored (truncated to 1000 characters) in the
    job's ``error_message`` and the job is marked as ERROR. The temporary
    directory is removed in every case, and a video file left behind by a
    failed encode is deleted.

    Returns:
        ``True`` when the video was produced, ``False`` when the job failed.
    """
    logger.info('worker:process_job:start job_id=%s', job.id)
    temp_dir = Path(settings.TIMELAPS_EXPORT_DIR) / '_tmp' / f'job_{job.id}'
    # Set once ffmpeg is about to write, so the error path knows which file
    # may be incomplete.
    written_output: Path | None = None
    try:
        # Leftovers from an interrupted earlier run; a missing directory is
        # ignored by rmtree itself.
        shutil.rmtree(temp_dir, ignore_errors=True)

        camera_root = _safe_camera_root(job)
        frame_paths = _select_frames(job, camera_root)
        if not frame_paths:
            raise RuntimeError('Не найдено кадров под выбранные параметры.')

        TimelapseJob.objects.filter(pk=job.pk).update(
            frames_total=len(frame_paths),
            frames_processed=0,
            progress_percent=5,
        )
        _copy_frames_to_temp(job, frame_paths, temp_dir)

        output_path, rel_path = _build_output_path(job)
        TimelapseJob.objects.filter(pk=job.pk).update(progress_percent=90)

        written_output = output_path
        _run_ffmpeg(job, temp_dir, output_path)

        TimelapseJob.objects.filter(pk=job.pk).update(
            status=TimelapseJob.Status.SUCCESS,
            progress_percent=100,
            output_rel_path=rel_path,
            finished_at=timezone.now(),
            error_message='',
            frames_processed=len(frame_paths),
        )
        logger.info('worker:process_job:done job_id=%s', job.id)
        return True
    except Exception as exc:
        logger.exception('worker:process_job:error job_id=%s', job.id)
        if written_output is not None:
            # Do not leave a truncated video in the export directory for a
            # job that is reported as failed.
            try:
                written_output.unlink(missing_ok=True)
            except OSError:
                logger.warning(
                    'worker:process_job:cleanup_failed job_id=%s path=%s',
                    job.id,
                    written_output,
                )
        TimelapseJob.objects.filter(pk=job.pk).update(
            status=TimelapseJob.Status.ERROR,
            finished_at=timezone.now(),
            error_message=str(exc)[:1000],
        )
        return False
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
def run_one_job() -> bool:
    """Claim one planned job and process it.

    Returns:
        ``True`` when a job was claimed and handed to ``process_job`` (its
        outcome is not reflected here), ``False`` when no job was claimed.
    """
    logger.info('worker:run_one:start')
    job = claim_next_job()
    if job:
        process_job(job)
        logger.info('worker:run_one:done processed=true')
        return True
    logger.info('worker:run_one:done processed=false')
    return False