|
|
|
|
|
|
|
# ---- MODULES ----#
|
|
|
|
import face_recognition
|
|
|
|
import cv2
|
|
|
|
import time
|
|
|
|
import logging
|
|
|
|
from datetime import datetime
|
|
|
|
from collections import deque
|
|
|
|
# ---- MODULES ----#
|
|
|
|
|
|
|
|
# ---- CLASSES ----#
|
|
|
|
from classes.IPCamera import IPCamera
|
|
|
|
from classes.ImageDataset import ImageDataset
|
|
|
|
from classes.IPWall import IPWall
|
|
|
|
from classes.Conf import Conf
|
|
|
|
# ---- CLASSES ----#
|
|
|
|
|
|
|
|
class Main:
|
|
|
|
def __init__(self):
|
|
|
|
self.config_logs()
|
|
|
|
self.config = Conf("conf.json")
|
|
|
|
self.src = f"rtsp://{self.config.get_value('login_camera')}:{self.config.get_value('senha_camera')}@{self.config.get_value('url_camera')}"
|
|
|
|
self.image_dataset = ImageDataset("imagens")
|
|
|
|
self.ip_camera = IPCamera(self.src)
|
|
|
|
self.ip_wall = IPWall()
|
|
|
|
|
|
|
|
self.min_area = int(self.config.get_value("min_area"))
|
|
|
|
self.sleep_time_after_opened = int(self.config.get_value("sleep_time_after_opened"))
|
|
|
|
|
|
|
|
self.buffer = deque(maxlen=10)
|
|
|
|
|
|
|
|
|
|
|
|
def main(self):
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
frame = self.ip_camera.frame
|
|
|
|
if frame is not None:
|
|
|
|
frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
|
|
|
|
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
|
|
|
|
|
|
|
|
|
|
|
face_locations = face_recognition.face_locations(rgb_frame)
|
|
|
|
face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
|
|
|
|
array_faces = []
|
|
|
|
for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
|
|
|
|
area = abs(top-bottom)*abs(left-right)
|
|
|
|
array_faces.append((area, face_encoding))
|
|
|
|
if len(array_faces) == 0:
|
|
|
|
time.sleep(0.2)
|
|
|
|
continue
|
|
|
|
array_faces_sorted = sorted(array_faces, key=lambda x: -x[0])
|
|
|
|
chosen_face_area = array_faces_sorted[0][0]
|
|
|
|
chosen_face_encoding = array_faces_sorted[0][1]
|
|
|
|
flag_sleep = False
|
|
|
|
if chosen_face_area < self.min_area:
|
|
|
|
filename_info = "detectadaNaoComparada"
|
|
|
|
else:
|
|
|
|
ret, name = self.image_dataset.compare_encodings(chosen_face_encoding, float(self.config.get_value("tolerance")))
|
|
|
|
if not ret:
|
|
|
|
filename_info = "desconhecido"
|
|
|
|
else:
|
|
|
|
opened_door = self.ip_wall.open_door()
|
|
|
|
if not opened_door:
|
|
|
|
filename_info = f"{name}TentouEntrar"
|
|
|
|
else:
|
|
|
|
self.log.info(f"Porta Aberta por {name}")
|
|
|
|
filename_info = f"{name}Entrou"
|
|
|
|
flag_sleep = True
|
|
|
|
|
|
|
|
image_filename = f"/home/simplesip/Porteiro/logs/entradas/{datetime.now().strftime('%d-%m-%Y_%H:%M:%S')}_{filename_info}_area{chosen_face_area}.png"
|
|
|
|
|
|
|
|
self.insere_buffer((image_filename, frame.copy()))
|
|
|
|
|
|
|
|
if flag_sleep:
|
|
|
|
self.descarrega_buffer()
|
|
|
|
time.sleep(self.sleep_time_after_opened)
|
|
|
|
|
|
|
|
else:
|
|
|
|
self.log.error("Falha ao acessar frame da camera")
|
|
|
|
self.ip_camera.create_connection()
|
|
|
|
except Exception as e:
|
|
|
|
self.log.critical(str(e))
|
|
|
|
break
|
|
|
|
|
|
|
|
def config_logs(self):
|
|
|
|
logging.basicConfig(
|
|
|
|
level=logging.DEBUG,
|
|
|
|
format='%(asctime)s %(name)s %(levelname)s %(message)s',
|
|
|
|
filename="/home/simplesip/Porteiro/logs/execution_logs/porteiro.log",
|
|
|
|
filemode='a'
|
|
|
|
)
|
|
|
|
self.log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
def insere_buffer(self, tupla):
|
|
|
|
self.log.info("Inserindo no buffer")
|
|
|
|
self.buffer.appendleft(tupla)
|
|
|
|
|
|
|
|
def descarrega_buffer(self):
|
|
|
|
self.log.info("Descarregando buffer")
|
|
|
|
for (image_filename, frame) in self.buffer:
|
|
|
|
try:
|
|
|
|
cv2.imwrite(filename=image_filename,img=frame)
|
|
|
|
except cv2.error as e:
|
|
|
|
self.log.info(f"Erro ao salvar a imagem {image_filename}: {e}")
|
|
|
|
self.buffer.clear()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
main = Main()
|
|
|
|
main.main()
|