# Porteiro: door controller that opens the door when a known face is seen on the IP camera.
# ---- MODULES ----# |
|
import face_recognition |
|
import cv2 |
|
import time |
|
import logging |
|
from datetime import datetime |
|
from collections import deque |
|
# ---- MODULES ----# |
|
|
|
# ---- CLASSES ----# |
|
from classes.IPCamera import IPCamera |
|
from classes.ImageDataset import ImageDataset |
|
from classes.IPWall import IPWall |
|
from classes.Conf import Conf |
|
# ---- CLASSES ----# |
|
|
|
class Main:
    """Face-recognition door controller.

    Reads frames from an IP camera, picks the largest face in each frame,
    compares it against the known-faces dataset and asks the ``IPWall`` to
    open the door on a match. Every frame in which a face was found is kept
    in a bounded buffer; the buffer is written to disk only after the door
    has actually been opened.
    """

    def __init__(self):
        """Set up logging, load ``conf.json`` and build the helper objects."""
        self.config_logs()
        self.config = Conf("conf.json")

        # Credentials and address are read in the same order they appear in the URL.
        login = self.config.get_value('login_camera')
        password = self.config.get_value('senha_camera')
        address = self.config.get_value('url_camera')
        self.src = f"rtsp://{login}:{password}@{address}"

        self.image_dataset = ImageDataset("imagens")
        self.ip_camera = IPCamera(self.src)
        self.ip_wall = IPWall()

        self.min_area = int(self.config.get_value("min_area"))
        self.sleep_time_after_opened = int(self.config.get_value("sleep_time_after_opened"))

        # Only the 10 most recent (filename, frame) pairs are retained.
        self.buffer = deque(maxlen=10)

    def main(self):
        """Run the capture/recognition loop.

        The loop only ends when an unexpected exception is raised; the
        exception is logged at CRITICAL level together with its traceback
        and the method returns ``None``.
        """
        while True:
            try:
                frame = self.ip_camera.frame
                if frame is None:
                    self.log.error("Falha ao acessar frame da camera")
                    self.ip_camera.create_connection()
                    continue

                # Detection runs on a half-size copy, so face areas (and
                # therefore ``min_area``) are measured in half-resolution pixels.
                frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                largest = self._largest_face(rgb_frame)
                if largest is None:
                    # Nothing to do for this frame; wait a little before polling again.
                    time.sleep(0.2)
                    continue
                chosen_face_area, chosen_face_encoding = largest

                filename_info, door_opened = self._identify(chosen_face_area, chosen_face_encoding)

                timestamp = datetime.now().strftime('%d-%m-%Y_%H:%M:%S')
                image_filename = f"/home/simplesip/Porteiro/logs/entradas/{timestamp}_{filename_info}_area{chosen_face_area}.png"
                self.insere_buffer((image_filename, frame.copy()))

                if door_opened:
                    self.descarrega_buffer()
                    time.sleep(self.sleep_time_after_opened)
            except Exception as e:
                # Keep the traceback: this is the last thing logged before the loop stops.
                self.log.critical(str(e), exc_info=True)
                break

    def _largest_face(self, rgb_frame):
        """Return ``(area, encoding)`` of the biggest face in *rgb_frame*, or ``None`` if no face was found."""
        locations = face_recognition.face_locations(rgb_frame)
        encodings = face_recognition.face_encodings(rgb_frame, locations)
        faces = [
            (abs(top - bottom) * abs(left - right), encoding)
            for (top, right, bottom, left), encoding in zip(locations, encodings)
        ]
        if not faces:
            return None
        # ``max`` returns the first of several equally large faces, the same
        # choice a stable descending sort would make.
        return max(faces, key=lambda face: face[0])

    def _identify(self, area, encoding):
        """Classify the chosen face and try to open the door for a known person.

        Returns ``(filename_info, door_opened)``: the tag embedded in the
        snapshot filename and whether the door was actually opened.
        """
        if area < self.min_area:
            # Face too small: it is recorded but not compared.
            return "detectadaNaoComparada", False

        tolerance = float(self.config.get_value("tolerance"))
        matched, name = self.image_dataset.compare_encodings(encoding, tolerance)
        if not matched:
            return "desconhecido", False

        if not self.ip_wall.open_door():
            return f"{name}TentouEntrar", False

        self.log.info("Porta Aberta por %s", name)
        return f"{name}Entrou", True

    def config_logs(self):
        """Configure file logging and create ``self.log``."""
        logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s %(name)s %(levelname)s %(message)s',
            filename="/home/simplesip/Porteiro/logs/execution_logs/porteiro.log",
            filemode='a',
        )
        self.log = logging.getLogger(__name__)

    def insere_buffer(self, tupla):
        """Push a ``(image_filename, frame)`` pair onto the front of the buffer.

        The buffer is bounded, so once it is full the oldest entry is dropped.
        """
        self.log.info("Inserindo no buffer")
        self.buffer.appendleft(tupla)

    def descarrega_buffer(self):
        """Write every buffered frame to its filename, then empty the buffer.

        A frame that cannot be written is logged as an error and skipped;
        the remaining frames are still written.
        """
        self.log.info("Descarregando buffer")
        for image_filename, frame in self.buffer:
            try:
                saved = cv2.imwrite(filename=image_filename, img=frame)
            except cv2.error as e:
                self.log.error("Erro ao salvar a imagem %s: %s", image_filename, e)
                continue
            # ``imwrite`` reports some failures through its return value
            # instead of raising.
            if not saved:
                self.log.error("Falha ao salvar a imagem %s", image_filename)
        self.buffer.clear()
|
|
|
|
|
|
|
# Script entry point: build the controller and run its loop until it stops.
if __name__ == "__main__":
    Main().main()