Source code for cModules.VideoTo3D
import os
import sys
import os.path
import numpy as np
import cv2
import coat
import multiprocessing
import threading
threads = []
save_threads = []
frames_sharp = []
frames_sharpMax = []
[docs]
def estimateFrameSharpness(frame: np.ndarray) -> float:
sigma = 0.33
median = np.median(frame)
vMin = int(max(0, (1.0 - sigma) * median))
vMax = int(min(255, (1.0 + sigma) * median))
mean, std = cv2.meanStdDev(cv2.Canny(frame, vMin, vMax))
return mean[0][0] * std[0][0]
[docs]
def calcSharpness(thread_id):
global threads
threads[thread_id].sharpness = estimateFrameSharpness(threads[thread_id].frame)
[docs]
def VideoTo3D(inputVideo, output_path, shotCount):
if not os.path.exists(inputVideo):
return
vidcap = cv2.VideoCapture(inputVideo)
success, frame = vidcap.read()
if not os.path.exists(output_path):
os.makedirs(output_path)
frame_count = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
window_size = frame_count / shotCount
frame_available = True
last_sharpness = 0
next_check_point = window_size
shot_id = 0
frame_id = 0
output_format = "jpg"
thread_count = max(1, multiprocessing.cpu_count() // 2)
sharp_frame = frame
global frames_sharp
global frames_sharpMax
global threads, save_threads
save_threads.clear()
while frame_available:
success, frameR = vidcap.read()
if success:
thread_id = frame_id % thread_count
if len(threads) <= thread_id:
threads.append(frame_thread())
if frame_id > thread_count:
threads[thread_id].thread.join()
frames_sharp.append(threads[thread_id].sharpness)
if last_sharpness < threads[thread_id].sharpness:
sharp_frame = threads[thread_id].frame
last_sharpness = threads[thread_id].sharpness
if frame_id > next_check_point:
frame_path = os.path.join(output_path, "shot%04d.%s" % (shot_id, output_format))
# cv2.imwrite(frame_path, sharp_frame)
# shot_frame = np.array(sharp_frame, copy=True)
while len(frames_sharpMax) < len(frames_sharp):
frames_sharpMax.append(last_sharpness)
thread = threading.Thread(target=lambda frame_path, sharp_frame: cv2.imwrite(frame_path, sharp_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100]) , args=([frame_path, sharp_frame]))
thread.start()
save_threads.append(thread)
shot_id += 1
next_check_point = window_size*shot_id+window_size
coat.io.progressBar(shot_id, shotCount, "Getting sharp frames from the video")
last_sharpness = 0
threads[thread_id].frame = frameR
threads[thread_id].sharpness = -1
# do_calc_sarp(thread_id)
threads[thread_id].thread = threading.Thread(target=calcSharpness, args=([thread_id]))
threads[thread_id].thread.start()
frame_id += 1
else:
frame_available = False
vidcap.release()