"""Фоновый воркер сборки таймлапса: выбор кадров из storage и сборка видео через ffmpeg.""" import logging import shutil import subprocess from datetime import date, datetime, time, timedelta from pathlib import Path from django.conf import settings from django.utils import timezone from ..models import TimelapseJob logger = logging.getLogger('camlaps') def _iter_dates(date_from: date, date_to: date): """Итерируется по датам включительно в указанном диапазоне.""" current = date_from while current <= date_to: yield current current += timedelta(days=1) def _safe_camera_root(job: TimelapseJob) -> Path: """Возвращает безопасный путь к каталогу камеры внутри STORAGE_PATH.""" storage_root = Path(settings.STORAGE_PATH).resolve() camera_root = (storage_root / job.camera.storage_path).resolve() if camera_root != storage_root and storage_root not in camera_root.parents: raise RuntimeError('Некорректный путь камеры.') return camera_root def _is_day_time(snapshot_time: time, day_start: time, day_end: time) -> bool: return day_start <= snapshot_time <= day_end def _time_to_minutes(t: time) -> int: return t.hour * 60 + t.minute def _pick_nearest_frame_for_day(job: TimelapseJob, day_files: list[Path]) -> Path | None: """Выбирает кадр дня, ближайший к anchor_time с учетом фильтра дня/ночи.""" candidates: list[tuple[int, Path]] = [] anchor_minutes = _time_to_minutes(job.anchor_time) for img_path in day_files: if img_path.name.lower() == 'lastsnap.jpg': continue try: snap_time = datetime.strptime(img_path.stem, '%H-%M-%S').time() except ValueError: continue if not job.include_night and not _is_day_time(snap_time, job.day_start_time, job.day_end_time): continue diff = abs(_time_to_minutes(snap_time) - anchor_minutes) candidates.append((diff, img_path)) if not candidates: return None candidates.sort(key=lambda x: (x[0], x[1].name)) return candidates[0][1] def _select_frames(job: TimelapseJob, camera_root: Path) -> tuple[list[Path], int, int]: """Подбирает кадры и возвращает (кадры, 
всего_дней, дней_с_кадрами).""" logger.info('worker:select_frames:start job_id=%s', job.id) selected: list[Path] = [] days_total = 0 days_with_frames = 0 # Для 1 кадра/сутки берем ближайший кадр к якорному времени в каждом дне. if int(job.sampling_interval_minutes) == 1440: for day in _iter_dates(job.date_from, job.date_to): days_total += 1 day_dir = camera_root / day.isoformat() if not day_dir.exists() or not day_dir.is_dir(): continue day_files = sorted(day_dir.glob('*.jpg')) picked = _pick_nearest_frame_for_day(job, day_files) if picked is not None: selected.append(picked) days_with_frames += 1 logger.info( 'worker:select_frames:done job_id=%s selected=%s days_total=%s days_with_frames=%s mode=daily_anchor', job.id, len(selected), days_total, days_with_frames, ) return selected, days_total, days_with_frames # Для остальных интервалов применяем шаг по времени по общей временной оси. interval_seconds = int(job.sampling_interval_minutes) * 60 last_selected_dt: datetime | None = None for day in _iter_dates(job.date_from, job.date_to): days_total += 1 day_dir = camera_root / day.isoformat() if not day_dir.exists() or not day_dir.is_dir(): continue day_has_selected = False day_files = sorted(day_dir.glob('*.jpg')) for img_path in day_files: if img_path.name.lower() == 'lastsnap.jpg': continue try: snap_time = datetime.strptime(img_path.stem, '%H-%M-%S').time() except ValueError: continue if not job.include_night and not _is_day_time(snap_time, job.day_start_time, job.day_end_time): continue current_dt = datetime.combine(day, snap_time) if last_selected_dt is None or (current_dt - last_selected_dt).total_seconds() >= interval_seconds: selected.append(img_path) last_selected_dt = current_dt day_has_selected = True if day_has_selected: days_with_frames += 1 logger.info( 'worker:select_frames:done job_id=%s selected=%s days_total=%s days_with_frames=%s mode=interval', job.id, len(selected), days_total, days_with_frames, ) return selected, days_total, days_with_frames def 
_copy_frames_to_temp(job: TimelapseJob, frame_paths: list[Path], temp_dir: Path): logger.info('worker:copy_frames:start job_id=%s count=%s', job.id, len(frame_paths)) temp_dir.mkdir(parents=True, exist_ok=True) total = len(frame_paths) for index, src in enumerate(frame_paths, start=1): dst = temp_dir / f'{index:06d}.jpg' shutil.copy2(src, dst) progress = min(70, int((index / total) * 70)) TimelapseJob.objects.filter(pk=job.pk).update( frames_processed=index, progress_percent=progress, ) logger.info('worker:copy_frames:done job_id=%s', job.id) def _build_output_path(job: TimelapseJob) -> tuple[Path, str]: export_dir = Path(settings.TIMELAPS_EXPORT_DIR) export_dir.mkdir(parents=True, exist_ok=True) day_mode = 'daynight' if job.include_night else 'dayonly' filename = ( f'{job.camera.slug}_{job.date_from.strftime("%Y%m%d")}_{job.date_to.strftime("%Y%m%d")}' f'_s{job.sampling_interval_minutes}m_fps{job.fps}_{day_mode}_job{job.id}.mp4' ) output_path = export_dir / filename rel_path = f'timelapses/{filename}' return output_path, rel_path def _run_ffmpeg(job: TimelapseJob, temp_dir: Path, output_path: Path): logger.info('worker:ffmpeg:start job_id=%s', job.id) cmd = [ 'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error', '-framerate', str(job.fps), '-i', str(temp_dir / '%06d.jpg'), '-c:v', 'libx264', '-preset', 'veryfast', '-pix_fmt', 'yuv420p', str(output_path), ] subprocess.run(cmd, check=True) logger.info('worker:ffmpeg:done job_id=%s', job.id) def claim_next_job() -> TimelapseJob | None: logger.info('worker:claim_next:start') candidate = TimelapseJob.objects.filter(status=TimelapseJob.Status.PLANNED).order_by('created_at').first() if not candidate: logger.info('worker:claim_next:done no_jobs=true') return None updated = TimelapseJob.objects.filter( pk=candidate.pk, status=TimelapseJob.Status.PLANNED, ).update( status=TimelapseJob.Status.RUNNING, started_at=timezone.now(), progress_percent=1, frames_processed=0, frames_total=None, error_message='', ) if not updated: 
logger.info('worker:claim_next:done race_lost=true') return None job = TimelapseJob.objects.select_related('camera').get(pk=candidate.pk) logger.info('worker:claim_next:done job_id=%s', job.id) return job def process_job(job: TimelapseJob) -> bool: logger.info('worker:process_job:start job_id=%s', job.id) temp_dir = Path(settings.TIMELAPS_EXPORT_DIR) / '_tmp' / f'job_{job.id}' try: if temp_dir.exists(): shutil.rmtree(temp_dir, ignore_errors=True) camera_root = _safe_camera_root(job) frame_paths, days_total, days_with_frames = _select_frames(job, camera_root) days_skipped = max(0, days_total - days_with_frames) if not frame_paths: TimelapseJob.objects.filter(pk=job.pk).update( days_total=days_total, days_with_frames=days_with_frames, days_skipped=days_skipped, ) raise RuntimeError('Не найдено кадров под выбранные параметры.') TimelapseJob.objects.filter(pk=job.pk).update( frames_total=len(frame_paths), frames_processed=0, days_total=days_total, days_with_frames=days_with_frames, days_skipped=days_skipped, progress_percent=5, ) _copy_frames_to_temp(job, frame_paths, temp_dir) output_path, rel_path = _build_output_path(job) TimelapseJob.objects.filter(pk=job.pk).update(progress_percent=90) _run_ffmpeg(job, temp_dir, output_path) TimelapseJob.objects.filter(pk=job.pk).update( status=TimelapseJob.Status.SUCCESS, progress_percent=100, output_rel_path=rel_path, finished_at=timezone.now(), error_message='', frames_processed=len(frame_paths), ) logger.info('worker:process_job:done job_id=%s', job.id) return True except Exception as exc: logger.exception('worker:process_job:error job_id=%s', job.id) TimelapseJob.objects.filter(pk=job.pk).update( status=TimelapseJob.Status.ERROR, finished_at=timezone.now(), error_message=str(exc)[:1000], ) return False finally: if temp_dir.exists(): shutil.rmtree(temp_dir, ignore_errors=True) def run_one_job() -> bool: logger.info('worker:run_one:start') job = claim_next_job() if not job: logger.info('worker:run_one:done processed=false') 
return False process_job(job) logger.info('worker:run_one:done processed=true') return True def run_specific_job(job_id: int) -> bool: logger.info('worker:run_specific:start job_id=%s', job_id) job = TimelapseJob.objects.select_related('camera').filter(pk=job_id).first() if not job: logger.info('worker:run_specific:done job_not_found=true job_id=%s', job_id) return False if job.status == TimelapseJob.Status.RUNNING: logger.info('worker:run_specific:done already_running=true job_id=%s', job_id) return False TimelapseJob.objects.filter(pk=job_id).update( status=TimelapseJob.Status.RUNNING, started_at=timezone.now(), finished_at=None, progress_percent=1, frames_processed=0, frames_total=None, days_total=0, days_with_frames=0, days_skipped=0, error_message='', ) job = TimelapseJob.objects.select_related('camera').get(pk=job_id) process_job(job) logger.info('worker:run_specific:done job_id=%s', job_id) return True
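

# Illustrative sketch, not used by the worker: the interval rule from
# _select_frames reduced to plain datetimes, so it can be tried without Django
# or a storage tree. The name sample_by_interval is hypothetical and the
# day/night filter is left out; only the rule itself is shown (keep the first
# frame, then each frame at least one interval after the last kept frame).
from datetime import datetime, timedelta


def sample_by_interval(timestamps: list[datetime], interval_minutes: int) -> list[datetime]:
    """Keep the first timestamp, then each one at least interval_minutes after the last kept."""
    interval_seconds = interval_minutes * 60
    kept: list[datetime] = []
    for current in sorted(timestamps):
        # The gap is measured from the last kept frame, not from a fixed grid,
        # so a hole in the data shifts every later pick.
        if not kept or (current - kept[-1]).total_seconds() >= interval_seconds:
            kept.append(current)
    return kept


# Usage: frames every 10 minutes from 08:00 to 09:00, sampled with
# interval_minutes=30, keep the 08:00, 08:30 and 09:00 frames.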