Files
timelaps/camlaps/services/timelapse_worker.py
ack ae15bee2d1
All checks were successful
Deploy timelaps / deploy (push) Successful in 5s
добавил удаление таймлапсов
2026-04-19 23:21:07 +03:00

328 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Фоновый воркер сборки таймлапса: выбор кадров из storage и сборка видео через ffmpeg."""
import logging
import shutil
import subprocess
from datetime import date, datetime, time, timedelta
from pathlib import Path
from django.conf import settings
from django.utils import timezone
from ..models import TimelapseJob
# Module-level logger; every worker stage in this file logs through the 'camlaps' logger.
logger = logging.getLogger('camlaps')
def _iter_dates(date_from: date, date_to: date):
"""Итерируется по датам включительно в указанном диапазоне."""
current = date_from
while current <= date_to:
yield current
current += timedelta(days=1)
def _safe_camera_root(job: TimelapseJob) -> Path:
    """Resolve the camera's directory and verify that it stays inside STORAGE_PATH.

    ``job.camera.storage_path`` is joined onto ``settings.STORAGE_PATH`` and
    both paths are resolved before the containment check.

    Raises:
        RuntimeError: the resolved directory is neither the storage root
            itself nor located somewhere below it.
    """
    base = Path(settings.STORAGE_PATH).resolve()
    resolved = (base / job.camera.storage_path).resolve()
    inside_storage = resolved == base or base in resolved.parents
    if inside_storage:
        return resolved
    raise RuntimeError('Некорректный путь камеры.')
def _is_day_time(snapshot_time: time, day_start: time, day_end: time) -> bool:
return day_start <= snapshot_time <= day_end
def _time_to_minutes(t: time) -> int:
return t.hour * 60 + t.minute
def _pick_nearest_frame_for_day(job: TimelapseJob, day_files: list[Path]) -> Path | None:
    """Return the frame of one day that is closest to ``job.anchor_time``.

    File stems are parsed as ``HH-MM-SS``; files that do not parse and any
    ``lastsnap.jpg`` are skipped. When ``job.include_night`` is false, frames
    outside ``[job.day_start_time, job.day_end_time]`` are skipped as well.
    Distance is measured in whole minutes; ties go to the frame whose file
    name sorts first. Returns ``None`` when no frame qualifies.
    """
    anchor = _time_to_minutes(job.anchor_time)

    def _ranked():
        # Yields (distance to the anchor in minutes, file name, path) per usable frame.
        for frame in day_files:
            if frame.name.lower() == 'lastsnap.jpg':
                continue
            try:
                taken_at = datetime.strptime(frame.stem, '%H-%M-%S').time()
            except ValueError:
                continue
            if not (job.include_night or _is_day_time(taken_at, job.day_start_time, job.day_end_time)):
                continue
            yield abs(_time_to_minutes(taken_at) - anchor), frame.name, frame

    best = min(_ranked(), key=lambda entry: entry[:2], default=None)
    if best is None:
        return None
    return best[2]
def _select_frames(job: TimelapseJob, camera_root: Path) -> tuple[list[Path], int, int]:
    """Choose the frames for a job.

    Every date from ``job.date_from`` to ``job.date_to`` maps to the directory
    ``camera_root / <ISO date>``; dates without such a directory count towards
    the total but contribute no frames.

    Returns:
        ``(frames, days_total, days_with_frames)``: the chosen frame paths in
        chronological order, the number of dates in the range, and the number
        of dates that contributed at least one frame.
    """
    logger.info('worker:select_frames:start job_id=%s', job.id)
    frames: list[Path] = []
    total_days = 0
    covered_days = 0

    # One frame per day: take the frame nearest to the anchor time on each day.
    if int(job.sampling_interval_minutes) == 1440:
        for day in _iter_dates(job.date_from, job.date_to):
            total_days += 1
            folder = camera_root / day.isoformat()
            if not (folder.exists() and folder.is_dir()):
                continue
            best = _pick_nearest_frame_for_day(job, sorted(folder.glob('*.jpg')))
            if best is None:
                continue
            frames.append(best)
            covered_days += 1
        logger.info(
            'worker:select_frames:done job_id=%s selected=%s days_total=%s days_with_frames=%s mode=daily_anchor',
            job.id,
            len(frames),
            total_days,
            covered_days,
        )
        return frames, total_days, covered_days

    # Any other interval: step along one continuous timeline that spans all days.
    step_seconds = int(job.sampling_interval_minutes) * 60
    previous_dt: datetime | None = None
    for day in _iter_dates(job.date_from, job.date_to):
        total_days += 1
        folder = camera_root / day.isoformat()
        if not (folder.exists() and folder.is_dir()):
            continue
        picked_today = False
        for frame in sorted(folder.glob('*.jpg')):
            if frame.name.lower() == 'lastsnap.jpg':
                continue
            try:
                taken_at = datetime.strptime(frame.stem, '%H-%M-%S').time()
            except ValueError:
                continue
            if not (job.include_night or _is_day_time(taken_at, job.day_start_time, job.day_end_time)):
                continue
            stamp = datetime.combine(day, taken_at)
            # Skip frames that come sooner than one full step after the last kept frame.
            if previous_dt is not None and (stamp - previous_dt).total_seconds() < step_seconds:
                continue
            frames.append(frame)
            previous_dt = stamp
            picked_today = True
        if picked_today:
            covered_days += 1
    logger.info(
        'worker:select_frames:done job_id=%s selected=%s days_total=%s days_with_frames=%s mode=interval',
        job.id,
        len(frames),
        total_days,
        covered_days,
    )
    return frames, total_days, covered_days
def _copy_frames_to_temp(job: TimelapseJob, frame_paths: list[Path], temp_dir: Path):
    """Copy the chosen frames into ``temp_dir`` under sequential names.

    Frames are written as ``000001.jpg``, ``000002.jpg``, ... in the order of
    ``frame_paths``. After each copy the job row's ``frames_processed`` and
    ``progress_percent`` are updated; this stage scales progress to at most 70.
    """
    frame_count = len(frame_paths)
    logger.info('worker:copy_frames:start job_id=%s count=%s', job.id, frame_count)
    temp_dir.mkdir(parents=True, exist_ok=True)
    for position, source in enumerate(frame_paths, start=1):
        shutil.copy2(source, temp_dir / f'{position:06d}.jpg')
        percent = min(70, int((position / frame_count) * 70))
        TimelapseJob.objects.filter(pk=job.pk).update(
            frames_processed=position,
            progress_percent=percent,
        )
    logger.info('worker:copy_frames:done job_id=%s', job.id)
def _build_output_path(job: TimelapseJob) -> tuple[Path, str]:
    """Build the destination of the rendered video, creating the export directory if needed.

    The file name encodes the camera slug, the date range, the sampling
    interval, the fps, the day/night mode and the job id.

    Returns:
        ``(output_path, rel_path)``: the path inside
        ``settings.TIMELAPS_EXPORT_DIR`` and the string ``timelapses/<file name>``.
    """
    target_dir = Path(settings.TIMELAPS_EXPORT_DIR)
    target_dir.mkdir(parents=True, exist_ok=True)
    if job.include_night:
        mode_tag = 'daynight'
    else:
        mode_tag = 'dayonly'
    first_day = job.date_from.strftime('%Y%m%d')
    last_day = job.date_to.strftime('%Y%m%d')
    video_name = (
        f'{job.camera.slug}_{first_day}_{last_day}'
        f'_s{job.sampling_interval_minutes}m_fps{job.fps}_{mode_tag}_job{job.id}.mp4'
    )
    return target_dir / video_name, f'timelapses/{video_name}'
def _run_ffmpeg(job: TimelapseJob, temp_dir: Path, output_path: Path):
    """Encode the numbered JPEG frames in ``temp_dir`` into an H.264 video.

    Args:
        job: the job being rendered; supplies ``fps`` and the id used in logs.
        temp_dir: directory holding frames named ``000001.jpg``, ``000002.jpg``, ...
        output_path: destination video file; overwritten if present (``-y``).

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
            Its stderr is logged together with the job id before re-raising.
        OSError: the ``ffmpeg`` executable could not be started.
    """
    logger.info('worker:ffmpeg:start job_id=%s', job.id)
    cmd = [
        'ffmpeg',
        '-y',
        '-hide_banner',
        '-loglevel',
        'error',
        '-framerate',
        str(job.fps),
        '-i',
        str(temp_dir / '%06d.jpg'),
        '-c:v',
        'libx264',
        '-preset',
        'veryfast',
        '-pix_fmt',
        'yuv420p',
        str(output_path),
    ]
    try:
        subprocess.run(
            cmd,
            check=True,
            # ffmpeg must not consume or block on the worker's own stdin.
            stdin=subprocess.DEVNULL,
            # Capture stderr so a failure can be logged next to the job id.
            stderr=subprocess.PIPE,
            text=True,
            errors='replace',
        )
    except subprocess.CalledProcessError as exc:
        # Keep only the tail: the decisive ffmpeg message is at the end.
        stderr_tail = (exc.stderr or '').strip()[-2000:]
        logger.error(
            'worker:ffmpeg:failed job_id=%s returncode=%s stderr=%s',
            job.id,
            exc.returncode,
            stderr_tail,
        )
        raise
    logger.info('worker:ffmpeg:done job_id=%s', job.id)
def claim_next_job() -> TimelapseJob | None:
    """Claim the oldest planned job and return it with its camera loaded.

    The candidate is the PLANNED job with the smallest ``created_at``. It is
    switched to RUNNING by an UPDATE that still filters on the PLANNED status,
    so if the row stopped being PLANNED in the meantime nothing is updated.

    Returns:
        The claimed job, or ``None`` when there is no planned job or the
        conditional update matched no row.
    """
    logger.info('worker:claim_next:start')
    planned = TimelapseJob.Status.PLANNED
    oldest = TimelapseJob.objects.filter(status=planned).order_by('created_at').first()
    if not oldest:
        logger.info('worker:claim_next:done no_jobs=true')
        return None

    claimed_rows = TimelapseJob.objects.filter(pk=oldest.pk, status=planned).update(
        status=TimelapseJob.Status.RUNNING,
        started_at=timezone.now(),
        progress_percent=1,
        frames_processed=0,
        frames_total=None,
        error_message='',
    )
    if claimed_rows:
        job = TimelapseJob.objects.select_related('camera').get(pk=oldest.pk)
        logger.info('worker:claim_next:done job_id=%s', job.id)
        return job

    logger.info('worker:claim_next:done race_lost=true')
    return None
def process_job(job: TimelapseJob) -> bool:
    """Render one job: select frames, copy them to a temp dir, encode, record the result.

    Progress and day statistics are written to the job row along the way. Any
    exception is logged and stored (truncated to 1000 characters) in
    ``error_message`` with status ERROR. The temp directory
    ``<TIMELAPS_EXPORT_DIR>/_tmp/job_<id>`` is removed before the run and
    again afterwards, whatever the outcome.

    Returns:
        ``True`` when the video was produced, ``False`` when the job failed.
    """
    logger.info('worker:process_job:start job_id=%s', job.id)
    work_dir = Path(settings.TIMELAPS_EXPORT_DIR) / '_tmp' / f'job_{job.id}'

    def save(**fields):
        # Write the given columns straight to this job's row.
        TimelapseJob.objects.filter(pk=job.pk).update(**fields)

    def drop_work_dir():
        # Best-effort removal of the per-job temp directory.
        if work_dir.exists():
            shutil.rmtree(work_dir, ignore_errors=True)

    try:
        drop_work_dir()
        camera_root = _safe_camera_root(job)
        frames, total_days, covered_days = _select_frames(job, camera_root)
        day_stats = {
            'days_total': total_days,
            'days_with_frames': covered_days,
            'days_skipped': max(0, total_days - covered_days),
        }
        if not frames:
            # Persist the day statistics even though the job is about to fail.
            save(**day_stats)
            raise RuntimeError('Не найдено кадров под выбранные параметры.')

        save(
            frames_total=len(frames),
            frames_processed=0,
            progress_percent=5,
            **day_stats,
        )
        _copy_frames_to_temp(job, frames, work_dir)
        video_path, video_rel_path = _build_output_path(job)
        save(progress_percent=90)
        _run_ffmpeg(job, work_dir, video_path)
        save(
            status=TimelapseJob.Status.SUCCESS,
            progress_percent=100,
            output_rel_path=video_rel_path,
            finished_at=timezone.now(),
            error_message='',
            frames_processed=len(frames),
        )
        logger.info('worker:process_job:done job_id=%s', job.id)
        return True
    except Exception as exc:
        logger.exception('worker:process_job:error job_id=%s', job.id)
        save(
            status=TimelapseJob.Status.ERROR,
            finished_at=timezone.now(),
            error_message=str(exc)[:1000],
        )
        return False
    finally:
        drop_work_dir()
def run_one_job() -> bool:
    """Claim the next planned job, if any, and process it.

    Returns:
        ``True`` when a job was claimed and processed (whether or not it
        succeeded), ``False`` when no job could be claimed.
    """
    logger.info('worker:run_one:start')
    job = claim_next_job()
    if job:
        process_job(job)
        logger.info('worker:run_one:done processed=true')
        return True
    logger.info('worker:run_one:done processed=false')
    return False
def run_specific_job(job_id: int) -> bool:
    """Run (or re-run) the job with the given id, unless it is already running.

    The job is claimed with a single conditional UPDATE that excludes rows
    already in the RUNNING status. Two concurrent calls for the same job
    therefore cannot both start it: only the call whose UPDATE matched the
    row goes on to process the job. The claim also resets the progress,
    frame and day counters and clears the previous error message.

    Args:
        job_id: primary key of the job to run.

    Returns:
        ``True`` when the job was claimed and processed (whether or not it
        succeeded); ``False`` when the job does not exist or is already
        running.
    """
    logger.info('worker:run_specific:start job_id=%s', job_id)
    if not TimelapseJob.objects.filter(pk=job_id).exists():
        logger.info('worker:run_specific:done job_not_found=true job_id=%s', job_id)
        return False

    # Atomic claim: the status check and the switch to RUNNING are one statement.
    claimed_rows = (
        TimelapseJob.objects.filter(pk=job_id)
        .exclude(status=TimelapseJob.Status.RUNNING)
        .update(
            status=TimelapseJob.Status.RUNNING,
            started_at=timezone.now(),
            finished_at=None,
            progress_percent=1,
            frames_processed=0,
            frames_total=None,
            days_total=0,
            days_with_frames=0,
            days_skipped=0,
            error_message='',
        )
    )
    if not claimed_rows:
        logger.info('worker:run_specific:done already_running=true job_id=%s', job_id)
        return False

    job = TimelapseJob.objects.select_related('camera').get(pk=job_id)
    process_job(job)
    logger.info('worker:run_specific:done job_id=%s', job_id)
    return True