import json
from typing import Dict, Any
class GraphQLResponseParser:
def __init__(self, file_path: str):
"""
Parameters:
file_path (str): txt 파일 경로
"""
self.file_path = file_path
self.schema_data = self._load_file()
def _load_file(self) -> Dict[str, Any]:
"""txt 파일을 로드하고 JSON으로 파싱합니다."""
with open(self.file_path, 'r', encoding='utf-8') as f:
content = f.read()
return json.loads(content)
def print_schema_structure(self):
"""스키마 구조를 보기 쉽게 출력합니다."""
schema = self.schema_data['data']['__schema']
# 기본 타입 정보 출력
print("\n=== 기본 타입 정보 ===")
print(f"Query 타입: {schema['queryType']['name'] if schema['queryType'] else 'None'}")
print(f"Mutation 타입: {schema['mutationType']['name'] if schema['mutationType'] else 'None'}")
print(f"Subscription 타입: {schema['subscriptionType']['name'] if schema['subscriptionType'] else 'None'}")
# 모든 타입 정보 출력
print("\n=== 타입 정보 ===")
for type_info in schema['types']:
# 내장 타입 건너뛰기
if type_info['name'].startswith('__'):
continue
print(f"\n{'='*50}")
print(f"타입 이름: {type_info['name']}")
print(f"종류: {type_info['kind']}")
if type_info['description']:
print(f"설명: {type_info['description']}")
# 필드 정보 출력
if type_info['fields']:
print("\n필드:")
for field in type_info['fields']:
print(f"\n • {field['name']}")
if field['description']:
print(f" 설명: {field['description']}")
# 필드 타입 출력
field_type = self._get_complete_type(field['type'])
print(f" 타입: {field_type}")
# 인자 정보 출력
if field['args']:
print(" 인자:")
for arg in field['args']:
arg_type = self._get_complete_type(arg['type'])
print(f" - {arg['name']}: {arg_type}")
def _get_complete_type(self, type_info: Dict[str, Any]) -> str:
"""타입 정보를 완전한 문자열로 변환합니다."""
if type_info['kind'] == 'NON_NULL':
return f"{self._get_complete_type(type_info['ofType'])}!"
elif type_info['kind'] == 'LIST':
return f"[{self._get_complete_type(type_info['ofType'])}]"
return type_info['name']
def save_formatted_output(self, output_file: str):
"""포맷된 출력을 파일로 저장합니다."""
import sys
from io import StringIO
# 표준 출력을 임시로 문자열 버퍼로 리다이렉트
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
# 스키마 구조 출력
self.print_schema_structure()
# 원래 표준 출력으로 복구
sys.stdout = old_stdout
# 결과를 파일로 저장
with open(output_file, 'w', encoding='utf-8') as f:
f.write(output.getvalue())
output.close()
def get_type_details(self, type_name: str):
"""특정 타입의 상세 정보를 출력합니다."""
schema = self.schema_data['data']['__schema']
for type_info in schema['types']:
if type_info['name'] == type_name:
print(f"\n=== {type_name} 상세 정보 ===")
print(f"종류: {type_info['kind']}")
if type_info['description']:
print(f"설명: {type_info['description']}")
if type_info['fields']:
print("\n필드:")
for field in type_info['fields']:
print(f"\n • {field['name']}")
if field['description']:
print(f" 설명: {field['description']}")
print(f" 타입: {self._get_complete_type(field['type'])}")
return
print(f"타입 '{type_name}'을(를) 찾을 수 없습니다.")
# 사용 예시
if __name__ == "__main__":
# 파일 경로를 지정하세요
file_path = "your_schema_response.txt"
output_file = "formatted_schema.txt"
parser = GraphQLResponseParser(file_path)
# 전체 스키마 구조를 보기 좋게 출력하고 파일로 저장
parser.save_formatted_output(output_file)
# 특정 타입의 상세 정보 확인 (선택적)
# parser.get_type_details("User") # "User" 타입의 상세 정보를 보고 싶을 때
import json
from typing import Dict, Any
from colorama import init, Fore, Style
class GraphQLResponseParser:
def __init__(self, file_path: str):
"""
Parameters:
file_path (str): txt 파일 경로
"""
init() # colorama 초기화
self.file_path = file_path
self.schema_data = self._load_file()
def _load_file(self) -> Dict[str, Any]:
"""txt 파일을 로드하고 JSON으로 파싱합니다."""
with open(self.file_path, 'r', encoding='utf-8') as f:
content = f.read()
return json.loads(content)
def print_schema_structure(self):
"""스키마 구조를 보기 쉽게 출력합니다."""
schema = self.schema_data['data']['__schema']
# 제목 출력
print(f"\n{Fore.CYAN}{'='*80}")
print(f"{Fore.YELLOW}GraphQL 스키마 구조")
print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}\n")
# 기본 타입 정보 출력
print(f"{Fore.GREEN}▶ 루트 타입{Style.RESET_ALL}")
print(f" ├─ Query: {Fore.YELLOW}{schema['queryType']['name'] if schema['queryType'] else 'None'}{Style.RESET_ALL}")
print(f" ├─ Mutation: {Fore.YELLOW}{schema['mutationType']['name'] if schema['mutationType'] else 'None'}{Style.RESET_ALL}")
print(f" └─ Subscription: {Fore.YELLOW}{schema['subscriptionType']['name'] if schema['subscriptionType'] else 'None'}{Style.RESET_ALL}")
# 타입별로 분류
object_types = []
input_types = []
enum_types = []
interface_types = []
scalar_types = []
for type_info in schema['types']:
if type_info['name'].startswith('__'):
continue
if type_info['kind'] == 'OBJECT':
object_types.append(type_info)
elif type_info['kind'] == 'INPUT_OBJECT':
input_types.append(type_info)
elif type_info['kind'] == 'ENUM':
enum_types.append(type_info)
elif type_info['kind'] == 'INTERFACE':
interface_types.append(type_info)
elif type_info['kind'] == 'SCALAR':
scalar_types.append(type_info)
# 객체 타입 출력
if object_types:
print(f"\n{Fore.GREEN}▶ 객체 타입{Style.RESET_ALL}")
self._print_types(object_types)
# 입력 타입 출력
if input_types:
print(f"\n{Fore.GREEN}▶ 입력 타입{Style.RESET_ALL}")
self._print_types(input_types)
# Enum 타입 출력
if enum_types:
print(f"\n{Fore.GREEN}▶ Enum 타입{Style.RESET_ALL}")
self._print_enum_types(enum_types)
# 인터페이스 타입 출력
if interface_types:
print(f"\n{Fore.GREEN}▶ 인터페이스 타입{Style.RESET_ALL}")
self._print_types(interface_types)
# 스칼라 타입 출력
if scalar_types:
print(f"\n{Fore.GREEN}▶ 스칼라 타입{Style.RESET_ALL}")
for type_info in scalar_types:
print(f" └─ {Fore.YELLOW}{type_info['name']}{Style.RESET_ALL}")
def _print_types(self, types):
"""타입 정보를 트리 구조로 출력합니다."""
for i, type_info in enumerate(types):
is_last_type = i == len(types) - 1
prefix = " └─" if is_last_type else " ├─"
print(f"{prefix} {Fore.YELLOW}{type_info['name']}{Style.RESET_ALL}")
if type_info['description']:
desc_prefix = " " if is_last_type else " │ "
print(f"{desc_prefix}📝 {Fore.CYAN}{type_info['description']}{Style.RESET_ALL}")
if type_info['fields']:
self._print_fields(type_info['fields'], " " if is_last_type else " │ ")
def _print_enum_types(self, types):
"""Enum 타입 정보를 출력합니다."""
for i, type_info in enumerate(types):
is_last_type = i == len(types) - 1
prefix = " └─" if is_last_type else " ├─"
print(f"{prefix} {Fore.YELLOW}{type_info['name']}{Style.RESET_ALL}")
if type_info['enumValues']:
values_prefix = " " if is_last_type else " │ "
for j, enum_value in enumerate(type_info['enumValues']):
is_last_value = j == len(type_info['enumValues']) - 1
value_prefix = f"{values_prefix}└─" if is_last_value else f"{values_prefix}├─"
print(f"{value_prefix} {enum_value['name']}")
def _print_fields(self, fields, base_prefix):
"""필드 정보를 트리 구조로 출력합니다."""
for i, field in enumerate(fields):
is_last_field = i == len(fields) - 1
prefix = f"{base_prefix}└─" if is_last_field else f"{base_prefix}├─"
# 필드 이름과 타입 출력
field_type = self._get_complete_type(field['type'])
print(f"{prefix} {Fore.WHITE}{field['name']}{Style.RESET_ALL}: {Fore.MAGENTA}{field_type}{Style.RESET_ALL}")
# 필드 설명 출력
if field['description']:
desc_prefix = f"{base_prefix} " if is_last_field else f"{base_prefix}│ "
print(f"{desc_prefix}📝 {Fore.CYAN}{field['description']}{Style.RESET_ALL}")
# 인자 정보 출력
if field['args']:
args_prefix = f"{base_prefix} " if is_last_field else f"{base_prefix}│ "
for j, arg in enumerate(field['args']):
is_last_arg = j == len(field['args']) - 1
arg_prefix = f"{args_prefix}└─" if is_last_arg else f"{args_prefix}├─"
arg_type = self._get_complete_type(arg['type'])
print(f"{arg_prefix} 📎 {Fore.WHITE}{arg['name']}{Style.RESET_ALL}: {Fore.MAGENTA}{arg_type}{Style.RESET_ALL}")
def _get_complete_type(self, type_info: Dict[str, Any]) -> str:
"""타입 정보를 완전한 문자열로 변환합니다."""
if type_info['kind'] == 'NON_NULL':
return f"{self._get_complete_type(type_info['ofType'])}!"
elif type_info['kind'] == 'LIST':
return f"[{self._get_complete_type(type_info['ofType'])}]"
return type_info['name']
def save_formatted_output(self, output_file: str):
"""포맷된 출력을 파일로 저장합니다."""
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.print_schema_structure()
sys.stdout = old_stdout
# ANSI 이스케이프 코드를 제거하고 저장
from re import sub
ansi_escape = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
with open(output_file, 'w', encoding='utf-8') as f:
f.write(ansi_escape)
output.close()
# 사용 예시
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <입력파일경로> [출력파일경로]{Style.RESET_ALL}")
sys.exit(1)
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "formatted_schema.txt"
try:
parser = GraphQLResponseParser(input_file)
parser.print_schema_structure() # 콘솔에 출력
parser.save_formatted_output(output_file) # 파일로 저장
print(f"\n{Fore.GREEN}✓ 결과가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
import json
from typing import Dict, Any, List, Set
from colorama import init, Fore, Style
class GraphQLResponseParser:
def __init__(self, file_path: str):
init()
self.file_path = file_path
self.schema_data = self._load_file()
# 중요 정보 관련 키워드 정의
self.security_keywords = {
'인증/계정': {
'fields': {'password', 'token', 'auth', 'authentication', 'credential', 'secret', 'key',
'username', 'email', 'account', 'login', 'session', 'user', 'oauth', 'permission',
'role', 'access', 'identity', 'refresh', 'jwt', 'apikey', 'api_key'},
'color': Fore.RED
},
'개인정보': {
'fields': {'name', 'phone', 'address', 'birthday', 'gender', 'ssn', 'social', 'person',
'personal', 'private', 'profile', 'contact', 'age', 'birth', 'national'},
'color': Fore.YELLOW
},
'결제정보': {
'fields': {'payment', 'card', 'credit', 'debit', 'bank', 'account', 'price', 'money',
'balance', 'transaction', 'billing', 'invoice', 'order', 'purchase'},
'color': Fore.MAGENTA
}
}
def _load_file(self) -> Dict[str, Any]:
with open(self.file_path, 'r', encoding='utf-8') as f:
content = f.read()
return json.loads(content)
def _check_security_category(self, name: str) -> tuple:
"""필드나 타입 이름이 보안 관련 키워드에 해당하는지 확인합니다."""
name_lower = name.lower()
for category, info in self.security_keywords.items():
for keyword in info['fields']:
if keyword.lower() in name_lower:
return category, info['color']
return None, None
def print_schema_structure(self):
schema = self.schema_data['data']['__schema']
# 타이틀 출력
self._print_title()
# 중요 정보 타입 먼저 출력
security_types = self._collect_security_types(schema['types'])
if security_types:
print(f"\n{Fore.RED}🔒 보안 민감 정보{Style.RESET_ALL}")
for category in self.security_keywords.keys():
category_types = security_types.get(category, [])
if category_types:
print(f"\n{Fore.YELLOW}▶ {category} 관련 타입{Style.RESET_ALL}")
self._print_types(category_types, True)
# 기본 타입 정보 출력
print(f"\n{Fore.GREEN}▶ 루트 타입{Style.RESET_ALL}")
print(f" ├─ Query: {Fore.YELLOW}{schema['queryType']['name'] if schema['queryType'] else 'None'}{Style.RESET_ALL}")
print(f" ├─ Mutation: {Fore.YELLOW}{schema['mutationType']['name'] if schema['mutationType'] else 'None'}{Style.RESET_ALL}")
print(f" └─ Subscription: {Fore.YELLOW}{schema['subscriptionType']['name'] if schema['subscriptionType'] else 'None'}{Style.RESET_ALL}")
# 나머지 타입들 분류해서 출력
remaining_types = [t for t in schema['types'] if not t['name'].startswith('__')]
self._print_categorized_types(remaining_types)
def _collect_security_types(self, types: List[Dict]) -> Dict[str, List[Dict]]:
"""보안 관련 타입들을 수집합니다."""
security_types = {category: [] for category in self.security_keywords.keys()}
for type_info in types:
if type_info['name'].startswith('__'):
continue
# 타입 이름으로 분류
category, _ = self._check_security_category(type_info['name'])
if category:
security_types[category].append(type_info)
continue
# 필드 이름으로 분류
if type_info.get('fields'):
for field in type_info['fields']:
category, _ = self._check_security_category(field['name'])
if category and type_info not in security_types[category]:
security_types[category].append(type_info)
break
return security_types
def _print_title(self):
print(f"\n{Fore.CYAN}{'='*80}")
print(f"{Fore.YELLOW}GraphQL 스키마 분석 결과")
print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}")
# 범례 출력
print(f"\n{Fore.WHITE}【범례】")
for category, info in self.security_keywords.items():
print(f"{info['color']}■ {category}{Style.RESET_ALL}", end=' ')
print()
def _print_categorized_types(self, types: List[Dict]):
"""타입들을 종류별로 분류하여 출력합니다."""
type_categories = {
'OBJECT': ('객체 타입', []),
'INPUT_OBJECT': ('입력 타입', []),
'ENUM': ('Enum 타입', []),
'INTERFACE': ('인터페이스 타입', []),
'SCALAR': ('스칼라 타입', [])
}
# 타입 분류
for type_info in types:
if type_info['kind'] in type_categories:
type_categories[type_info['kind']][1].append(type_info)
# 분류된 타입 출력
for kind, (title, type_list) in type_categories.items():
if type_list:
print(f"\n{Fore.GREEN}▶ {title}{Style.RESET_ALL}")
if kind == 'ENUM':
self._print_enum_types(type_list)
else:
self._print_types(type_list)
def _print_types(self, types: List[Dict], is_security: bool = False):
"""타입 정보를 트리 구조로 출력합니다."""
for i, type_info in enumerate(types):
is_last_type = i == len(types) - 1
prefix = " └─" if is_last_type else " ├─"
# 타입 이름 출력 (보안 관련 타입은 색상 강조)
category, color = self._check_security_category(type_info['name'])
name_color = color if color else Fore.YELLOW
print(f"{prefix} {name_color}{type_info['name']}{Style.RESET_ALL}")
if type_info['description']:
desc_prefix = " " if is_last_type else " │ "
print(f"{desc_prefix}📝 {Fore.CYAN}{type_info['description']}{Style.RESET_ALL}")
if type_info['fields']:
self._print_fields(type_info['fields'], " " if is_last_type else " │ ", is_security)
def _print_fields(self, fields: List[Dict], base_prefix: str, is_security: bool = False):
"""필드 정보를 트리 구조로 출력합니다."""
for i, field in enumerate(fields):
is_last_field = i == len(fields) - 1
prefix = f"{base_prefix}└─" if is_last_field else f"{base_prefix}├─"
# 필드 보안 카테고리 확인
category, color = self._check_security_category(field['name'])
# 필드 이름과 타입 출력
field_type = self._get_complete_type(field['type'])
field_color = color if color else Fore.WHITE
# 보안 관련 필드는 아이콘 추가
security_icon = "🔒 " if category else ""
category_label = f"[{category}] " if category else ""
print(f"{prefix} {security_icon}{field_color}{field['name']}{Style.RESET_ALL}: "
f"{Fore.MAGENTA}{field_type}{Style.RESET_ALL} {Fore.CYAN}{category_label}{Style.RESET_ALL}")
if field['description']:
desc_prefix = f"{base_prefix} " if is_last_field else f"{base_prefix}│ "
print(f"{desc_prefix}📝 {Fore.CYAN}{field['description']}{Style.RESET_ALL}")
if field['args']:
self._print_args(field['args'], base_prefix, is_last_field)
def _print_args(self, args: List[Dict], base_prefix: str, is_last_field: bool):
"""인자 정보를 출력합니다."""
args_prefix = f"{base_prefix} " if is_last_field else f"{base_prefix}│ "
for j, arg in enumerate(args):
is_last_arg = j == len(args) - 1
arg_prefix = f"{args_prefix}└─" if is_last_arg else f"{args_prefix}├─"
# 인자 보안 카테고리 확인
category, color = self._check_security_category(arg['name'])
arg_color = color if color else Fore.WHITE
security_icon = "🔒 " if category else ""
category_label = f"[{category}] " if category else ""
arg_type = self._get_complete_type(arg['type'])
print(f"{arg_prefix} 📎 {security_icon}{arg_color}{arg['name']}{Style.RESET_ALL}: "
f"{Fore.MAGENTA}{arg_type}{Style.RESET_ALL} {Fore.CYAN}{category_label}{Style.RESET_ALL}")
def _get_complete_type(self, type_info: Dict[str, Any]) -> str:
if type_info['kind'] == 'NON_NULL':
return f"{self._get_complete_type(type_info['ofType'])}!"
elif type_info['kind'] == 'LIST':
return f"[{self._get_complete_type(type_info['ofType'])}]"
return type_info['name']
def save_formatted_output(self, output_file: str):
"""포맷된 출력을 파일로 저장합니다."""
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.print_schema_structure()
sys.stdout = old_stdout
# ANSI 이스케이프 코드 제거
from re import sub
ansi_escape = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
with open(output_file, 'w', encoding='utf-8') as f:
f.write(ansi_escape)
output.close()
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <입력파일경로> [출력파일경로]{Style.RESET_ALL}")
sys.exit(1)
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "formatted_schema.txt"
try:
parser = GraphQLResponseParser(input_file)
parser.print_schema_structure() # 콘솔에 출력
parser.save_formatted_output(output_file) # 파일로 저장
print(f"\n{Fore.GREEN}✓ 결과가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
import json
from typing import Dict, Any, List, Set
from colorama import init, Fore, Style
class GraphQLResponseParser:
def __init__(self, file_path: str):
init()
self.file_path = file_path
self.schema_data = self._load_file()
# 중요 정보 관련 키워드 정의
self.security_keywords = {
'인증/계정': {
'fields': {'password', 'token', 'auth', 'authentication', 'credential', 'secret', 'key',
'username', 'email', 'account', 'login', 'session', 'user', 'oauth', 'permission',
'role', 'access', 'identity', 'refresh', 'jwt', 'apikey', 'api_key'},
'color': Fore.RED
},
'개인정보': {
'fields': {'name', 'phone', 'address', 'birthday', 'gender', 'ssn', 'social', 'person',
'personal', 'private', 'profile', 'contact', 'age', 'birth', 'national'},
'color': Fore.YELLOW
},
'결제정보': {
'fields': {'payment', 'card', 'credit', 'debit', 'bank', 'account', 'price', 'money',
'balance', 'transaction', 'billing', 'invoice', 'order', 'purchase'},
'color': Fore.MAGENTA
}
}
def _load_file(self) -> Dict[str, Any]:
with open(self.file_path, 'r', encoding='utf-8') as f:
content = f.read()
return json.loads(content)
def _check_security_category(self, name: str) -> tuple:
"""필드나 타입 이름이 보안 관련 키워드에 해당하는지 확인합니다."""
name_lower = name.lower()
for category, info in self.security_keywords.items():
for keyword in info['fields']:
if keyword.lower() in name_lower:
return category, info['color']
return None, None
def print_schema_structure(self):
schema = self.schema_data['data']['__schema']
# 타이틀 출력
self._print_title()
# 중요 정보 타입 먼저 출력
security_types = self._collect_security_types(schema['types'])
if security_types:
print(f"\n{Fore.RED}🔒 보안 민감 정보{Style.RESET_ALL}")
for category in self.security_keywords.keys():
category_types = security_types.get(category, [])
if category_types:
print(f"\n{Fore.YELLOW}▶ {category} 관련 타입{Style.RESET_ALL}")
self._print_types(category_types, True)
# 기본 타입 정보 출력
print(f"\n{Fore.GREEN}▶ 루트 타입{Style.RESET_ALL}")
if schema.get('queryType'):
print(f" ├─ Query: {Fore.YELLOW}{schema['queryType']['name'] if schema['queryType']['name'] else 'None'}{Style.RESET_ALL}")
if schema.get('mutationType'):
print(f" ├─ Mutation: {Fore.YELLOW}{schema['mutationType']['name'] if schema['mutationType'] else 'None'}{Style.RESET_ALL}")
if schema.get('subscriptionType'):
print(f" └─ Subscription: {Fore.YELLOW}{schema['subscriptionType']['name'] if schema['subscriptionType'] else 'None'}{Style.RESET_ALL}")
# 나머지 타입들 분류해서 출력
remaining_types = [t for t in schema['types'] if not t['name'].startswith('__')]
self._print_categorized_types(remaining_types)
def _print_enum_types(self, types: List[Dict]):
"""Enum 타입 정보를 출력합니다."""
for i, type_info in enumerate(types):
is_last_type = i == len(types) - 1
prefix = " └─" if is_last_type else " ├─"
# 보안 카테고리 확인
category, color = self._check_security_category(type_info['name'])
type_color = color if color else Fore.YELLOW
print(f"{prefix} {type_color}{type_info['name']}{Style.RESET_ALL}")
if type_info.get('description'):
desc_prefix = " " if is_last_type else " │ "
print(f"{desc_prefix}📝 {Fore.CYAN}{type_info['description']}{Style.RESET_ALL}")
if type_info.get('enumValues'):
values_prefix = " " if is_last_type else " │ "
for j, enum_value in enumerate(type_info['enumValues']):
is_last_value = j == len(type_info['enumValues']) - 1
value_prefix = f"{values_prefix}└─" if is_last_value else f"{values_prefix}├─"
value_desc = f" - {enum_value['description']}" if enum_value.get('description') else ""
print(f"{value_prefix} {Fore.WHITE}{enum_value['name']}{Style.RESET_ALL}{Fore.CYAN}{value_desc}{Style.RESET_ALL}")
def _collect_security_types(self, types: List[Dict]) -> Dict[str, List[Dict]]:
"""보안 관련 타입들을 수집합니다."""
security_types = {category: [] for category in self.security_keywords.keys()}
for type_info in types:
if type_info['name'].startswith('__'):
continue
# 타입 이름으로 분류
category, _ = self._check_security_category(type_info['name'])
if category:
security_types[category].append(type_info)
continue
# 필드 이름으로 분류
if type_info.get('fields'):
for field in type_info['fields']:
category, _ = self._check_security_category(field['name'])
if category and type_info not in security_types[category]:
security_types[category].append(type_info)
break
return security_types
def _print_title(self):
print(f"\n{Fore.CYAN}{'='*80}")
print(f"{Fore.YELLOW}GraphQL 스키마 분석 결과")
print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}")
# 범례 출력
print(f"\n{Fore.WHITE}【범례】")
for category, info in self.security_keywords.items():
print(f"{info['color']}■ {category}{Style.RESET_ALL}", end=' ')
print()
def _print_categorized_types(self, types: List[Dict]):
"""타입들을 종류별로 분류하여 출력합니다."""
type_categories = {
'OBJECT': ('객체 타입', []),
'INPUT_OBJECT': ('입력 타입', []),
'ENUM': ('Enum 타입', []),
'INTERFACE': ('인터페이스 타입', []),
'SCALAR': ('스칼라 타입', [])
}
# 타입 분류
for type_info in types:
kind = type_info.get('kind')
if kind in type_categories:
type_categories[kind][1].append(type_info)
# 분류된 타입 출력
for kind, (title, type_list) in type_categories.items():
if type_list:
print(f"\n{Fore.GREEN}▶ {title}{Style.RESET_ALL}")
if kind == 'ENUM':
self._print_enum_types(type_list)
else:
self._print_types(type_list)
def _print_fields(self, fields: List[Dict], base_prefix: str, is_security: bool = False):
"""필드 정보를 트리 구조로 출력합니다."""
if not fields: # fields가 None이면 반환
return
for i, field in enumerate(fields):
is_last_field = i == len(fields) - 1
prefix = f"{base_prefix}└─" if is_last_field else f"{base_prefix}├─"
# 필드 보안 카테고리 확인
category, color = self._check_security_category(field['name'])
field_color = color if color else Fore.WHITE
# 보안 관련 필드는 아이콘 추가
security_icon = "🔒 " if category else ""
category_label = f"[{category}] " if category else ""
field_type = self._get_complete_type(field['type'])
print(f"{prefix} {security_icon}{field_color}{field['name']}{Style.RESET_ALL}: "
f"{Fore.MAGENTA}{field_type}{Style.RESET_ALL} {Fore.CYAN}{category_label}{Style.RESET_ALL}")
if field.get('description'):
desc_prefix = f"{base_prefix} " if is_last_field else f"{base_prefix}│ "
print(f"{desc_prefix}📝 {Fore.CYAN}{field['description']}{Style.RESET_ALL}")
if field.get('args'):
self._print_args(field['args'], base_prefix, is_last_field)
def _print_types(self, types: List[Dict], is_security: bool = False):
"""타입 정보를 트리 구조로 출력합니다."""
for i, type_info in enumerate(types):
is_last_type = i == len(types) - 1
prefix = " └─" if is_last_type else " ├─"
# 타입 이름 출력 (보안 관련 타입은 색상 강조)
category, color = self._check_security_category(type_info['name'])
name_color = color if color else Fore.YELLOW
print(f"{prefix} {name_color}{type_info['name']}{Style.RESET_ALL}")
if type_info.get('description'):
desc_prefix = " " if is_last_type else " │ "
print(f"{desc_prefix}📝 {Fore.CYAN}{type_info['description']}{Style.RESET_ALL}")
if type_info.get('fields'):
self._print_fields(type_info['fields'], " " if is_last_type else " │ ", is_security)
def _get_complete_type(self, type_info: Dict[str, Any]) -> str:
if not type_info:
return "Unknown"
if type_info.get('kind') == 'NON_NULL':
return f"{self._get_complete_type(type_info['ofType'])}!"
elif type_info.get('kind') == 'LIST':
return f"[{self._get_complete_type(type_info['ofType'])}]"
return type_info.get('name', 'Unknown')
def save_formatted_output(self, output_file: str):
"""포맷된 출력을 파일로 저장합니다."""
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.print_schema_structure()
sys.stdout = old_stdout
# ANSI 이스케이프 코드 제거
from re import sub
ansi_escape = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
with open(output_file, 'w', encoding='utf-8') as f:
f.write(ansi_escape)
output.close()
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <입력파일경로> [출력파일경로]{Style.RESET_ALL}")
sys.exit(1)
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "formatted_schema.txt"
try:
parser = GraphQLResponseParser(input_file)
parser.print_schema_structure() # 콘솔에 출력
parser.save_formatted_output(output_file) # 파일로 저장
print(f"\n{Fore.GREEN}✓ 결과가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
import traceback
traceback.print_exc()
import json
from typing import Dict, Any, List
from colorama import init, Fore, Style
class GraphQLResponseParser:
def __init__(self, file_path: str):
init()
self.file_path = file_path
with open(self.file_path, 'r', encoding='utf-8') as f:
self.schema_data = json.loads(f.read())
self.security_keywords = {
'인증/계정': {
'fields': {'password', 'token', 'auth', 'authentication', 'credential', 'secret', 'key',
'username', 'email', 'account', 'login', 'session', 'user', 'oauth', 'permission',
'role', 'access', 'identity', 'refresh', 'jwt', 'apikey', 'api_key'},
'color': Fore.RED
},
'개인정보': {
'fields': {'name', 'phone', 'address', 'birthday', 'gender', 'ssn', 'social', 'person',
'personal', 'private', 'profile', 'contact', 'age', 'birth', 'national'},
'color': Fore.YELLOW
},
'결제정보': {
'fields': {'payment', 'card', 'credit', 'debit', 'bank', 'account', 'price', 'money',
'balance', 'transaction', 'billing', 'invoice', 'order', 'purchase'},
'color': Fore.MAGENTA
}
}
def print_schema_structure(self):
schema = self.schema_data.get('data', {}).get('__schema', {})
self._print_header()
self._print_root_types(schema)
types = schema.get('types', [])
if not types:
print(f"{Fore.RED}스키마에서 타입 정보를 찾을 수 없습니다.{Style.RESET_ALL}")
return
# 보안 관련 타입 먼저 출력
security_types = self._filter_security_types(types)
if security_types:
self._print_security_types(security_types)
# 일반 타입 출력
self._print_normal_types(types)
def _print_header(self):
print(f"\n{Fore.CYAN}{'='*80}")
print(f"{Fore.YELLOW}GraphQL 스키마 분석 결과")
print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}")
print(f"\n{Fore.WHITE}【범례】")
for category, info in self.security_keywords.items():
print(f"{info['color']}■ {category}{Style.RESET_ALL}", end=' ')
print("\n")
def _print_root_types(self, schema):
print(f"{Fore.GREEN}▶ 루트 타입{Style.RESET_ALL}")
query = schema.get('queryType', {})
mutation = schema.get('mutationType', {})
subscription = schema.get('subscriptionType', {})
print(f" ├─ Query: {Fore.YELLOW}{query.get('name', 'None')}{Style.RESET_ALL}")
print(f" ├─ Mutation: {Fore.YELLOW}{mutation.get('name', 'None') if mutation else 'None'}{Style.RESET_ALL}")
print(f" └─ Subscription: {Fore.YELLOW}{subscription.get('name', 'None') if subscription else 'None'}{Style.RESET_ALL}")
def _filter_security_types(self, types):
security_types = {category: [] for category in self.security_keywords}
for type_info in types:
if type_info.get('name', '').startswith('__'):
continue
type_name = type_info.get('name', '').lower()
fields = type_info.get('fields', [])
# 타입 이름으로 분류
for category, info in self.security_keywords.items():
if any(keyword in type_name for keyword in info['fields']):
security_types[category].append(type_info)
break
# 필드 이름으로 분류
if fields:
for field in fields:
field_name = field.get('name', '').lower()
for category, info in self.security_keywords.items():
if any(keyword in field_name for keyword in info['fields']):
if type_info not in security_types[category]:
security_types[category].append(type_info)
break
return security_types
def _print_security_types(self, security_types):
print(f"\n{Fore.RED}🔒 보안 민감 정보{Style.RESET_ALL}")
for category, types in security_types.items():
if types:
print(f"\n{Fore.YELLOW}▶ {category} 관련 타입{Style.RESET_ALL}")
self._print_types(types)
def _print_normal_types(self, types):
type_groups = {
'OBJECT': [],
'INPUT_OBJECT': [],
'ENUM': [],
'INTERFACE': [],
'SCALAR': []
}
# 타입 분류
for type_info in types:
if type_info.get('name', '').startswith('__'):
continue
kind = type_info.get('kind')
if kind in type_groups:
type_groups[kind].append(type_info)
# 타입별 출력
type_titles = {
'OBJECT': '객체 타입',
'INPUT_OBJECT': '입력 타입',
'ENUM': 'Enum 타입',
'INTERFACE': '인터페이스 타입',
'SCALAR': '스칼라 타입'
}
for kind, title in type_titles.items():
if type_groups[kind]:
print(f"\n{Fore.GREEN}▶ {title}{Style.RESET_ALL}")
if kind == 'ENUM':
self._print_enum_types(type_groups[kind])
else:
self._print_types(type_groups[kind])
def _print_types(self, types: List[Dict]):
for i, type_info in enumerate(types):
is_last = i == len(types) - 1
prefix = " └─" if is_last else " ├─"
name = type_info.get('name', 'Unknown')
description = type_info.get('description', '')
fields = type_info.get('fields', [])
# 타입 이름 출력
print(f"{prefix} {Fore.YELLOW}{name}{Style.RESET_ALL}")
# 설명 출력
if description:
desc_prefix = " " if is_last else " │ "
print(f"{desc_prefix}📝 {Fore.CYAN}{description}{Style.RESET_ALL}")
# 필드 출력
if fields:
field_prefix = " " if is_last else " │ "
self._print_type_fields(fields, field_prefix)
def _print_type_fields(self, fields: List[Dict], base_prefix: str):
for i, field in enumerate(fields):
is_last = i == len(fields) - 1
prefix = f"{base_prefix}└─" if is_last else f"{base_prefix}├─"
name = field.get('name', 'Unknown')
description = field.get('description', '')
field_type = self._get_type_string(field.get('type', {}))
args = field.get('args', [])
# 보안 관련 필드 확인
category, color = self._check_security(name)
security_icon = "🔒 " if category else ""
category_label = f"[{category}] " if category else ""
name_color = color if color else Fore.WHITE
# 필드 정보 출력
print(f"{prefix} {security_icon}{name_color}{name}{Style.RESET_ALL}: "
f"{Fore.MAGENTA}{field_type}{Style.RESET_ALL} {Fore.CYAN}{category_label}{Style.RESET_ALL}")
next_prefix = f"{base_prefix} " if is_last else f"{base_prefix}│ "
# 설명 출력
if description:
print(f"{next_prefix}📝 {Fore.CYAN}{description}{Style.RESET_ALL}")
# 인자 출력
if args:
self._print_field_args(args, next_prefix)
def _print_field_args(self, args: List[Dict], base_prefix: str):
for i, arg in enumerate(args):
is_last = i == len(args) - 1
prefix = f"{base_prefix}└─" if is_last else f"{base_prefix}├─"
name = arg.get('name', 'Unknown')
arg_type = self._get_type_string(arg.get('type', {}))
default_value = arg.get('defaultValue')
description = arg.get('description', '')
# 보안 관련 인자 확인
category, color = self._check_security(name)
security_icon = "🔒 " if category else ""
category_label = f"[{category}] " if category else ""
name_color = color if color else Fore.WHITE
# 인자 정보 출력
arg_info = f"{prefix} 📎 {security_icon}{name_color}{name}{Style.RESET_ALL}: {Fore.MAGENTA}{arg_type}{Style.RESET_ALL}"
if default_value:
arg_info += f" = {default_value}"
if category:
arg_info += f" {Fore.CYAN}{category_label}{Style.RESET_ALL}"
print(arg_info)
if description:
print(f"{base_prefix} 📝 {Fore.CYAN}{description}{Style.RESET_ALL}")
def _print_enum_types(self, types: List[Dict]):
for i, type_info in enumerate(types):
is_last = i == len(types) - 1
prefix = " └─" if is_last else " ├─"
name = type_info.get('name', 'Unknown')
description = type_info.get('description', '')
enum_values = type_info.get('enumValues', [])
# Enum 타입 이름 출력
print(f"{prefix} {Fore.YELLOW}{name}{Style.RESET_ALL}")
# 설명 출력
if description:
desc_prefix = " " if is_last else " │ "
print(f"{desc_prefix}📝 {Fore.CYAN}{description}{Style.RESET_ALL}")
# Enum 값 출력
if enum_values:
values_prefix = " " if is_last else " │ "
for j, value in enumerate(enum_values):
is_last_value = j == len(enum_values) - 1
value_prefix = f"{values_prefix}└─" if is_last_value else f"{values_prefix}├─"
value_name = value.get('name', 'Unknown')
value_description = value.get('description', '')
print(f"{value_prefix} {Fore.WHITE}{value_name}{Style.RESET_ALL}")
if value_description:
print(f"{values_prefix} 📝 {Fore.CYAN}{value_description}{Style.RESET_ALL}")
def _check_security(self, name: str) -> tuple:
name_lower = name.lower()
for category, info in self.security_keywords.items():
if any(keyword in name_lower for keyword in info['fields']):
return category, info['color']
return None, None
def _get_type_string(self, type_info: Dict) -> str:
if not type_info:
return 'Unknown'
kind = type_info.get('kind', '')
name = type_info.get('name')
of_type = type_info.get('ofType')
if kind == 'NON_NULL':
return f"{self._get_type_string(of_type)}!"
elif kind == 'LIST':
return f"[{self._get_type_string(of_type)}]"
elif name:
return name
return 'Unknown'
def save_to_file(self, output_file: str):
"""결과를 파일로 저장합니다."""
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.print_schema_structure()
sys.stdout = old_stdout
# ANSI 코드 제거
from re import sub
ansi_escape = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
with open(output_file, 'w', encoding='utf-8') as f:
f.write(ansi_escape)
output.close()
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <입력파일경로> [출력파일경로]{Style.RESET_ALL}")
sys.exit(1)
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "formatted_schema.txt"
try:
parser = GraphQLResponseParser(input_file)
parser.print_schema_structure() # 콘솔에 출력
parser.save_to_file(output_file) # 파일로 저장
print(f"\n{Fore.GREEN}✓ 결과가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED
import json
from typing import Dict, Any, List
from colorama import init, Fore, Style
class GraphQLSchemaAnalyzer:
def __init__(self, file_path: str):
init()
self.file_path = file_path
with open(self.file_path, 'r', encoding='utf-8') as f:
self.schema_data = json.loads(f.read())
# 중요 정보 분류 기준
self.security_patterns = {
'인증': {
'patterns': ['auth', 'token', 'password', 'secret', 'credential', 'login', 'session'],
'color': Fore.RED
},
'개인정보': {
'patterns': ['user', 'profile', 'email', 'phone', 'address', 'personal', 'contact'],
'color': Fore.YELLOW
},
'권한': {
'patterns': ['permission', 'role', 'access', 'scope', 'admin', 'authorization'],
'color': Fore.GREEN
}
}
def analyze(self):
"""스키마 분석을 실행합니다."""
schema = self.schema_data.get('data', {}).get('__schema', {})
if not schema:
raise ValueError("유효한 GraphQL 스키마를 찾을 수 없습니다.")
# 분석 실행
self._print_header()
self._analyze_root_types(schema)
self._analyze_security_types(schema.get('types', []))
self._analyze_other_types(schema.get('types', []))
def _print_header(self):
print(f"\n{Fore.BLUE}GraphQL 스키마 보안 분석 보고서{Style.RESET_ALL}")
print(f"{Fore.BLUE}{'=' * 50}{Style.RESET_ALL}")
def _analyze_root_types(self, schema: Dict):
"""루트 타입 분석"""
print(f"\n{Fore.BLUE}[1] 루트 작업 타입{Style.RESET_ALL}")
print(f"• Query: {schema.get('queryType', {}).get('name', 'None')}")
print(f"• Mutation: {schema.get('mutationType', {}).get('name', 'None')}")
print(f"• Subscription: {schema.get('subscriptionType', {}).get('name', 'None')}")
def _analyze_security_types(self, types: List[Dict]):
"""보안 관련 타입 분석"""
print(f"\n{Fore.BLUE}[2] 보안 민감 정보 분석{Style.RESET_ALL}")
security_fields = []
for type_info in types:
if type_info.get('name', '').startswith('__'):
continue
fields = type_info.get('fields', [])
if not fields:
continue
for field in fields:
field_name = field.get('name', '').lower()
type_name = type_info.get('name', '')
for category, info in self.security_patterns.items():
if any(pattern in field_name for pattern in info['patterns']):
security_fields.append({
'category': category,
'type': type_name,
'field': field.get('name'),
'field_type': self._get_field_type(field.get('type', {})),
'color': info['color']
})
# 카테고리별로 정리해서 출력
if security_fields:
by_category = {}
for field in security_fields:
category = field['category']
if category not in by_category:
by_category[category] = []
by_category[category].append(field)
for category, fields in by_category.items():
print(f"\n{fields[0]['color']}▶ {category} 관련 필드{Style.RESET_ALL}")
for field in fields:
print(f" • {field['type']}.{field['field']}: {field['field_type']}")
else:
print("보안 관련 필드를 찾을 수 없습니다.")
def _analyze_other_types(self, types: List[Dict]):
"""주요 타입 구조 분석"""
print(f"\n{Fore.BLUE}[3] 주요 타입 구조{Style.RESET_ALL}")
type_counts = {
'OBJECT': 0,
'INPUT_OBJECT': 0,
'INTERFACE': 0,
'ENUM': 0,
'SCALAR': 0
}
main_types = []
for type_info in types:
kind = type_info.get('kind')
if kind in type_counts:
type_counts[kind] += 1
# 주요 타입 (필드가 5개 이상인 객체) 수집
if kind == 'OBJECT' and not type_info.get('name', '').startswith('__'):
fields = type_info.get('fields', [])
if len(fields) >= 5:
main_types.append({
'name': type_info.get('name'),
'field_count': len(fields)
})
# 타입 통계 출력
print("\n▶ 타입 통계")
print(f" • 객체 타입: {type_counts['OBJECT']}")
print(f" • 입력 타입: {type_counts['INPUT_OBJECT']}")
print(f" • 인터페이스: {type_counts['INTERFACE']}")
print(f" • Enum 타입: {type_counts['ENUM']}")
print(f" • 스칼라 타입: {type_counts['SCALAR']}")
# 주요 타입 출력
if main_types:
print("\n▶ 주요 타입 (필드 5개 이상)")
main_types.sort(key=lambda x: x['field_count'], reverse=True)
for type_info in main_types[:10]: # 상위 10개만 표시
print(f" • {type_info['name']}: {type_info['field_count']} fields")
def _get_field_type(self, type_info: Dict) -> str:
"""필드 타입 정보를 문자열로 변환"""
if not type_info:
return 'Unknown'
kind = type_info.get('kind', '')
name = type_info.get('name')
of_type = type_info.get('ofType')
if kind == 'NON_NULL':
return f"{self._get_field_type(of_type)}!"
elif kind == 'LIST':
return f"[{self._get_field_type(of_type)}]"
elif name:
return name
return 'Unknown'
def save_report(self, output_file: str):
"""분석 보고서를 파일로 저장"""
import sys
from io import StringIO
# 출력 캡처
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.analyze()
sys.stdout = old_stdout
# ANSI 코드 제거
from re import sub
report = sub(r'\x1B(?:[@-Z\\-_]|\[[0?]*[ -/]*[@-~])', '', output.getvalue())
# 파일 저장
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
output.close()
print(f"\n{Fore.GREEN}분석 보고서가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
def main():
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <스키마파일> [출력파일]{Style.RESET_ALL}")
return
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "schema_analysis_report.txt"
try:
analyzer = GraphQLSchemaAnalyzer(input_file)
analyzer.analyze() # 콘솔에 출력
analyzer.save_report(output_file) # 파일로 저장
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
import json
from typing import Dict, Any, List, Optional
from colorama import init, Fore, Style
class GraphQLSchemaAnalyzer:
def __init__(self, file_path: str):
"""GraphQL 스키마 분석기 초기화"""
init() # colorama 초기화
self.schema_data = self._load_schema(file_path)
# 보안 관련 패턴 정의
self.security_patterns = {
'인증': {
'patterns': ['auth', 'token', 'password', 'secret', 'credential', 'login', 'session'],
'color': Fore.RED
},
'개인정보': {
'patterns': ['user', 'profile', 'email', 'phone', 'address', 'personal', 'contact'],
'color': Fore.YELLOW
},
'권한': {
'patterns': ['permission', 'role', 'access', 'scope', 'admin', 'authorization'],
'color': Fore.GREEN
}
}
def _load_schema(self, file_path: str) -> Dict:
"""스키마 파일 로드"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError("JSON 데이터가 딕셔너리 형식이 아닙니다.")
return data
except json.JSONDecodeError:
raise ValueError("유효하지 않은 JSON 형식입니다.")
except Exception as e:
raise ValueError(f"파일 로드 중 오류 발생: {str(e)}")
def analyze(self):
"""스키마 분석 실행"""
try:
schema = self.schema_data.get('data', {}).get('__schema', {})
if not schema:
raise ValueError("스키마 데이터를 찾을 수 없습니다.")
self._print_header()
self._analyze_root_types(schema)
self._analyze_security_fields(schema)
self._analyze_type_statistics(schema)
except Exception as e:
print(f"{Fore.RED}분석 중 오류 발생: {str(e)}{Style.RESET_ALL}")
def _print_header(self):
"""헤더 출력"""
print(f"\n{Fore.BLUE}{'=' * 60}")
print(f"GraphQL 스키마 보안 분석 보고서")
print(f"{'=' * 60}{Style.RESET_ALL}\n")
def _analyze_root_types(self, schema: Dict):
"""루트 타입 분석"""
print(f"{Fore.BLUE}[1] 루트 타입 분석{Style.RESET_ALL}")
query_type = schema.get('queryType', {})
mutation_type = schema.get('mutationType', {})
subscription_type = schema.get('subscriptionType', {})
print(f"• Query: {query_type.get('name', 'None')}")
print(f"• Mutation: {mutation_type.get('name', 'None') if mutation_type else 'None'}")
print(f"• Subscription: {subscription_type.get('name', 'None') if subscription_type else 'None'}\n")
def _analyze_security_fields(self, schema: Dict):
"""보안 관련 필드 분석"""
print(f"{Fore.BLUE}[2] 보안 관련 필드 분석{Style.RESET_ALL}")
types = schema.get('types', [])
security_findings = {}
for type_info in types:
# 내장 타입 제외
if type_info.get('name', '').startswith('__'):
continue
fields = type_info.get('fields', [])
if not fields:
continue
type_name = type_info.get('name', '')
for field in fields:
if not isinstance(field, dict):
continue
field_name = field.get('name', '')
if not field_name:
continue
# 보안 카테고리 확인
for category, info in self.security_patterns.items():
if any(pattern in field_name.lower() for pattern in info['patterns']):
if category not in security_findings:
security_findings[category] = []
security_findings[category].append({
'type': type_name,
'field': field_name,
'field_type': self._get_field_type(field.get('type', {}))
})
# 결과 출력
if security_findings:
for category, findings in security_findings.items():
color = self.security_patterns[category]['color']
print(f"\n{color}▶ {category} 관련 필드{Style.RESET_ALL}")
for finding in findings:
print(f" • {finding['type']}.{finding['field']}: {finding['field_type']}")
else:
print("보안 관련 필드를 찾을 수 없습니다.\n")
def _analyze_type_statistics(self, schema: Dict):
"""타입 통계 분석"""
print(f"{Fore.BLUE}[3] 타입 통계 분석{Style.RESET_ALL}")
types = schema.get('types', [])
type_counts = {'OBJECT': 0, 'INPUT_OBJECT': 0, 'INTERFACE': 0, 'ENUM': 0, 'SCALAR': 0}
complex_types = []
for type_info in types:
# 타입 종류 카운트
kind = type_info.get('kind', '')
if kind in type_counts:
type_counts[kind] += 1
# 복잡한 타입 (필드가 5개 이상) 식별
if kind == 'OBJECT' and not type_info.get('name', '').startswith('__'):
fields = type_info.get('fields', [])
if fields and len(fields) >= 5:
complex_types.append({
'name': type_info.get('name', ''),
'field_count': len(fields)
})
# 통계 출력
print("\n▶ 타입 수 통계")
print(f" • 객체 타입: {type_counts['OBJECT']}")
print(f" • 입력 타입: {type_counts['INPUT_OBJECT']}")
print(f" • 인터페이스: {type_counts['INTERFACE']}")
print(f" • Enum 타입: {type_counts['ENUM']}")
print(f" • 스칼라 타입: {type_counts['SCALAR']}")
# 복잡한 타입 출력
if complex_types:
print("\n▶ 주요 복잡 타입 (필드 5개 이상)")
complex_types.sort(key=lambda x: x['field_count'], reverse=True)
for type_info in complex_types[:10]:
print(f" • {type_info['name']}: {type_info['field_count']} fields")
print()
def _get_field_type(self, type_info: Dict) -> str:
"""필드 타입 정보를 문자열로 변환"""
if not isinstance(type_info, dict):
return 'Unknown'
kind = type_info.get('kind', '')
name = type_info.get('name')
of_type = type_info.get('ofType')
if kind == 'NON_NULL':
return f"{self._get_field_type(of_type)}!"
elif kind == 'LIST':
return f"[{self._get_field_type(of_type)}]"
elif name:
return name
return 'Unknown'
def save_report(self, output_file: str):
"""분석 보고서를 파일로 저장"""
import sys
from io import StringIO
# 출력 리다이렉션
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
# 분석 실행
self.analyze()
# 원래 출력으로 복구
sys.stdout = old_stdout
# ANSI 코드 제거
from re import sub
report = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
# 파일 저장
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
print(f"{Fore.GREEN}분석 보고서가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}파일 저장 중 오류 발생: {str(e)}{Style.RESET_ALL}")
finally:
output.close()
def main():
"""메인 실행 함수"""
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <스키마파일> [출력파일]{Style.RESET_ALL}")
return
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "schema_analysis_report.txt"
try:
analyzer = GraphQLSchemaAnalyzer(input_file)
analyzer.analyze() # 콘솔에 출력
analyzer.save_report(output_file) # 파일로 저장
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
if __name__ == "__main__":
main()
import json
from typing import Dict, Any, List, Optional
from colorama import init, Fore, Style
class GraphQLAnalyzer:
def __init__(self, file_path: str):
init()
self.schema_data = self._load_schema(file_path)
self.security_patterns = {
'인증': {
'patterns': ['auth', 'token', 'password', 'secret', 'credential', 'login', 'session'],
'color': Fore.RED
},
'개인정보': {
'patterns': ['user', 'profile', 'email', 'phone', 'address', 'personal', 'contact'],
'color': Fore.YELLOW
},
'권한': {
'patterns': ['permission', 'role', 'access', 'scope', 'admin', 'authorization'],
'color': Fore.GREEN
}
}
def _load_schema(self, file_path: str) -> Dict:
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError("Invalid JSON format")
return data
except Exception as e:
raise ValueError(f"Schema load error: {str(e)}")
def analyze(self):
"""전체 분석 실행"""
schema = self.schema_data.get('data', {}).get('__schema', {})
if not schema:
raise ValueError("스키마 데이터를 찾을 수 없습니다.")
self._print_header()
self._analyze_queries(schema)
self._analyze_mutations(schema)
self._analyze_security_fields(schema)
def _print_header(self):
print(f"\n{Fore.BLUE}{'=' * 60}")
print(f"GraphQL 쿼리 분석 리포트")
print(f"{'=' * 60}{Style.RESET_ALL}\n")
def _find_type_by_name(self, schema: Dict, type_name: str) -> Optional[Dict]:
"""타입 이름으로 타입 정보 찾기"""
types = schema.get('types', [])
for type_info in types:
if type_info.get('name') == type_name:
return type_info
return None
def _analyze_queries(self, schema: Dict):
"""Query 타입 분석"""
print(f"{Fore.BLUE}[1] 사용 가능한 쿼리 목록{Style.RESET_ALL}")
query_type_name = schema.get('queryType', {}).get('name')
if not query_type_name:
print("Query 타입을 찾을 수 없습니다.")
return
query_type = self._find_type_by_name(schema, query_type_name)
if not query_type:
print("Query 타입 정보를 찾을 수 없습니다.")
return
fields = query_type.get('fields', [])
if not fields:
print("사용 가능한 쿼리가 없습니다.")
return
for field in fields:
name = field.get('name', '')
args = field.get('args', [])
description = field.get('description', '')
# 쿼리 예시 생성
query_example = self._generate_query_example(name, args)
# 보안 관련 필드 확인
category = self._check_security_category(name)
color = self.security_patterns[category]['color'] if category else Fore.WHITE
security_icon = "🔒 " if category else ""
print(f"\n{color}{security_icon}{name}{Style.RESET_ALL}")
if description:
print(f" 설명: {description}")
print(f" 쿼리 예시:")
print(f"{Fore.CYAN}{query_example}{Style.RESET_ALL}")
def _analyze_mutations(self, schema: Dict):
"""Mutation 타입 분석"""
print(f"\n{Fore.BLUE}[2] 사용 가능한 뮤테이션 목록{Style.RESET_ALL}")
mutation_type_name = schema.get('mutationType', {}).get('name')
if not mutation_type_name:
print("Mutation 타입을 찾을 수 없습니다.")
return
mutation_type = self._find_type_by_name(schema, mutation_type_name)
if not mutation_type:
print("Mutation 타입 정보를 찾을 수 없습니다.")
return
fields = mutation_type.get('fields', [])
if not fields:
print("사용 가능한 뮤테이션이 없습니다.")
return
for field in fields:
name = field.get('name', '')
args = field.get('args', [])
description = field.get('description', '')
# 뮤테이션 예시 생성
mutation_example = self._generate_mutation_example(name, args)
# 보안 관련 필드 확인
category = self._check_security_category(name)
color = self.security_patterns[category]['color'] if category else Fore.WHITE
security_icon = "🔒 " if category else ""
print(f"\n{color}{security_icon}{name}{Style.RESET_ALL}")
if description:
print(f" 설명: {description}")
print(f" 뮤테이션 예시:")
print(f"{Fore.CYAN}{mutation_example}{Style.RESET_ALL}")
def _generate_query_example(self, name: str, args: List[Dict]) -> str:
"""쿼리 예시 생성"""
query_args = self._generate_args_example(args)
return f"""query {name.capitalize()} {query_args['type_def']} {{
{name}{query_args['query_args']} {{
id
# 필요한 필드들을 여기에 추가하세요
}}
}}"""
def _generate_mutation_example(self, name: str, args: List[Dict]) -> str:
"""뮤테이션 예시 생성"""
query_args = self._generate_args_example(args)
return f"""mutation {name.capitalize()} {query_args['type_def']} {{
{name}{query_args['query_args']} {{
id
# 결과 필드들을 여기에 추가하세요
}}
}}"""
def _generate_args_example(self, args: List[Dict]) -> Dict[str, str]:
"""인자 예시 생성"""
if not args:
return {'type_def': '', 'query_args': ''}
arg_defs = []
arg_values = []
for arg in args:
arg_name = arg.get('name', '')
arg_type = self._get_field_type(arg.get('type', {}))
if arg_name and arg_type:
arg_defs.append(f"${arg_name}: {arg_type}")
arg_values.append(f"{arg_name}: ${arg_name}")
type_def = f"({', '.join(arg_defs)})" if arg_defs else ""
query_args = f"({', '.join(arg_values)})" if arg_values else ""
return {
'type_def': type_def,
'query_args': query_args
}
def _check_security_category(self, name: str) -> Optional[str]:
"""보안 카테고리 확인"""
name_lower = name.lower()
for category, info in self.security_patterns.items():
if any(pattern in name_lower for pattern in info['patterns']):
return category
return None
def _analyze_security_fields(self, schema: Dict):
"""보안 관련 필드 분석"""
print(f"\n{Fore.BLUE}[3] 보안 관련 필드 분석{Style.RESET_ALL}")
types = schema.get('types', [])
security_findings = {}
for type_info in types:
if type_info.get('name', '').startswith('__'):
continue
fields = type_info.get('fields', [])
if not fields:
continue
type_name = type_info.get('name', '')
for field in fields:
field_name = field.get('name', '')
category = self._check_security_category(field_name)
if category:
if category not in security_findings:
security_findings[category] = []
security_findings[category].append({
'type': type_name,
'field': field_name,
'field_type': self._get_field_type(field.get('type', {}))
})
if security_findings:
for category, findings in security_findings.items():
color = self.security_patterns[category]['color']
print(f"\n{color}▶ {category} 관련 필드{Style.RESET_ALL}")
for finding in findings:
print(f" • {finding['type']}.{finding['field']}: {finding['field_type']}")
else:
print("보안 관련 필드를 찾을 수 없습니다.")
def _get_field_type(self, type_info: Dict) -> str:
"""필드 타입 정보를 문자열로 변환"""
if not isinstance(type_info, dict):
return 'Unknown'
kind = type_info.get('kind', '')
name = type_info.get('name')
of_type = type_info.get('ofType')
if kind == 'NON_NULL':
return f"{self._get_field_type(of_type)}!"
elif kind == 'LIST':
return f"[{self._get_field_type(of_type)}]"
elif name:
return name
return 'Unknown'
def save_report(self, output_file: str):
"""분석 보고서를 파일로 저장"""
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.analyze()
sys.stdout = old_stdout
from re import sub
report = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
print(f"{Fore.GREEN}분석 보고서가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}파일 저장 중 오류 발생: {str(e)}{Style.RESET_ALL}")
finally:
output.close()
def main():
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <스키마파일> [출력파일]{Style.RESET_ALL}")
return
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "graphql_analysis_report.txt"
try:
analyzer = GraphQLAnalyzer(input_file)
analyzer.analyze() # 콘솔에 출력
analyzer.save_report(output_file) # 파일로 저장
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
if __name__ == "__main__":
main()
import json
from typing import Dict, Any, List, Set
from colorama import init, Fore, Style
class GraphQLSecurityAnalyzer:
def __init__(self, file_path: str):
init()
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.raw_data = json.load(f)
except Exception as e:
print(f"{Fore.RED}파일 읽기 오류: {str(e)}{Style.RESET_ALL}")
raise
# 보안 관련 키워드 정의
self.security_patterns = {
'관리자 권한': {
'keywords': ['admin', 'superuser', 'supervisor', 'root', 'manager',
'administrator', 'moderator', 'system', 'master'],
'color': Fore.RED,
'risk': '높음',
'description': '관리자 권한 관련 쿼리'
},
'계정 정보': {
'keywords': ['user', 'account', 'profile', 'member', 'employee',
'staff', 'personnel', 'worker'],
'color': Fore.YELLOW,
'risk': '중간',
'description': '사용자 계정 정보 조회'
},
'권한 관리': {
'keywords': ['permission', 'role', 'access', 'privilege', 'grant',
'authority', 'right', 'security', 'policy'],
'color': Fore.MAGENTA,
'risk': '높음',
'description': '권한 관리 관련 쿼리'
},
'민감한 데이터': {
'keywords': ['password', 'token', 'secret', 'key', 'credential',
'authentication', 'session', 'cookie', 'jwt', 'apikey'],
'color': Fore.RED,
'risk': '매우 높음',
'description': '민감한 인증 정보 관련'
},
'개인정보': {
'keywords': ['email', 'phone', 'address', 'contact', 'ssn', 'birth',
'private', 'personal', 'identity', 'social'],
'color': Fore.YELLOW,
'risk': '중간',
'description': '개인정보 관련 쿼리'
}
}
self.schema = self._get_schema()
self.types = self._get_types()
def _get_schema(self) -> Dict:
"""스키마 데이터 추출"""
try:
return self.raw_data.get('data', {}).get('__schema', {})
except Exception:
return {}
def _get_types(self) -> List[Dict]:
"""타입 목록 추출"""
try:
types = self.schema.get('types', [])
return [t for t in types if isinstance(t, dict) and not t.get('name', '').startswith('__')]
except Exception:
return []
def analyze_security_queries(self):
"""보안 관련 쿼리 분석 실행"""
print(f"\n{Fore.RED}🔒 GraphQL 보안 취약점 분석 보고서{Style.RESET_ALL}")
print(f"{Fore.RED}{'='*60}{Style.RESET_ALL}\n")
# 위험한 쿼리 분석
self._analyze_dangerous_queries()
# 위험한 뮤테이션 분석
self._analyze_dangerous_mutations()
def _analyze_dangerous_queries(self):
"""위험한 쿼리 분석"""
print(f"{Fore.RED}[1] 잠재적으로 위험한 쿼리 탐지{Style.RESET_ALL}")
query_type_name = self.schema.get('queryType', {}).get('name')
if not query_type_name:
return
dangerous_queries = []
fields = self._get_type_fields(query_type_name)
for field in fields:
field_info = self._analyze_field_security(field, is_query=True)
if field_info:
dangerous_queries.append(field_info)
if dangerous_queries:
self._print_security_findings(dangerous_queries, "Query")
else:
print("위험한 쿼리를 발견하지 못했습니다.\n")
def _analyze_dangerous_mutations(self):
"""위험한 뮤테이션 분석"""
print(f"\n{Fore.RED}[2] 잠재적으로 위험한 뮤테이션 탐지{Style.RESET_ALL}")
mutation_type_name = self.schema.get('mutationType', {}).get('name')
if not mutation_type_name:
return
dangerous_mutations = []
fields = self._get_type_fields(mutation_type_name)
for field in fields:
field_info = self._analyze_field_security(field, is_query=False)
if field_info:
dangerous_mutations.append(field_info)
if dangerous_mutations:
self._print_security_findings(dangerous_mutations, "Mutation")
else:
print("위험한 뮤테이션을 발견하지 못했습니다.\n")
def _get_type_fields(self, type_name: str) -> List[Dict]:
"""타입의 필드 목록 추출"""
try:
for t in self.types:
if t.get('name') == type_name:
return t.get('fields', [])
return []
except Exception:
return []
def _analyze_field_security(self, field: Dict, is_query: bool = True) -> Optional[Dict]:
"""필드의 보안 위험도 분석"""
try:
name = field.get('name', '').lower()
if not name:
return None
# 보안 카테고리 확인
for category, info in self.security_patterns.items():
if any(keyword in name for keyword in info['keywords']):
return {
'name': field.get('name'),
'category': category,
'risk_level': info['risk'],
'description': info['description'],
'color': info['color'],
'type': self._get_type_string(field.get('type', {})),
'args': field.get('args', []),
'operation_type': 'Query' if is_query else 'Mutation'
}
return None
except Exception:
return None
def _print_security_findings(self, findings: List[Dict], operation_type: str):
"""보안 분석 결과 출력"""
risk_order = {'매우 높음': 0, '높음': 1, '중간': 2}
findings.sort(key=lambda x: risk_order.get(x['risk_level'], 999))
for finding in findings:
color = finding['color']
print(f"\n{color}▶ {finding['name']}{Style.RESET_ALL}")
print(f" • 작업 유형: {operation_type}")
print(f" • 위험 수준: {finding['risk_level']}")
print(f" • 카테고리: {finding['category']}")
print(f" • 설명: {finding['description']}")
print(f" • 반환 타입: {finding['type']}")
# 인자 정보 출력
args = finding.get('args', [])
if args:
print(" • 인자:")
for arg in args:
arg_name = arg.get('name', '')
arg_type = self._get_type_string(arg.get('type', {}))
print(f" - {arg_name}: {arg_type}")
# 실제 사용 예시 출력
example = self._generate_example(finding)
print(f"\n • 실행 예시:\n{example}")
def _get_type_string(self, type_info: Dict) -> str:
"""타입 정보를 문자열로 변환"""
try:
if not isinstance(type_info, dict):
return 'Unknown'
kind = type_info.get('kind', '')
name = type_info.get('name')
of_type = type_info.get('ofType')
if kind == 'NON_NULL':
return f"{self._get_type_string(of_type)}!" if of_type else 'Unknown!'
elif kind == 'LIST':
return f"[{self._get_type_string(of_type)}]" if of_type else '[Unknown]'
elif name:
return name
return 'Unknown'
except Exception:
return 'Unknown'
def _generate_example(self, finding: Dict) -> str:
"""GraphQL 쿼리 예시 생성"""
try:
name = finding['name']
args = finding.get('args', [])
operation_type = finding['operation_type'].lower()
# 변수 정의 생성
variables = []
arg_values = []
for arg in args:
arg_name = arg.get('name', '')
arg_type = self._get_type_string(arg.get('type', {}))
if arg_name and arg_type:
variables.append(f"${arg_name}: {arg_type}")
arg_values.append(f"{arg_name}: ${arg_name}")
operation_name = name.replace('_', ' ').title().replace(' ', '')
variables_str = f"({', '.join(variables)})" if variables else ""
args_str = f"({', '.join(arg_values)})" if arg_values else ""
return f"""{operation_type} {operation_name}{variables_str} {{
{name}{args_str} {{
id
name
# 추가 필드는 실제 응답 구조에 따라 조정하세요
}}
}}"""
except Exception:
return f"# {name} 예시 생성 실패"
def save_report(self, output_file: str):
"""분석 보고서를 파일로 저장"""
try:
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.analyze_security_queries()
sys.stdout = old_stdout
from re import sub
report = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
print(f"\n{Fore.GREEN}보안 분석 보고서가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}파일 저장 오류: {str(e)}{Style.RESET_ALL}")
finally:
if 'output' in locals():
output.close()
def main():
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <스키마파일> [출력파일]{Style.RESET_ALL}")
return
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "graphql_security_report.txt"
try:
analyzer = GraphQLSecurityAnalyzer(input_file)
analyzer.analyze_security_queries() # 콘솔에 출력
analyzer.save_report(output_file) # 파일로 저장
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
if __name__ == "__main__":
main()
import json
from typing import Dict, Any, List, Set
from colorama import init, Fore, Style
class GraphQLSecurityAnalyzer:
def __init__(self, file_path: str):
init()
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.raw_data = json.load(f)
except Exception as e:
print(f"{Fore.RED}파일 읽기 오류: {str(e)}{Style.RESET_ALL}")
raise
# 보안 관련 키워드 정의
self.security_patterns = {
'관리자 권한': {
'keywords': ['admin', 'superuser', 'supervisor', 'root', 'manager',
'administrator', 'moderator', 'system', 'master'],
'color': Fore.RED,
'risk': '높음',
'description': '관리자 권한 관련 쿼리'
},
'계정 정보': {
'keywords': ['user', 'account', 'profile', 'member', 'employee',
'staff', 'personnel', 'worker'],
'color': Fore.YELLOW,
'risk': '중간',
'description': '사용자 계정 정보 조회'
},
'권한 관리': {
'keywords': ['permission', 'role', 'access', 'privilege', 'grant',
'authority', 'right', 'security', 'policy'],
'color': Fore.MAGENTA,
'risk': '높음',
'description': '권한 관리 관련 쿼리'
},
'민감한 데이터': {
'keywords': ['password', 'token', 'secret', 'key', 'credential',
'authentication', 'session', 'cookie', 'jwt', 'apikey'],
'color': Fore.RED,
'risk': '매우 높음',
'description': '민감한 인증 정보 관련'
},
'개인정보': {
'keywords': ['email', 'phone', 'address', 'contact', 'ssn', 'birth',
'private', 'personal', 'identity', 'social'],
'color': Fore.YELLOW,
'risk': '중간',
'description': '개인정보 관련 쿼리'
}
}
self.schema = self._get_schema()
self.types = self._get_types()
def _get_schema(self) -> Dict:
"""스키마 데이터 추출"""
try:
return self.raw_data.get('data', {}).get('__schema', {})
except Exception:
return {}
def _get_types(self) -> List[Dict]:
"""타입 목록 추출"""
try:
types = self.schema.get('types', [])
return [t for t in types if isinstance(t, dict) and not t.get('name', '').startswith('__')]
except Exception:
return []
def analyze_security_queries(self):
"""보안 관련 쿼리 분석 실행"""
print(f"\n{Fore.RED}🔒 GraphQL 보안 취약점 분석 보고서{Style.RESET_ALL}")
print(f"{Fore.RED}{'='*60}{Style.RESET_ALL}\n")
# 위험한 쿼리 분석
self._analyze_dangerous_queries()
# 위험한 뮤테이션 분석
self._analyze_dangerous_mutations()
def _analyze_dangerous_queries(self):
"""위험한 쿼리 분석"""
print(f"{Fore.RED}[1] 잠재적으로 위험한 쿼리 탐지{Style.RESET_ALL}")
query_type_name = self.schema.get('queryType', {}).get('name')
if not query_type_name:
return
dangerous_queries = []
fields = self._get_type_fields(query_type_name)
for field in fields:
field_info = self._analyze_field_security(field, is_query=True)
if field_info:
dangerous_queries.append(field_info)
if dangerous_queries:
self._print_security_findings(dangerous_queries, "Query")
else:
print("위험한 쿼리를 발견하지 못했습니다.\n")
def _analyze_dangerous_mutations(self):
"""위험한 뮤테이션 분석"""
print(f"\n{Fore.RED}[2] 잠재적으로 위험한 뮤테이션 탐지{Style.RESET_ALL}")
mutation_type_name = self.schema.get('mutationType', {}).get('name')
if not mutation_type_name:
return
dangerous_mutations = []
fields = self._get_type_fields(mutation_type_name)
for field in fields:
field_info = self._analyze_field_security(field, is_query=False)
if field_info:
dangerous_mutations.append(field_info)
if dangerous_mutations:
self._print_security_findings(dangerous_mutations, "Mutation")
else:
print("위험한 뮤테이션을 발견하지 못했습니다.\n")
def _get_type_fields(self, type_name: str) -> List[Dict]:
"""타입의 필드 목록 추출"""
try:
for t in self.types:
if t.get('name') == type_name:
return t.get('fields', [])
return []
except Exception:
return []
def _analyze_field_security(self, field: Dict, is_query: bool = True) -> Optional[Dict]:
"""필드의 보안 위험도 분석"""
try:
name = field.get('name', '').lower()
if not name:
return None
# 보안 카테고리 확인
for category, info in self.security_patterns.items():
if any(keyword in name for keyword in info['keywords']):
return {
'name': field.get('name'),
'category': category,
'risk_level': info['risk'],
'description': info['description'],
'color': info['color'],
'type': self._get_type_string(field.get('type', {})),
'args': field.get('args', []),
'operation_type': 'Query' if is_query else 'Mutation'
}
return None
except Exception:
return None
def _print_security_findings(self, findings: List[Dict], operation_type: str):
"""보안 분석 결과 출력"""
risk_order = {'매우 높음': 0, '높음': 1, '중간': 2}
findings.sort(key=lambda x: risk_order.get(x['risk_level'], 999))
for finding in findings:
color = finding['color']
print(f"\n{color}▶ {finding['name']}{Style.RESET_ALL}")
print(f" • 작업 유형: {operation_type}")
print(f" • 위험 수준: {finding['risk_level']}")
print(f" • 카테고리: {finding['category']}")
print(f" • 설명: {finding['description']}")
print(f" • 반환 타입: {finding['type']}")
# 인자 정보 출력
args = finding.get('args', [])
if args:
print(" • 인자:")
for arg in args:
arg_name = arg.get('name', '')
arg_type = self._get_type_string(arg.get('type', {}))
print(f" - {arg_name}: {arg_type}")
# 실제 사용 예시 출력
example = self._generate_example(finding)
print(f"\n • 실행 예시:\n{example}")
def _get_type_string(self, type_info: Dict) -> str:
"""타입 정보를 문자열로 변환"""
try:
if not isinstance(type_info, dict):
return 'Unknown'
kind = type_info.get('kind', '')
name = type_info.get('name')
of_type = type_info.get('ofType')
if kind == 'NON_NULL':
return f"{self._get_type_string(of_type)}!" if of_type else 'Unknown!'
elif kind == 'LIST':
return f"[{self._get_type_string(of_type)}]" if of_type else '[Unknown]'
elif name:
return name
return 'Unknown'
except Exception:
return 'Unknown'
def _generate_example(self, finding: Dict) -> str:
"""GraphQL 쿼리 예시 생성"""
try:
name = finding['name']
args = finding.get('args', [])
operation_type = finding['operation_type'].lower()
# 변수 정의 생성
variables = []
arg_values = []
for arg in args:
arg_name = arg.get('name', '')
arg_type = self._get_type_string(arg.get('type', {}))
if arg_name and arg_type:
variables.append(f"${arg_name}: {arg_type}")
arg_values.append(f"{arg_name}: ${arg_name}")
operation_name = name.replace('_', ' ').title().replace(' ', '')
variables_str = f"({', '.join(variables)})" if variables else ""
args_str = f"({', '.join(arg_values)})" if arg_values else ""
return f"""{operation_type} {operation_name}{variables_str} {{
{name}{args_str} {{
id
name
# 추가 필드는 실제 응답 구조에 따라 조정하세요
}}
}}"""
except Exception:
return f"# {name} 예시 생성 실패"
def save_report(self, output_file: str):
"""분석 보고서를 파일로 저장"""
try:
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.analyze_security_queries()
sys.stdout = old_stdout
from re import sub
report = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
print(f"\n{Fore.GREEN}보안 분석 보고서가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}파일 저장 오류: {str(e)}{Style.RESET_ALL}")
finally:
if 'output' in locals():
output.close()
def main():
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <스키마파일> [출력파일]{Style.RESET_ALL}")
return
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "graphql_security_report.txt"
try:
analyzer = GraphQLSecurityAnalyzer(input_file)
analyzer.analyze_security_queries() # 콘솔에 출력
analyzer.save_report(output_file) # 파일로 저장
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
if __name__ == "__main__":
main()
import json
from typing import Dict, List, Optional
from colorama import init, Fore, Style
class SafeGraphQLScanner:
def __init__(self, schema_file: str):
init()
self.schema_data = self._load_schema_safely(schema_file)
self.sensitive_keywords = {
'ADMIN': ['admin', 'root', 'superuser', 'supervisor'],
'AUTH': ['login', 'auth', 'token', 'password', 'credential'],
'USER': ['user', 'account', 'profile', 'member'],
'PERMISSION': ['permission', 'role', 'access', 'grant'],
'SENSITIVE': ['email', 'phone', 'address', 'ssn', 'secret']
}
def _load_schema_safely(self, file_path: str) -> dict:
"""안전하게 스키마 파일 로드"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except Exception as e:
print(f"{Fore.RED}스키마 파일 로드 실패: {str(e)}{Style.RESET_ALL}")
return {}
def _safe_get_schema(self) -> dict:
"""안전하게 스키마 데이터 추출"""
try:
if 'data' not in self.schema_data:
return {}
data = self.schema_data['data']
if not isinstance(data, dict):
return {}
if '__schema' not in data:
return {}
schema = data['__schema']
if not isinstance(schema, dict):
return {}
return schema
except Exception:
return {}
def _safe_get_types(self, schema: dict) -> List[dict]:
"""안전하게 타입 목록 추출"""
try:
if 'types' not in schema:
return []
types = schema['types']
if not isinstance(types, list):
return []
return [t for t in types if isinstance(t, dict)]
except Exception:
return []
def _is_sensitive_field(self, name: str) -> tuple:
"""필드가 민감한지 확인"""
name_lower = name.lower()
for category, keywords in self.sensitive_keywords.items():
if any(keyword in name_lower for keyword in keywords):
return True, category
return False, None
def scan_security_issues(self):
"""보안 이슈 스캔"""
schema = self._safe_get_schema()
if not schema:
print(f"{Fore.RED}유효한 스키마를 찾을 수 없습니다.{Style.RESET_ALL}")
return
print(f"\n{Fore.RED}🔍 GraphQL 보안 스캔 결과{Style.RESET_ALL}")
print(f"{Fore.RED}{'='*50}{Style.RESET_ALL}\n")
# Query 분석
self._scan_queries(schema)
# Mutation 분석
self._scan_mutations(schema)
def _get_operation_fields(self, schema: dict, operation_type: str) -> List[dict]:
"""Query/Mutation 필드 안전하게 가져오기"""
try:
# operation_type은 'queryType' 또는 'mutationType'
if operation_type not in schema:
return []
type_info = schema[operation_type]
if not isinstance(type_info, dict):
return []
type_name = type_info.get('name')
if not type_name:
return []
# 해당 타입의 필드 찾기
types = self._safe_get_types(schema)
for type_obj in types:
if type_obj.get('name') == type_name:
fields = type_obj.get('fields', [])
return fields if isinstance(fields, list) else []
return []
except Exception:
return []
def _scan_queries(self, schema: dict):
"""Query 분석"""
print(f"{Fore.YELLOW}[1] 민감한 Query 검사{Style.RESET_ALL}")
fields = self._get_operation_fields(schema, 'queryType')
if not fields:
print("Query 필드를 찾을 수 없습니다.\n")
return
sensitive_queries = []
for field in fields:
if not isinstance(field, dict):
continue
field_name = field.get('name', '')
if not field_name:
continue
is_sensitive, category = self._is_sensitive_field(field_name)
if is_sensitive:
sensitive_queries.append({
'name': field_name,
'category': category,
'type': self._get_safe_type_string(field.get('type', {})),
'args': field.get('args', [])
})
if sensitive_queries:
self._print_sensitive_operations(sensitive_queries, "Query")
else:
print("민감한 Query를 발견하지 못했습니다.\n")
def _scan_mutations(self, schema: dict):
"""Mutation 분석"""
print(f"\n{Fore.YELLOW}[2] 민감한 Mutation 검사{Style.RESET_ALL}")
fields = self._get_operation_fields(schema, 'mutationType')
if not fields:
print("Mutation 필드를 찾을 수 없습니다.\n")
return
sensitive_mutations = []
for field in fields:
if not isinstance(field, dict):
continue
field_name = field.get('name', '')
if not field_name:
continue
is_sensitive, category = self._is_sensitive_field(field_name)
if is_sensitive:
sensitive_mutations.append({
'name': field_name,
'category': category,
'type': self._get_safe_type_string(field.get('type', {})),
'args': field.get('args', [])
})
if sensitive_mutations:
self._print_sensitive_operations(sensitive_mutations, "Mutation")
else:
print("민감한 Mutation을 발견하지 못했습니다.\n")
def _get_safe_type_string(self, type_info: dict) -> str:
"""타입 정보 안전하게 문자열로 변환"""
try:
if not isinstance(type_info, dict):
return 'Unknown'
kind = type_info.get('kind', '')
name = type_info.get('name')
of_type = type_info.get('ofType')
if kind == 'NON_NULL':
return f"{self._get_safe_type_string(of_type)}!" if of_type else 'Unknown!'
elif kind == 'LIST':
return f"[{self._get_safe_type_string(of_type)}]" if of_type else '[Unknown]'
elif name:
return name
return 'Unknown'
except Exception:
return 'Unknown'
def _print_sensitive_operations(self, operations: List[dict], op_type: str):
"""민감한 작업 정보 출력"""
for op in operations:
try:
name = op['name']
category = op['category']
return_type = op['type']
args = op.get('args', [])
print(f"\n{Fore.RED}▶ {name}{Style.RESET_ALL}")
print(f" • 유형: {op_type}")
print(f" • 보안 분류: {category}")
print(f" • 반환 타입: {return_type}")
if args:
print(" • 인자:")
for arg in args:
if not isinstance(arg, dict):
continue
arg_name = arg.get('name', '')
arg_type = self._get_safe_type_string(arg.get('type', {}))
if arg_name and arg_type:
print(f" - {arg_name}: {arg_type}")
# 예시 쿼리 생성
example = self._generate_safe_example(op_type, name, args)
print(f"\n • 사용 예시:\n{example}")
except Exception:
continue
def _generate_safe_example(self, op_type: str, name: str, args: List[dict]) -> str:
"""안전한 예시 쿼리 생성"""
try:
# 변수 정의 생성
variables = []
arg_values = []
for arg in args:
if not isinstance(arg, dict):
continue
arg_name = arg.get('name', '')
arg_type = self._get_safe_type_string(arg.get('type', {}))
if arg_name and arg_type:
variables.append(f"${arg_name}: {arg_type}")
arg_values.append(f"{arg_name}: ${arg_name}")
operation_name = name.replace('_', ' ').title().replace(' ', '')
variables_str = f"({', '.join(variables)})" if variables else ""
args_str = f"({', '.join(arg_values)})" if arg_values else ""
return f"""{op_type.lower()} {operation_name}{variables_str} {{
{name}{args_str} {{
id
# 필요한 필드들을 추가하세요
}}
}}"""
except Exception:
return f"# {name} 예시 생성 실패"
def save_report(self, output_file: str):
"""결과를 파일로 저장"""
try:
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.scan_security_issues()
sys.stdout = old_stdout
from re import sub
report = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
print(f"\n{Fore.GREEN}스캔 결과가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}파일 저장 오류: {str(e)}{Style.RESET_ALL}")
finally:
if 'output' in locals():
output.close()
def main():
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <스키마파일> [출력파일]{Style.RESET_ALL}")
return
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "security_scan_report.txt"
try:
scanner = SafeGraphQLScanner(input_file)
scanner.scan_security_issues() # 콘솔에 출력
scanner.save_report(output_file) # 파일로 저장
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
if __name__ == "__main__":
main()
import json
from typing import Dict, List, Optional, Set
from colorama import init, Fore, Style
class GraphQLScanner:
def __init__(self, schema_file: str):
init()
self.schema_data = self._load_schema_safely(schema_file)
self.schema = self._safe_get_schema()
self.types = self._safe_get_types(self.schema)
# 중요 키워드 정의
self.sensitive_keywords = {
'ADMIN': {
'keywords': ['admin', 'root', 'superuser', 'supervisor'],
'color': Fore.RED,
'risk': '높음'
},
'AUTH': {
'keywords': ['login', 'auth', 'token', 'password', 'credential'],
'color': Fore.RED,
'risk': '높음'
},
'USER': {
'keywords': ['user', 'account', 'profile', 'member'],
'color': Fore.YELLOW,
'risk': '중간'
},
'PERMISSION': {
'keywords': ['permission', 'role', 'access', 'grant'],
'color': Fore.RED,
'risk': '높음'
},
'SENSITIVE': {
'keywords': ['email', 'phone', 'address', 'ssn', 'secret'],
'color': Fore.YELLOW,
'risk': '중간'
}
}
def _load_schema_safely(self, file_path: str) -> dict:
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except Exception as e:
print(f"{Fore.RED}스키마 파일 로드 실패: {str(e)}{Style.RESET_ALL}")
return {}
def _safe_get_schema(self) -> dict:
try:
return self.schema_data.get('data', {}).get('__schema', {})
except Exception:
return {}
def _safe_get_types(self, schema: dict) -> List[dict]:
try:
types = schema.get('types', [])
return [t for t in types if isinstance(t, dict)]
except Exception:
return []
def _find_type_by_name(self, type_name: str) -> Optional[dict]:
"""타입 이름으로 타입 정보 찾기"""
try:
for t in self.types:
if t.get('name') == type_name:
return t
return None
except Exception:
return None
def _get_all_fields_for_type(self, type_name: str, visited: Set[str] = None) -> List[str]:
"""타입의 모든 가능한 필드를 재귀적으로 수집"""
if visited is None:
visited = set()
if type_name in visited:
return []
visited.add(type_name)
fields = []
type_info = self._find_type_by_name(type_name)
if not type_info:
return []
for field in type_info.get('fields', []):
field_name = field.get('name')
if not field_name:
continue
field_type = self._get_field_type_name(field.get('type', {}))
if not field_type:
continue
fields.append(field_name)
# 객체 타입인 경우 재귀적으로 필드 수집
if self._is_object_type(field_type) and field_type not in visited:
sub_fields = self._get_all_fields_for_type(field_type, visited)
if sub_fields:
fields.extend([f"{field_name}.{sub}" for sub in sub_fields])
return fields
def _is_object_type(self, type_name: str) -> bool:
"""해당 타입이 객체 타입인지 확인"""
type_info = self._find_type_by_name(type_name)
if not type_info:
return False
return type_info.get('kind') == 'OBJECT'
def _get_field_type_name(self, type_info: dict) -> Optional[str]:
"""필드의 실제 타입 이름 추출"""
try:
if not type_info:
return None
if type_info.get('kind') == 'NON_NULL':
return self._get_field_type_name(type_info.get('ofType', {}))
elif type_info.get('kind') == 'LIST':
return self._get_field_type_name(type_info.get('ofType', {}))
else:
return type_info.get('name')
except Exception:
return None
def scan(self):
"""보안 스캔 실행"""
print(f"\n{Fore.RED}🔍 GraphQL 보안 스캔 결과{Style.RESET_ALL}")
print(f"{Fore.RED}{'='*60}{Style.RESET_ALL}\n")
self._scan_operations('queryType', 'Query')
print("\n" + "="*60 + "\n")
self._scan_operations('mutationType', 'Mutation')
def _scan_operations(self, operation_type: str, operation_name: str):
"""작업(Query/Mutation) 스캔"""
print(f"{Fore.YELLOW}[*] 민감한 {operation_name} 분석{Style.RESET_ALL}")
root_type = self.schema.get(operation_type, {})
if not root_type:
print(f"- {operation_name} 타입을 찾을 수 없습니다.\n")
return
type_name = root_type.get('name')
if not type_name:
print(f"- {operation_name} 타입 이름을 찾을 수 없습니다.\n")
return
type_info = self._find_type_by_name(type_name)
if not type_info:
print(f"- {operation_name} 타입 정보를 찾을 수 없습니다.\n")
return
fields = type_info.get('fields', [])
if not fields:
print(f"- 사용 가능한 {operation_name}가 없습니다.\n")
return
for field in fields:
self._analyze_sensitive_field(field, operation_name)
def _analyze_sensitive_field(self, field: dict, operation_type: str):
"""민감한 필드 분석"""
try:
name = field.get('name', '')
if not name:
return
# 보안 분류 확인
field_category = None
risk_level = None
color = None
for category, info in self.sensitive_keywords.items():
if any(keyword in name.lower() for keyword in info['keywords']):
field_category = category
risk_level = info['risk']
color = info['color']
break
if field_category:
# 반환 타입 분석
return_type = self._get_field_type_name(field.get('type', {}))
if not return_type:
return
# 사용 가능한 모든 필드 수집
available_fields = self._get_all_fields_for_type(return_type)
if not available_fields:
return
# 출력
print(f"\n{color}▶ {name}{Style.RESET_ALL}")
print(f" • 보안 분류: {field_category}")
print(f" • 위험 수준: {risk_level}")
print(f" • 반환 타입: {return_type}")
# 인자 정보 출력
args = field.get('args', [])
arg_strings = []
if args:
print(" • 인자:")
for arg in args:
arg_name = arg.get('name', '')
arg_type = self._get_safe_type_string(arg.get('type', {}))
if arg_name and arg_type:
print(f" - {arg_name}: {arg_type}")
arg_strings.append(f"{arg_name}: ${arg_name}")
# 완성된 쿼리 예시 생성
self._print_complete_query(
operation_type.lower(),
name,
args,
available_fields
)
except Exception:
pass
def _get_safe_type_string(self, type_info: dict) -> str:
"""안전한 타입 문자열 변환"""
try:
if not isinstance(type_info, dict):
return 'Unknown'
kind = type_info.get('kind', '')
name = type_info.get('name')
of_type = type_info.get('ofType')
if kind == 'NON_NULL':
return f"{self._get_safe_type_string(of_type)}!" if of_type else 'Unknown!'
elif kind == 'LIST':
return f"[{self._get_safe_type_string(of_type)}]" if of_type else '[Unknown]'
elif name:
return name
return 'Unknown'
except Exception:
return 'Unknown'
def _print_complete_query(self, operation_type: str, name: str, args: List[dict], fields: List[str]):
"""완성된 쿼리 출력"""
try:
# 변수 정의 생성
var_defs = []
arg_values = []
for arg in args:
arg_name = arg.get('name', '')
arg_type = self._get_safe_type_string(arg.get('type', {}))
if arg_name and arg_type:
var_defs.append(f"${arg_name}: {arg_type}")
arg_values.append(f"{arg_name}: ${arg_name}")
# 필드 구조화
structured_fields = self._structure_fields(fields)
operation_name = name.replace('_', ' ').title().replace(' ', '')
var_def_str = f"({', '.join(var_defs)})" if var_defs else ""
args_str = f"({', '.join(arg_values)})" if arg_values else ""
print(f"\n • 실행 가능한 쿼리:")
print(f"{Fore.CYAN}")
print(f"{operation_type} {operation_name}{var_def_str} {{")
print(f" {name}{args_str} {{")
self._print_structured_fields(structured_fields, " ")
print(" }")
print("}")
print(f"{Style.RESET_ALL}")
except Exception:
print(f"# {name} 쿼리 생성 실패")
def _structure_fields(self, fields: List[str]) -> Dict:
"""필드 목록을 구조화"""
result = {}
for field in fields:
parts = field.split('.')
current = result
for i, part in enumerate(parts):
if i == len(parts) - 1:
current[part] = None
else:
if part not in current:
current[part] = {}
current = current[part]
return result
def _print_structured_fields(self, fields: Dict, indent: str):
"""구조화된 필드 출력"""
for field, subfields in fields.items():
if subfields is None:
print(f"{indent}{field}")
else:
print(f"{indent}{field} {{")
self._print_structured_fields(subfields, indent + " ")
print(f"{indent}}}")
def save_report(self, output_file: str):
"""결과를 파일로 저장"""
try:
import sys
from io import StringIO
old_stdout = sys.stdout
output = StringIO()
sys.stdout = output
self.scan()
sys.stdout = old_stdout
from re import sub
report = sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', output.getvalue())
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
print(f"\n{Fore.GREEN}스캔 결과가 {output_file}에 저장되었습니다.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}파일 저장 오류: {str(e)}{Style.RESET_ALL}")
finally:
if 'output' in locals():
output.close()
def main():
import sys
if len(sys.argv) < 2:
print(f"{Fore.RED}사용법: python {sys.argv[0]} <스키마파일> [출력파일]{Style.RESET_ALL}")
return
input_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "security_scan_report.txt"
try:
scanner = GraphQLScanner(input_file)
scanner.scan() # 콘솔에 출력
scanner.save_report(output_file) # 파일로 저장
except Exception as e:
print(f"{Fore.RED}오류 발생: {str(e)}{Style.RESET_ALL}")
if __name__ == "__main__":
main()
# ... (이전 코드와 동일) ...
def _print_complete_query(self, operation_type: str, name: str, args: List[dict], fields: List[str]):
"""사용자 친화적인 쿼리 템플릿 출력"""
try:
# 변수 정의와 예시값 생성
var_defs = []
arg_values = []
example_values = []
for arg in args:
arg_name = arg.get('name', '')
arg_type = self._get_safe_type_string(arg.get('type', {}))
if arg_name and arg_type:
var_defs.append(f"${arg_name}: {arg_type}")
arg_values.append(f"{arg_name}: ${arg_name}")
# 타입에 따른 예시값 생성
example_value = self._get_example_value(arg_type)
example_values.append(f' "{arg_name}": {example_value}')
# 쿼리 템플릿 생성
print(f"\n • 실행 가능한 쿼리 템플릿:")
print(f"{Fore.CYAN}")
# 1. 변수 예시
if example_values:
print("# 변수 예시:")
print("variables = {")
print("\n".join(example_values))
print("}")
print()
# 2. 쿼리 템플릿
operation_name = name.replace('_', ' ').title().replace(' ', '')
var_def_str = f"({', '.join(var_defs)})" if var_defs else ""
args_str = f"({', '.join(arg_values)})" if arg_values else ""
print(f"{operation_type} {operation_name}{var_def_str} {{")
print(f" {name}{args_str} {{")
# 필드 출력 (구조화된 형태로)
structured_fields = self._structure_fields(fields)
self._print_structured_fields(structured_fields, " ")
print(" }")
print("}")
print(f"{Style.RESET_ALL}")
# 3. 사용 예시
print(f"\n • 실행 방법:")
print(" 1) variables에 필요한 값들을 채웁니다.")
print(" 2) 필요한 필드들을 선택적으로 활성화/비활성화합니다.")
print(" 3) GraphQL 엔드포인트로 요청을 보냅니다.")
except Exception as e:
print(f"# 쿼리 템플릿 생성 오류: {str(e)}")
def _get_example_value(self, type_str: str) -> str:
"""타입에 따른 예시값 생성"""
type_str = type_str.replace('!', '').replace('[', '').replace(']', '')
example_values = {
'ID': '"______" # ID 값을 입력하세요',
'String': '"______" # 문자열을 입력하세요',
'Int': '0 # 숫자를 입력하세요',
'Float': '0.0 # 실수를 입력하세요',
'Boolean': 'true # true/false를 입력하세요',
'DateTime': '"YYYY-MM-DD" # 날짜를 입력하세요',
'Email': '"user@example.com" # 이메일을 입력하세요',
'Password': '"******" # 비밀번호를 입력하세요',
'Token': '"______" # 토큰을 입력하세요'
}
return example_values.get(type_str, '"______" # 값을 입력하세요')
def _print_structured_fields(self, fields: Dict, indent: str):
"""구조화된 필드 출력 (선택 가능한 형태로)"""
for field, subfields in sorted(fields.items()):
if subfields is None:
# 기본 필드는 활성화, 민감한 필드는 주석 처리
if any(keyword in field.lower() for keywords in self.sensitive_keywords.values()
for keyword in keywords['keywords']):
print(f"{indent}# {field} # 민감한 필드")
else:
print(f"{indent}{field}")
else:
print(f"{indent}{field} {{")
self._print_structured_fields(subfields, indent + " ")
print(f"{indent}}}")
# ... (나머지 코드는 동일) ...