328 lines
11 KiB
Python
328 lines
11 KiB
Python
"""Фоновый воркер сборки таймлапса: выбор кадров из storage и сборка видео через ffmpeg."""
|
||
|
||
import logging
|
||
import shutil
|
||
import subprocess
|
||
from datetime import date, datetime, time, timedelta
|
||
from pathlib import Path
|
||
|
||
from django.conf import settings
|
||
from django.utils import timezone
|
||
|
||
from ..models import TimelapseJob
|
||
|
||
logger = logging.getLogger('camlaps')
|
||
|
||
|
||
def _iter_dates(date_from: date, date_to: date):
|
||
"""Итерируется по датам включительно в указанном диапазоне."""
|
||
current = date_from
|
||
while current <= date_to:
|
||
yield current
|
||
current += timedelta(days=1)
|
||
|
||
|
||
def _safe_camera_root(job: TimelapseJob) -> Path:
|
||
"""Возвращает безопасный путь к каталогу камеры внутри STORAGE_PATH."""
|
||
storage_root = Path(settings.STORAGE_PATH).resolve()
|
||
camera_root = (storage_root / job.camera.storage_path).resolve()
|
||
|
||
if camera_root != storage_root and storage_root not in camera_root.parents:
|
||
raise RuntimeError('Некорректный путь камеры.')
|
||
|
||
return camera_root
|
||
|
||
|
||
def _is_day_time(snapshot_time: time, day_start: time, day_end: time) -> bool:
|
||
return day_start <= snapshot_time <= day_end
|
||
|
||
|
||
def _time_to_minutes(t: time) -> int:
|
||
return t.hour * 60 + t.minute
|
||
|
||
|
||
def _pick_nearest_frame_for_day(job: TimelapseJob, day_files: list[Path]) -> Path | None:
|
||
"""Выбирает кадр дня, ближайший к anchor_time с учетом фильтра дня/ночи."""
|
||
candidates: list[tuple[int, Path]] = []
|
||
anchor_minutes = _time_to_minutes(job.anchor_time)
|
||
|
||
for img_path in day_files:
|
||
if img_path.name.lower() == 'lastsnap.jpg':
|
||
continue
|
||
|
||
try:
|
||
snap_time = datetime.strptime(img_path.stem, '%H-%M-%S').time()
|
||
except ValueError:
|
||
continue
|
||
|
||
if not job.include_night and not _is_day_time(snap_time, job.day_start_time, job.day_end_time):
|
||
continue
|
||
|
||
diff = abs(_time_to_minutes(snap_time) - anchor_minutes)
|
||
candidates.append((diff, img_path))
|
||
|
||
if not candidates:
|
||
return None
|
||
|
||
candidates.sort(key=lambda x: (x[0], x[1].name))
|
||
return candidates[0][1]
|
||
|
||
|
||
def _select_frames(job: TimelapseJob, camera_root: Path) -> tuple[list[Path], int, int]:
|
||
"""Подбирает кадры и возвращает (кадры, всего_дней, дней_с_кадрами)."""
|
||
logger.info('worker:select_frames:start job_id=%s', job.id)
|
||
selected: list[Path] = []
|
||
days_total = 0
|
||
days_with_frames = 0
|
||
|
||
# Для 1 кадра/сутки берем ближайший кадр к якорному времени в каждом дне.
|
||
if int(job.sampling_interval_minutes) == 1440:
|
||
for day in _iter_dates(job.date_from, job.date_to):
|
||
days_total += 1
|
||
day_dir = camera_root / day.isoformat()
|
||
if not day_dir.exists() or not day_dir.is_dir():
|
||
continue
|
||
|
||
day_files = sorted(day_dir.glob('*.jpg'))
|
||
picked = _pick_nearest_frame_for_day(job, day_files)
|
||
if picked is not None:
|
||
selected.append(picked)
|
||
days_with_frames += 1
|
||
|
||
logger.info(
|
||
'worker:select_frames:done job_id=%s selected=%s days_total=%s days_with_frames=%s mode=daily_anchor',
|
||
job.id,
|
||
len(selected),
|
||
days_total,
|
||
days_with_frames,
|
||
)
|
||
return selected, days_total, days_with_frames
|
||
|
||
# Для остальных интервалов применяем шаг по времени по общей временной оси.
|
||
interval_seconds = int(job.sampling_interval_minutes) * 60
|
||
last_selected_dt: datetime | None = None
|
||
|
||
for day in _iter_dates(job.date_from, job.date_to):
|
||
days_total += 1
|
||
day_dir = camera_root / day.isoformat()
|
||
if not day_dir.exists() or not day_dir.is_dir():
|
||
continue
|
||
|
||
day_has_selected = False
|
||
day_files = sorted(day_dir.glob('*.jpg'))
|
||
for img_path in day_files:
|
||
if img_path.name.lower() == 'lastsnap.jpg':
|
||
continue
|
||
|
||
try:
|
||
snap_time = datetime.strptime(img_path.stem, '%H-%M-%S').time()
|
||
except ValueError:
|
||
continue
|
||
|
||
if not job.include_night and not _is_day_time(snap_time, job.day_start_time, job.day_end_time):
|
||
continue
|
||
|
||
current_dt = datetime.combine(day, snap_time)
|
||
if last_selected_dt is None or (current_dt - last_selected_dt).total_seconds() >= interval_seconds:
|
||
selected.append(img_path)
|
||
last_selected_dt = current_dt
|
||
day_has_selected = True
|
||
|
||
if day_has_selected:
|
||
days_with_frames += 1
|
||
|
||
logger.info(
|
||
'worker:select_frames:done job_id=%s selected=%s days_total=%s days_with_frames=%s mode=interval',
|
||
job.id,
|
||
len(selected),
|
||
days_total,
|
||
days_with_frames,
|
||
)
|
||
return selected, days_total, days_with_frames
|
||
|
||
|
||
def _copy_frames_to_temp(job: TimelapseJob, frame_paths: list[Path], temp_dir: Path):
|
||
logger.info('worker:copy_frames:start job_id=%s count=%s', job.id, len(frame_paths))
|
||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
total = len(frame_paths)
|
||
for index, src in enumerate(frame_paths, start=1):
|
||
dst = temp_dir / f'{index:06d}.jpg'
|
||
shutil.copy2(src, dst)
|
||
|
||
progress = min(70, int((index / total) * 70))
|
||
TimelapseJob.objects.filter(pk=job.pk).update(
|
||
frames_processed=index,
|
||
progress_percent=progress,
|
||
)
|
||
|
||
logger.info('worker:copy_frames:done job_id=%s', job.id)
|
||
|
||
|
||
def _build_output_path(job: TimelapseJob) -> tuple[Path, str]:
|
||
export_dir = Path(settings.TIMELAPS_EXPORT_DIR)
|
||
export_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
day_mode = 'daynight' if job.include_night else 'dayonly'
|
||
filename = (
|
||
f'{job.camera.slug}_{job.date_from.strftime("%Y%m%d")}_{job.date_to.strftime("%Y%m%d")}'
|
||
f'_s{job.sampling_interval_minutes}m_fps{job.fps}_{day_mode}_job{job.id}.mp4'
|
||
)
|
||
output_path = export_dir / filename
|
||
rel_path = f'timelapses/{filename}'
|
||
return output_path, rel_path
|
||
|
||
|
||
def _run_ffmpeg(job: TimelapseJob, temp_dir: Path, output_path: Path):
|
||
logger.info('worker:ffmpeg:start job_id=%s', job.id)
|
||
|
||
cmd = [
|
||
'ffmpeg',
|
||
'-y',
|
||
'-hide_banner',
|
||
'-loglevel',
|
||
'error',
|
||
'-framerate',
|
||
str(job.fps),
|
||
'-i',
|
||
str(temp_dir / '%06d.jpg'),
|
||
'-c:v',
|
||
'libx264',
|
||
'-preset',
|
||
'veryfast',
|
||
'-pix_fmt',
|
||
'yuv420p',
|
||
str(output_path),
|
||
]
|
||
|
||
subprocess.run(cmd, check=True)
|
||
logger.info('worker:ffmpeg:done job_id=%s', job.id)
|
||
|
||
|
||
def claim_next_job() -> TimelapseJob | None:
|
||
logger.info('worker:claim_next:start')
|
||
candidate = TimelapseJob.objects.filter(status=TimelapseJob.Status.PLANNED).order_by('created_at').first()
|
||
if not candidate:
|
||
logger.info('worker:claim_next:done no_jobs=true')
|
||
return None
|
||
|
||
updated = TimelapseJob.objects.filter(
|
||
pk=candidate.pk,
|
||
status=TimelapseJob.Status.PLANNED,
|
||
).update(
|
||
status=TimelapseJob.Status.RUNNING,
|
||
started_at=timezone.now(),
|
||
progress_percent=1,
|
||
frames_processed=0,
|
||
frames_total=None,
|
||
error_message='',
|
||
)
|
||
|
||
if not updated:
|
||
logger.info('worker:claim_next:done race_lost=true')
|
||
return None
|
||
|
||
job = TimelapseJob.objects.select_related('camera').get(pk=candidate.pk)
|
||
logger.info('worker:claim_next:done job_id=%s', job.id)
|
||
return job
|
||
|
||
|
||
def process_job(job: TimelapseJob) -> bool:
|
||
logger.info('worker:process_job:start job_id=%s', job.id)
|
||
|
||
temp_dir = Path(settings.TIMELAPS_EXPORT_DIR) / '_tmp' / f'job_{job.id}'
|
||
try:
|
||
if temp_dir.exists():
|
||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||
|
||
camera_root = _safe_camera_root(job)
|
||
frame_paths, days_total, days_with_frames = _select_frames(job, camera_root)
|
||
days_skipped = max(0, days_total - days_with_frames)
|
||
|
||
if not frame_paths:
|
||
TimelapseJob.objects.filter(pk=job.pk).update(
|
||
days_total=days_total,
|
||
days_with_frames=days_with_frames,
|
||
days_skipped=days_skipped,
|
||
)
|
||
raise RuntimeError('Не найдено кадров под выбранные параметры.')
|
||
|
||
TimelapseJob.objects.filter(pk=job.pk).update(
|
||
frames_total=len(frame_paths),
|
||
frames_processed=0,
|
||
days_total=days_total,
|
||
days_with_frames=days_with_frames,
|
||
days_skipped=days_skipped,
|
||
progress_percent=5,
|
||
)
|
||
|
||
_copy_frames_to_temp(job, frame_paths, temp_dir)
|
||
output_path, rel_path = _build_output_path(job)
|
||
|
||
TimelapseJob.objects.filter(pk=job.pk).update(progress_percent=90)
|
||
_run_ffmpeg(job, temp_dir, output_path)
|
||
|
||
TimelapseJob.objects.filter(pk=job.pk).update(
|
||
status=TimelapseJob.Status.SUCCESS,
|
||
progress_percent=100,
|
||
output_rel_path=rel_path,
|
||
finished_at=timezone.now(),
|
||
error_message='',
|
||
frames_processed=len(frame_paths),
|
||
)
|
||
logger.info('worker:process_job:done job_id=%s', job.id)
|
||
return True
|
||
|
||
except Exception as exc:
|
||
logger.exception('worker:process_job:error job_id=%s', job.id)
|
||
TimelapseJob.objects.filter(pk=job.pk).update(
|
||
status=TimelapseJob.Status.ERROR,
|
||
finished_at=timezone.now(),
|
||
error_message=str(exc)[:1000],
|
||
)
|
||
return False
|
||
finally:
|
||
if temp_dir.exists():
|
||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||
|
||
|
||
def run_one_job() -> bool:
|
||
logger.info('worker:run_one:start')
|
||
job = claim_next_job()
|
||
if not job:
|
||
logger.info('worker:run_one:done processed=false')
|
||
return False
|
||
|
||
process_job(job)
|
||
logger.info('worker:run_one:done processed=true')
|
||
return True
|
||
|
||
|
||
def run_specific_job(job_id: int) -> bool:
|
||
logger.info('worker:run_specific:start job_id=%s', job_id)
|
||
job = TimelapseJob.objects.select_related('camera').filter(pk=job_id).first()
|
||
if not job:
|
||
logger.info('worker:run_specific:done job_not_found=true job_id=%s', job_id)
|
||
return False
|
||
|
||
if job.status == TimelapseJob.Status.RUNNING:
|
||
logger.info('worker:run_specific:done already_running=true job_id=%s', job_id)
|
||
return False
|
||
|
||
TimelapseJob.objects.filter(pk=job_id).update(
|
||
status=TimelapseJob.Status.RUNNING,
|
||
started_at=timezone.now(),
|
||
finished_at=None,
|
||
progress_percent=1,
|
||
frames_processed=0,
|
||
frames_total=None,
|
||
days_total=0,
|
||
days_with_frames=0,
|
||
days_skipped=0,
|
||
error_message='',
|
||
)
|
||
|
||
job = TimelapseJob.objects.select_related('camera').get(pk=job_id)
|
||
process_job(job)
|
||
logger.info('worker:run_specific:done job_id=%s', job_id)
|
||
return True |