카테고리 없음
ㅇㅇㅇ
1231.
2025. 2. 20. 11:08
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import socket
import threading
import json
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import base64
import os
import datetime
import ipaddress
import hashlib
class SecureMessengerGUI:
    """Tkinter chat window that runs as either the chat server or a client.

    Wire protocol (both ends are implemented by this class):

    * Handshake: right after accepting a connection the server sends the
      Fernet key (``FERNET_KEY_SIZE`` bytes) immediately followed by the salt
      (``SALT_SIZE`` bytes).
    * Afterwards every chat message is one frame: a 4-byte big-endian length
      followed by that many bytes of Fernet token.

    SECURITY NOTE(review): the shared key travels over the socket
    unencrypted, so anyone able to observe the connection can decrypt every
    message.  A real key exchange is needed; this is flagged here rather than
    silently replaced.

    NOTE(review): the network threads call ``update_chat`` and
    ``status_var.set`` directly.  Confirm this is acceptable for the Tk build
    in use, or route those calls through the main thread.
    """

    PORT = 5555
    FERNET_KEY_SIZE = 44  # urlsafe-base64 encoding of a 32-byte key
    SALT_SIZE = 16  # matches os.urandom(16) in _generate_key
    FRAME_HEADER_SIZE = 4
    MAX_FRAME_SIZE = 1 << 20  # refuse absurd lengths from a corrupt header

    def __init__(self):
        """Create the main window, reset connection state and build the UI."""
        self.root = tk.Tk()
        self.root.title("보안 메신저")
        self.root.geometry("800x600")
        # Connection / security state
        self.socket = None
        self.clients = []
        self.nickname = None
        self.role = None  # "server" or "client", fixed once connected
        self.shared_key = None
        self.salt = None
        self.fernet = None
        self.message_counter = 0  # per-sender sequence number (replay guard)
        self.received_messages = set()  # "<nickname>_<counter>" ids seen
        # Build the widgets
        self.setup_gui()

    def setup_gui(self):
        """Build the connection bar, chat log, message entry and status line."""
        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        # Connection settings frame
        connection_frame = ttk.LabelFrame(main_frame, text="연결 설정", padding="5")
        connection_frame.grid(row=0, column=0, columnspan=2,
                              sticky=(tk.W, tk.E), pady=5)
        # Server / client selector
        self.role_var = tk.StringVar(value="client")
        ttk.Radiobutton(connection_frame, text="서버", variable=self.role_var,
                        value="server").grid(row=0, column=0, padx=5)
        ttk.Radiobutton(connection_frame, text="클라이언트", variable=self.role_var,
                        value="client").grid(row=0, column=1, padx=5)
        # Nickname entry, pre-filled with a random suffix
        ttk.Label(connection_frame, text="닉네임:").grid(row=0, column=2, padx=5)
        self.nickname_entry = ttk.Entry(connection_frame, width=15)
        self.nickname_entry.grid(row=0, column=3, padx=5)
        self.nickname_entry.insert(0, f"사용자_{os.urandom(2).hex()}")
        # Host entry: bind address for the server, target for the client
        ttk.Label(connection_frame, text="IP:").grid(row=0, column=4, padx=5)
        self.ip_entry = ttk.Entry(connection_frame, width=15)
        self.ip_entry.grid(row=0, column=5, padx=5)
        self.ip_entry.insert(0, "localhost")
        # Connect button
        self.connect_button = ttk.Button(connection_frame, text="연결",
                                         command=self.start_connection)
        self.connect_button.grid(row=0, column=6, padx=5)
        # Chat log (read-only except while update_chat is writing)
        self.chat_area = scrolledtext.ScrolledText(main_frame, wrap=tk.WORD,
                                                   width=70, height=30)
        self.chat_area.grid(row=1, column=0, columnspan=2, pady=5)
        self.chat_area.config(state=tk.DISABLED)
        # Message entry; Enter sends
        self.message_entry = ttk.Entry(main_frame, width=60)
        self.message_entry.grid(row=2, column=0, pady=5, sticky=tk.W)
        self.message_entry.bind('<Return>', lambda e: self.send_message())
        # Send button
        self.send_button = ttk.Button(main_frame, text="전송",
                                      command=self.send_message)
        self.send_button.grid(row=2, column=1, pady=5)
        # Status line
        self.status_var = tk.StringVar(value="연결 대기 중...")
        self.status_label = ttk.Label(main_frame, textvariable=self.status_var)
        self.status_label.grid(row=3, column=0, columnspan=2, sticky=tk.W)

    def _generate_key(self):
        """Return ``(key, salt)``: a fresh Fernet key and the salt used for it.

        The key is PBKDF2-HMAC-SHA256 over 32 random bytes with a random
        16-byte salt, urlsafe-base64 encoded as ``Fernet`` expects.
        """
        salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=480000,  # original author's note: OWASP-recommended count
        )
        key = base64.urlsafe_b64encode(kdf.derive(os.urandom(32)))
        return key, salt

    def _encrypt_message(self, message):
        """Return the Fernet token for *message* plus a counter and checksum.

        The caller's dict is left untouched; a copy receives the next
        ``counter`` value and a SHA-256 ``checksum`` of the sorted-key JSON
        encoding (computed before the checksum field is added).
        """
        self.message_counter += 1
        payload = dict(message)  # do not mutate the caller's dict
        payload['counter'] = self.message_counter
        payload['checksum'] = hashlib.sha256(
            json.dumps(payload, sort_keys=True).encode()
        ).hexdigest()
        return self.fernet.encrypt(json.dumps(payload).encode())

    def _decrypt_message(self, encrypted_message):
        """Decrypt one token, verify its checksum and reject replays.

        Returns the message dict without its ``checksum`` field.

        Raises:
            ValueError: the payload is not a dict, the checksum is missing or
                wrong, ``nickname``/``counter`` is missing, or the same
                ``nickname``/``counter`` pair was already received.
            Exception: whatever ``Fernet.decrypt`` / ``json.loads`` raise for
                an invalid token or malformed JSON.
        """
        decrypted = json.loads(self.fernet.decrypt(encrypted_message).decode())
        if not isinstance(decrypted, dict):
            raise ValueError("메시지 무결성 검증 실패")
        # A missing checksum is treated exactly like a mismatching one.
        original_checksum = decrypted.pop('checksum', None)
        calculated_checksum = hashlib.sha256(
            json.dumps(decrypted, sort_keys=True).encode()
        ).hexdigest()
        if original_checksum != calculated_checksum:
            raise ValueError("메시지 무결성 검증 실패")
        # Replay detection keyed on sender nickname + sender counter.
        try:
            message_id = f"{decrypted['nickname']}_{decrypted['counter']}"
        except KeyError as e:
            raise ValueError("메시지 무결성 검증 실패") from e
        if message_id in self.received_messages:
            raise ValueError("재전송된 메시지 감지")
        self.received_messages.add(message_id)
        return decrypted

    @staticmethod
    def _format_message(message):
        """Return the chat-log line for a message dict."""
        return f"{message['nickname']} ({message['timestamp']}): {message['content']}"

    @staticmethod
    def _recv_exact(sock, size):
        """Read exactly *size* bytes from *sock*.

        TCP is a byte stream, so a single ``recv`` may return fewer bytes than
        requested; keep reading until the full amount has arrived.

        Raises:
            EOFError: the peer closed the connection before *size* bytes came.
        """
        buffer = bytearray()
        while len(buffer) < size:
            chunk = sock.recv(size - len(buffer))
            if not chunk:
                raise EOFError("연결이 종료되었습니다.")
            buffer.extend(chunk)
        return bytes(buffer)

    @classmethod
    def _send_frame(cls, sock, payload):
        """Send *payload* as one frame: big-endian length header, then data.

        Raises:
            ValueError: *payload* is larger than ``MAX_FRAME_SIZE``.
            OSError: the socket send failed.
        """
        if len(payload) > cls.MAX_FRAME_SIZE:
            raise ValueError("메시지 크기가 허용 한도를 초과했습니다.")
        header = len(payload).to_bytes(cls.FRAME_HEADER_SIZE, "big")
        # Header and payload go out in a single sendall call.
        sock.sendall(header + payload)

    @classmethod
    def _recv_frame(cls, sock):
        """Receive one frame written by ``_send_frame`` and return its payload.

        Raises:
            EOFError: the peer closed the connection.
            ValueError: the announced length exceeds ``MAX_FRAME_SIZE``.
        """
        header = cls._recv_exact(sock, cls.FRAME_HEADER_SIZE)
        length = int.from_bytes(header, "big")
        if length > cls.MAX_FRAME_SIZE:
            raise ValueError("메시지 크기가 허용 한도를 초과했습니다.")
        return cls._recv_exact(sock, length)

    def _forget_client(self, client):
        """Remove *client* from ``self.clients`` if it is still listed."""
        try:
            self.clients.remove(client)
        except ValueError:
            pass  # already removed by another thread

    def update_chat(self, message):
        """Append *message* as a new line to the chat log and scroll to it."""
        self.chat_area.config(state=tk.NORMAL)
        self.chat_area.insert(tk.END, message + '\n')
        self.chat_area.see(tk.END)
        self.chat_area.config(state=tk.DISABLED)

    def start_server(self):
        """Start listening on ``PORT`` and spawn the accept thread.

        Returns:
            True when the server is listening, False when start-up failed
            (an error dialog has been shown and no socket is kept).
        """
        listener = None
        try:
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            listener.bind((self.ip_entry.get(), self.PORT))
            listener.listen()
            self.shared_key, self.salt = self._generate_key()
            self.fernet = Fernet(self.shared_key)
        except Exception as e:
            if listener is not None:
                listener.close()
            messagebox.showerror("오류", f"서버 시작 실패: {str(e)}")
            self.status_var.set("서버 시작 실패")
            return False
        # Publish the socket only once it is usable, so send_message never
        # sees a half-initialised connection.
        self.socket = listener
        self.status_var.set("서버 실행 중...")
        self.update_chat("서버가 시작되었습니다.")
        threading.Thread(target=self.accept_connections, daemon=True).start()
        return True

    def start_client(self):
        """Connect to the server, read the handshake and spawn the reader.

        Returns:
            True when connected, False when the attempt failed (an error
            dialog has been shown and no socket is kept).
        """
        connection = None
        try:
            connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            connection.connect((self.ip_entry.get(), self.PORT))
            # Key and salt arrive back-to-back on one stream; read each by
            # its exact size instead of trusting recv() boundaries.
            self.shared_key = self._recv_exact(connection, self.FERNET_KEY_SIZE)
            self.salt = self._recv_exact(connection, self.SALT_SIZE)
            self.fernet = Fernet(self.shared_key)
        except Exception as e:
            if connection is not None:
                connection.close()
            messagebox.showerror("오류", f"연결 실패: {str(e)}")
            self.status_var.set("연결 실패")
            return False
        self.socket = connection
        self.status_var.set("서버에 연결됨")
        self.update_chat("서버에 연결되었습니다.")
        threading.Thread(target=self.receive_messages, daemon=True).start()
        return True

    def start_connection(self):
        """Handle the connect button: start the chosen role and lock the UI.

        The connection widgets are disabled only when the connection actually
        succeeded, so the user can correct the input and retry after a
        failure.
        """
        self.nickname = self.nickname_entry.get()
        if not self.nickname:
            messagebox.showwarning("경고", "닉네임을 입력해주세요.")
            return
        role = self.role_var.get()
        if role == "server":
            connected = self.start_server()
        else:
            connected = self.start_client()
        if not connected:
            return
        # Remember the role in effect; the radio buttons stay clickable.
        self.role = role
        self.connect_button.config(state=tk.DISABLED)
        self.nickname_entry.config(state=tk.DISABLED)
        self.ip_entry.config(state=tk.DISABLED)

    def accept_connections(self):
        """Accept clients forever; runs on a daemon thread (server role).

        Each client receives the key + salt handshake first and is only then
        added to ``self.clients``, so a concurrent broadcast cannot reach it
        before the key does.  A failed handshake drops just that client; the
        loop ends only when the listening socket itself fails.
        """
        while True:
            try:
                client, address = self.socket.accept()
            except OSError as e:
                print(f"클라이언트 접속 처리 중 오류: {e}")
                break
            try:
                # SECURITY NOTE(review): the key is sent in plaintext.
                client.sendall(self.shared_key + self.salt)
            except OSError as e:
                print(f"클라이언트 접속 처리 중 오류: {e}")
                client.close()
                continue
            self.clients.append(client)
            threading.Thread(target=self.handle_client,
                             args=(client,), daemon=True).start()
            self.update_chat(f"새로운 클라이언트가 연결되었습니다: {address}")

    def handle_client(self, client):
        """Relay one client's messages until it disconnects (server role).

        Every frame is verified, forwarded to the other clients and shown in
        the local chat log.  Any error ends the session; the client is always
        unregistered and closed on the way out.
        """
        try:
            while True:
                encrypted_message = self._recv_frame(client)
                message = self._decrypt_message(encrypted_message)
                self.broadcast(encrypted_message, client)
                self.update_chat(self._format_message(message))
        except EOFError:
            pass  # peer closed the connection; nothing to report
        except Exception as e:
            print(f"클라이언트 처리 중 오류: {e}")
        finally:
            self._forget_client(client)
            client.close()

    def broadcast(self, message, sender=None):
        """Send *message* as one frame to every client except *sender*.

        Clients whose send fails are dropped from ``self.clients``.  The loop
        walks a snapshot so removals cannot make it skip entries.
        """
        for client in list(self.clients):
            if client is sender:
                continue
            try:
                self._send_frame(client, message)
            except OSError:
                # Closing is left to the thread that owns the socket.
                self._forget_client(client)

    def receive_messages(self):
        """Show incoming messages until the connection fails (client role)."""
        while True:
            try:
                encrypted_message = self._recv_frame(self.socket)
                message = self._decrypt_message(encrypted_message)
                self.update_chat(self._format_message(message))
            except Exception as e:
                print(f"메시지 수신 중 오류: {e}")
                self.status_var.set("연결이 종료되었습니다.")
                break

    def send_message(self):
        """Encrypt the entry text, send it and echo it in the local chat log.

        The server broadcasts to all clients; a client sends to the server.
        The sender's own line is added locally in both roles because the
        server never relays a message back to its sender.
        """
        if not self.socket:
            messagebox.showwarning("경고", "서버에 연결되어 있지 않습니다.")
            return
        message = self.message_entry.get().strip()
        if not message:
            return
        try:
            message_data = {
                'nickname': self.nickname,
                'content': message,
                'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            encrypted_message = self._encrypt_message(message_data)
            # Use the role fixed at connect time, not the live radio button.
            if self.role == "server":
                self.broadcast(encrypted_message)
            else:
                self._send_frame(self.socket, encrypted_message)
            self.update_chat(self._format_message(message_data))
            self.message_entry.delete(0, tk.END)
        except Exception as e:
            messagebox.showerror("오류", f"메시지 전송 실패: {str(e)}")

    def run(self):
        """Enter the Tk main loop; returns when the window is closed."""
        self.root.mainloop()
if __name__ == "__main__":
    # Script entry point: build the messenger window and block in its Tk
    # event loop until the window is closed.
    SecureMessengerGUI().run()