import logging
import shutil
import subprocess
from datetime import date, datetime, time, timedelta
from pathlib import Path

from django.conf import settings
from django.utils import timezone

from ..models import TimelapseJob

logger = logging.getLogger('camlaps')


def _iter_dates(date_from: date, date_to: date):
    """Yield every calendar day from ``date_from`` to ``date_to`` inclusive.

    Nothing is yielded when ``date_from`` is later than ``date_to``.
    """
    current = date_from
    while current <= date_to:
        yield current
        current += timedelta(days=1)


def _safe_camera_root(job: TimelapseJob) -> Path:
    """Return the resolved snapshot directory of the job's camera.

    Raises:
        RuntimeError: if the resolved directory is neither the storage root
            itself nor located somewhere below it.
    """
    storage_root = Path(settings.STORAGE_PATH).resolve()
    # resolve() normalises the joined path before the containment check, so
    # '..' segments or an absolute storage_path cannot slip past it.
    camera_root = (storage_root / job.camera.storage_path).resolve()
    if camera_root != storage_root and storage_root not in camera_root.parents:
        raise RuntimeError('Некорректный путь камеры.')
    return camera_root


def _is_day_time(snapshot_time: time, day_start: time, day_end: time) -> bool:
    """Return True when ``snapshot_time`` is within ``[day_start, day_end]``.

    Both bounds are inclusive. A window whose start is later than its end
    never matches.
    """
    return day_start <= snapshot_time <= day_end


def _select_frames(job: TimelapseJob, camera_root: Path) -> list[Path]:
    """Collect the snapshot files that go into the timelapse.

    For each day in the job's date range, the ``<camera_root>/<YYYY-MM-DD>``
    directory is scanned for ``*.jpg`` files named ``HH-MM-SS``. A frame is
    kept when at least ``job.sampling_interval_minutes`` have passed since
    the previously kept frame. The interval is tracked across day boundaries.

    Returns:
        The selected paths, in the order they were visited (days ascending,
        files sorted by path within each day).
    """
    logger.info('worker:select_frames:start job_id=%s', job.id)
    selected: list[Path] = []
    interval_seconds = int(job.sampling_interval_minutes) * 60
    last_selected_dt: datetime | None = None

    for day in _iter_dates(job.date_from, job.date_to):
        day_dir = camera_root / day.isoformat()
        if not day_dir.exists() or not day_dir.is_dir():
            continue

        day_files = sorted(day_dir.glob('*.jpg'))
        for img_path in day_files:
            # 'lastsnap.jpg' is skipped explicitly; it would fail the
            # timestamp parse below anyway.
            if img_path.name.lower() == 'lastsnap.jpg':
                continue
            try:
                snap_time = datetime.strptime(img_path.stem, '%H-%M-%S').time()
            except ValueError:
                # Not a timestamp-named snapshot: ignore the file.
                continue

            if not job.include_night and not _is_day_time(snap_time, job.day_start_time, job.day_end_time):
                continue

            current_dt = datetime.combine(day, snap_time)
            if last_selected_dt is None or (current_dt - last_selected_dt).total_seconds() >= interval_seconds:
                selected.append(img_path)
                last_selected_dt = current_dt

    logger.info('worker:select_frames:done job_id=%s selected=%s', job.id, len(selected))
    return selected


def _copy_frames_to_temp(job: TimelapseJob, frame_paths: list[Path], temp_dir: Path) -> None:
    """Copy the frames into ``temp_dir`` as ``000001.jpg``, ``000002.jpg``, ...

    After every copied file the job row is updated with the number of frames
    processed and a progress value scaled so that copying ends at 70.
    """
    logger.info('worker:copy_frames:start job_id=%s count=%s', job.id, len(frame_paths))
    temp_dir.mkdir(parents=True, exist_ok=True)
    total = len(frame_paths)
    for index, src in enumerate(frame_paths, start=1):
        # Sequential zero-padded names match the '%06d.jpg' input pattern
        # passed to ffmpeg in _run_ffmpeg.
        dst = temp_dir / f'{index:06d}.jpg'
        shutil.copy2(src, dst)
        progress = min(70, int((index / total) * 70))
        TimelapseJob.objects.filter(pk=job.pk).update(
            frames_processed=index,
            progress_percent=progress,
        )
    logger.info('worker:copy_frames:done job_id=%s', job.id)


def _build_output_path(job: TimelapseJob) -> tuple[Path, str]:
    """Return ``(output_path, rel_path)`` for the job's rendered video.

    The export directory (``settings.TIMELAPS_EXPORT_DIR``) is created if it
    does not exist. ``rel_path`` is ``'timelapses/<filename>'``.
    """
    export_dir = Path(settings.TIMELAPS_EXPORT_DIR)
    export_dir.mkdir(parents=True, exist_ok=True)
    filename = (
        f'{job.camera.slug}_{job.date_from.strftime("%Y%m%d")}_'
        f'{job.date_to.strftime("%Y%m%d")}_{job.id}.mp4'
    )
    output_path = export_dir / filename
    # NOTE(review): the 'timelapses/' prefix is hard-coded; presumably it
    # mirrors the last component of TIMELAPS_EXPORT_DIR - confirm.
    rel_path = f'timelapses/{filename}'
    return output_path, rel_path


def _run_ffmpeg(job: TimelapseJob, temp_dir: Path, output_path: Path) -> None:
    """Encode the numbered frames in ``temp_dir`` into an H.264 MP4.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero
            status. The exception's ``stderr`` attribute holds the captured
            ffmpeg error output as text.
    """
    logger.info('worker:ffmpeg:start job_id=%s', job.id)
    cmd = [
        'ffmpeg',
        '-y',
        '-hide_banner',
        '-loglevel',
        'error',
        '-framerate',
        str(job.fps),
        '-i',
        str(temp_dir / '%06d.jpg'),
        '-c:v',
        'libx264',
        '-preset',
        'veryfast',
        '-pix_fmt',
        'yuv420p',
        str(output_path),
    ]
    try:
        # Capture stderr so the actual ffmpeg diagnostic reaches the log and
        # the caller, instead of only the inherited console stream.
        subprocess.run(
            cmd,
            check=True,
            stderr=subprocess.PIPE,
            text=True,
            errors='replace',
        )
    except subprocess.CalledProcessError as exc:
        logger.error(
            'worker:ffmpeg:failed job_id=%s returncode=%s stderr=%s',
            job.id,
            exc.returncode,
            (exc.stderr or '').strip(),
        )
        raise
    logger.info('worker:ffmpeg:done job_id=%s', job.id)


def claim_next_job() -> TimelapseJob | None:
    """Claim the oldest planned job and mark it as running.

    Returns:
        The claimed job, loaded together with its camera, or ``None`` when
        there is no planned job or the candidate was no longer planned by
        the time the claim was attempted.
    """
    logger.info('worker:claim_next:start')
    candidate = TimelapseJob.objects.filter(status=TimelapseJob.Status.PLANNED).order_by('created_at').first()
    if not candidate:
        logger.info('worker:claim_next:done no_jobs=true')
        return None

    # The update is filtered on status=PLANNED again, so it touches zero rows
    # if the status changed after the candidate was read.
    updated = TimelapseJob.objects.filter(
        pk=candidate.pk,
        status=TimelapseJob.Status.PLANNED,
    ).update(
        status=TimelapseJob.Status.RUNNING,
        started_at=timezone.now(),
        progress_percent=1,
        frames_processed=0,
        frames_total=None,
        error_message='',
    )
    if not updated:
        logger.info('worker:claim_next:done race_lost=true')
        return None

    job = TimelapseJob.objects.select_related('camera').get(pk=candidate.pk)
    logger.info('worker:claim_next:done job_id=%s', job.id)
    return job


def _describe_failure(exc: Exception) -> str:
    """Return the text to store in a failed job's ``error_message``.

    For a failed subprocess the captured stderr is placed first, because the
    default ``CalledProcessError`` text only contains the command line and
    the exit status.
    """
    if isinstance(exc, subprocess.CalledProcessError) and exc.stderr:
        stderr = exc.stderr
        if isinstance(stderr, bytes):
            stderr = stderr.decode(errors='replace')
        stderr = stderr.strip()
        if stderr:
            return f'{stderr} ({exc})'
    return str(exc)


def process_job(job: TimelapseJob) -> bool:
    """Render the timelapse for an already claimed job.

    Selects frames, copies them to a per-job temporary directory, runs
    ffmpeg and records the outcome on the job row. The temporary directory
    is removed in every case.

    Returns:
        ``True`` when the job finished with status SUCCESS, ``False`` when
        any step raised and the job was marked ERROR.
    """
    logger.info('worker:process_job:start job_id=%s', job.id)
    temp_dir = Path(settings.TIMELAPS_EXPORT_DIR) / '_tmp' / f'job_{job.id}'

    try:
        # Drop leftovers a previous run of this job may have left behind.
        if temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)

        camera_root = _safe_camera_root(job)
        frame_paths = _select_frames(job, camera_root)
        if not frame_paths:
            raise RuntimeError('Не найдено кадров под выбранные параметры.')

        TimelapseJob.objects.filter(pk=job.pk).update(
            frames_total=len(frame_paths),
            frames_processed=0,
            progress_percent=5,
        )

        _copy_frames_to_temp(job, frame_paths, temp_dir)
        output_path, rel_path = _build_output_path(job)

        TimelapseJob.objects.filter(pk=job.pk).update(progress_percent=90)
        _run_ffmpeg(job, temp_dir, output_path)

        TimelapseJob.objects.filter(pk=job.pk).update(
            status=TimelapseJob.Status.SUCCESS,
            progress_percent=100,
            output_rel_path=rel_path,
            finished_at=timezone.now(),
            error_message='',
            frames_processed=len(frame_paths),
        )
        logger.info('worker:process_job:done job_id=%s', job.id)
        return True
    except Exception as exc:
        # Top-level boundary of the worker: record the failure on the job
        # instead of letting the exception propagate.
        logger.exception('worker:process_job:error job_id=%s', job.id)
        TimelapseJob.objects.filter(pk=job.pk).update(
            status=TimelapseJob.Status.ERROR,
            finished_at=timezone.now(),
            error_message=_describe_failure(exc)[:1000],
        )
        return False
    finally:
        if temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)


def run_one_job() -> bool:
    """Claim and process at most one job.

    Returns:
        ``True`` when a job was claimed (whether or not processing
        succeeded), ``False`` when there was nothing to claim.
    """
    logger.info('worker:run_one:start')
    job = claim_next_job()
    if not job:
        logger.info('worker:run_one:done processed=false')
        return False
    process_job(job)
    logger.info('worker:run_one:done processed=true')
    return True