s스크립트는 웹캠에서 이미지를 캡처하여 AWS S3에 업로드한 다음 S3 URL을 MySQL 데이터베이스에 저장”
[공지사항] 지킬블로그 안내드립니다.
지킬 블로그에 대하여 알아보겠습니다.
import boto3
import pymysql
from datetime import datetime
import pytz
import os
import mimetypes
import cv2
import numpy as np
- boto3: AWS SDK for Python으로, Amazon S3와 같은 AWS 서비스와 상호 작용하기 위해 사용합니다.
- pymysql: MySQL 데이터베이스와의 상호 작용을 위해 사용합니다.
- datetime, pytz: 날짜와 시간대 처리를 위해 사용합니다.
- os: 운영 체제와 상호 작용하고 환경 변수를 읽기 위해 사용합니다.
- mimetypes: 파일의 MIME 타입을 추정하기 위해 사용합니다.
- cv2: OpenCV 라이브러리로, 컴퓨터 비전 작업(예: 이미지 및 비디오 처리)을 위해 사용합니다.
- numpy: 수치 연산을 위한 라이브러리로, 배열 연산에 주로 사용됩니다.
MySQL 연결 함수
def mysql_connection():
try:
connection = pymysql.connect(
host=os.getenv('MYSQL_HOST'),
user=os.getenv('MYSQL_USER'),
password=os.getenv('MYSQL_PASSWORD'),
database=os.getenv('MYSQL_DATABASE'),
)
return connection
except pymysql.MySQLError as e:
print(f"Error connecting to MySQL: {e}")
return None
-
mysql_connection
함수는 MySQL 데이터베이스에 연결합니다.
os.getenv('ENV_VARIABLE')를 사용하여 환경 변수에서 MySQL 연결 정보를 읽어옵니다.pymysql.connect메서드를 사용하여 MySQL에 연결하고, 연결 객체를 반환합니다.- 연결에 실패하면 예외를 캐치하여 오류 메시지를 출력하고
None을 반환합니다.
한국 시간대를 고려하여 현재 시간을 반환하는 함수
def get_kst_time():
kst = pytz.timezone('Asia/Seoul')
kst_time = datetime.now(kst)
return kst_time.strftime('%Y-%m-%d %H:%M:%S')
-
get_kst_time
함수는 현재 시간을 한국 표준시(KST)로 반환합니다.
pytz.timezone('Asia/Seoul')을 사용하여 서울 시간대를 설정합니다.datetime.now(kst)를 사용하여 현재 서울 시간을 가져옵니다.strftime('%Y-%m-%d %H:%M:%S')을 사용하여 시간을 포맷된 문자열로 반환합니다.
이미지 URL을 MySQL에 삽입하는 함수
def insert_image_url_to_mysql(connection, image_name, image_url):
try:
with connection.cursor() as cursor:
detection_time = get_kst_time()
sql = "INSERT INTO myapp_images (image_name, image_url, Detection_Time) VALUES (%s, %s, %s)"
cursor.execute(sql, (image_name, image_url, detection_time))
connection.commit()
print("Image URL inserted successfully")
except pymysql.MySQLError as e:
print(f"Error inserting URL to MySQL: {e}")
-
insert_image_url_to_mysql
함수는 이미지의 이름과 URL을 MySQL 데이터베이스에 삽입합니다.
connection.cursor()를 사용하여 커서를 생성합니다.get_kst_time()함수를 호출하여 감지 시간을 가져옵니다.- SQL INSERT 쿼리를 작성하고
cursor.execute로 실행합니다. connection.commit()을 호출하여 변경 사항을 커밋합니다.- 예외 발생 시 오류 메시지를 출력합니다.
- cursor: 하나의 DB connection에 대하여 독립적으로 SQL 문을 실행할 수 있는 작업환경을 제공하는 객체.
- 하나의 connection에 동시에 한개의 cursor만 생성가능
- cursor를 통해서 SQL 문을 실행할 수 있으며, 응용 프로그램이 실행결과를 투플 단위로 접근할 수 있도록함.
VALUES절에서%s를 사용하는 것은 파라미터화된 쿼리로, SQL 인젝션 공격을 방지하고 코드의 가독성을 높이는 방법입니다.sql변수에 쿼리를 문자열로 정의합니다.(%s, %s, %s)는 플레이스홀더로, 나중에 값으로 대체됩니다.
S3 연결 함수
def s3_connection():
try:
s3 = boto3.client(
service_name=os.getenv('AWS_SERVICE_NAME'),
region_name=os.getenv('AWS_REGION_NAME'),
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
)
except Exception as e:
print(e)
return None
else:
print("s3 bucket connected!")
return s3
-
s3_connection
함수는 AWS S3에 연결합니다.
- 환경 변수에서 S3 자격 증명과 서비스 정보를 읽어옵니다.
boto3.client를 사용하여 S3 클라이언트를 생성합니다.- 예외 발생 시 오류 메시지를 출력하고
None을 반환합니다. - 연결에 성공하면 “s3 bucket connected!”를 출력하고 S3 클라이언트를 반환합니다.
파일을 S3에 업로드하고 URL을 반환하는 함수
def upload_file_to_s3_and_get_url(s3, file_name, bucket_name, object_name):
try:
content_type, _ = mimetypes.guess_type(file_name)
if content_type is None:
content_type = 'binary/octet-stream'
with open(file_name, 'rb') as file:
s3.upload_fileobj(
file,
bucket_name,
object_name,
ExtraArgs={'ContentType': content_type}
)
print("File uploaded successfully")
s3_url = f"https://{bucket_name}.s3.amazonaws.com/{object_name}"
return s3_url
except Exception as e:
print(e)
return None
-
upload_file_to_s3_and_get_url
함수는 파일을 S3에 업로드하고 업로드된 파일의 URL을 반환합니다.
mimetypes.guess_type을 사용하여 파일의 MIME 타입을 추정합니다.None일 경우 기본값으로 ‘binary/octet-stream’을 설정합니다.- 파일을 바이너리 모드(‘rb’)로 열고
s3.upload_fileobj를 사용하여 S3에 업로드합니다. - 업로드가 성공하면 파일의 S3 URL을 반환합니다.
- 예외 발생 시 오류 메시지를 출력하고
None을 반환합니다.
MySQL에서 데이터를 조회하는 함수
def fetch_images_from_mysql(connection):
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM myapp_images"
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
print(row)
except pymysql.MySQLError as e:
print(f"Error fetching data from MySQL: {e}")
-
fetch_images_from_mysql
함수는 MySQL 데이터베이스에서 이미지를 조회합니다.
connection.cursor()를 사용하여 커서를 생성합니다.- SQL SELECT 쿼리를 작성하고
cursor.execute로 실행합니다. cursor.fetchall()로 모든 결과를 가져와 반복하며 출력합니다.- 예외 발생 시 오류 메시지를 출력합니다.
YOLO 모델을 로드하는 함수
def load_yolo_model(cfg_path, weights_path, names_path):
net = cv2.dnn.readNet(weights_path, cfg_path)
layer_names = net.getLayerNames()
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
with open(names_path, "r") as f:
classes = [line.strip() for line in f.readlines()]
return net, classes, output_layers
-
load_yolo_model
함수는 YOLO 모델을 로드하고 필요한 레이어와 클래스 이름을 반환합니다.
cv2.dnn.readNet를 사용하여 YOLO 모델을 로드합니다.- 네트워크의 모든 레이어 이름을 가져옵니다.
net.getUnconnectedOutLayers를 사용하여 출력 레이어를 가져옵니다.- 클래스 이름을 파일에서 읽어 리스트로 반환합니다.
객체를 감지하고 이미지를 저장하는 함수
def detect_and_save_image(net, output_layers, frame, classes):
height, width, channels = frame.shape
blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
net.setInput(blob)
outs = net.forward(output_layers)
class_ids = []
confidences = []
boxes = []
for out in outs:
for detection in out:
scores = detection[5:]
class_id = np.argmax(scores)
confidence = scores[class_id]
if confidence > 0.5:
center_x = int(detection[0] * width)
center_y = int(detection[1] * height)
w = int(detection[2] * width)
h = int(detection[3] * height)
x = int(center_x - w / 2)
y = int(center_y - h / 2)
boxes.append([x, y, w, h])
confidences.append(float(confidence))
class_ids.append(class_id)
indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)
for i in range(len(boxes)):
if i in indexes:
x, y, w, h = boxes[i]
label = str(classes[class_ids[i]])
color = (0, 255, 0)
cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
cv2.putText(frame, label, (x, y - 10), cv2.FONT_HERSHEY_PLAIN, 1, color, 2)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
image_name = f"detected_{timestamp}.jpg"
cv2.imwrite(image_name, frame)
return image_name
return None
-
detect_and_save_image
함수는 YOLO 모델을 사용하여 객체를 감지하고, 감지된 객체가 포함된 이미지를 저장합니다.
- 입력 이미지의 크기를 가져옵니다.
cv2.dnn.blobFromImage를 사용하여 입력 이미지를 YOLO 모델에 맞게 변환합니다.net.setInput을 사용하여 변환된 이미지를 모델에 입력합니다.net.forward를 사용하여 출력 레이어에서 예측 결과를 가져옵니다.- 감지된 객체의 클래스 ID, 신뢰도, 경계 상자를 리스트에 저장합니다.
cv2.dnn.NMSBoxes를 사용하여 비최대 억제(NMS)를 적용하여 중복 경계 상자를 제거합니다.- 감지된 객체의 경계 상자와 클래스 라벨을 이미지에 그립니다.
- 현재 시간을 기반으로 파일 이름을 생성하고 이미지를 저장합니다.
- 저장된 이미지의 파일 이름을 반환합니다.
메인 함수
if __name__ == "__main__":
s3 = s3_connection()
if s3:
cap = cv2.VideoCapture(0)
cfg_path = "./yolov3-tiny.cfg"
weights_path = "./yolov3-tiny.weights"
names_path = "./label.names"
if not os.path.isfile(cfg_path) or not os.path.isfile(weights_path) or not os.path.isfile(names_path):
print("Error: YOLO configuration or weight files not found.")
else:
net, classes, output_layers = load_yolo_model(cfg_path, weights_path, names_path)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
print("캡처 종료")
break
cv2.imshow("Video", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
image_name = detect_and_save_image(net, output_layers, frame, classes)
if image_name:
file_path = f"./{image_name}"
s3_url = upload_file_to_s3_and_get_url(s3, file_path, os.getenv('S3_BUCKET_NAME'), image_name)
if s3_url:
connection = mysql_connection()
if connection:
insert_image_url_to_mysql(connection, image_name, s3_url)
connection.close()
if cv2.waitKey(1) & 0xFF == ord('e'):
break
cap.release()
cv2.destroyAllWindows()
-
메인 함수
는 전체 파이프라인을 실행합니다.
- S3 연결을 시도하고 성공하면 웹캠을 캡처하는 객체를 생성합니다.
- YOLO 모델의 구성, 가중치, 클래스 파일 경로를 설정합니다.
- 파일이 존재하는지 확인하고, 존재하지 않으면 오류 메시지를 출력합니다.
- YOLO 모델을 로드하고 웹캠 캡처를 시작합니다.
- 웹캠에서 프레임을 읽고, ‘q’ 키가 눌리면 객체 감지를 수행하고 이미지를 저장합니다.
- 저장된 이미지를 S3에 업로드하고, 업로드된 URL을 MySQL에 삽입합니다.
- ‘e’ 키가 눌리면 루프를 종료하고 웹캠과 모든 창을 해제합니다.
이 코드의 전반적인 흐름은 웹캠에서 영상을 캡처하여 YOLO 모델로 객체를 감지하고, 감지된 이미지를 S3에 업로드하고 MySQL에 URL을 저장하는 것입니다. 전체 과정을 통해 실시간으로 객체 감지와 결과 저장이 이루어집니다.
댓글남기기