파이썬 실습/악성코드_바이러스탐지 프로그램 만들기

파이썬 악성코드/바이러스 탐지 프로그램 만들기 - 4.악성코드 점검 자동화

파기차차 2022. 11. 12. 14:51
728x90
반응형
SMALL
728x90

ㅁ 개요

 

 

O 프로그램 소개

 

 

 - 이번 프로그램은 이전글(2022.11.06 - [파이썬 실습/유틸리티] - 파이썬 악성코드/바이러스 탐지 프로그램 만들기 - 3.파일업로드를 통한 악성코드 유무 확인)에 이은 네번째 글로 기존에 수동으로 점검하던 방식을 개선하여 외부에서 다운로드 받은 파일을 자동으로 검사하는 방법에 대하여 설명합니다.

 

 

O 완성된 프로그램 실행 화면
 

 

 - 이번 편에서 최종 완성된 프로그램의 결과화면은 아래와 같습니다.

(1) 프로그램 실행 시 '서버에 업로드된 파일의 분석된 해시값'을 가져와서 파일의 악성유무를 판단하고, 악성인 경우 경고창을 띄워주고 있습니다.

 

다만, 아래에서 다시 말씀드리겠지만, API 방식으로 업로드하는 경우 서버에 즉시 반영이 되지 않기때문에 우리는 이부분을 다시 한번 고민해야 합니다.

 


 

O 사전 준비 및 주의 사항!!

 

 - 우리는 이번 테스트를 위해 멀웨어닷컴에서 해시값을 확인할 수 없는 새로운 악성파일이 필요합니다. 이 악성파일을 잘못 다루는 경우 감염될 수 있으므로 반드시 세심한 주의를 하셔야 합니다.(exe파일 더블클릭 절대 금지!!)

 

!!!!! 주의 사항

우리가 준비하려는 새로운 악성파일은 말 그대로 악성코드가 포함된 파일이므로 취급시 매우 매우 조심하셔야 하며, 잘못하면 자신의 컴퓨터가 감염될 수 있으니, 이점이 우려되시는 분은 따라하지 마시기 바랍니다.!!!

 

 

자세한 내용은 이전글(2022.11.06 - [파이썬 실습/유틸리티] - 파이썬 악성코드/바이러스 탐지 프로그램 만들기 - 3.파일업로드를 통한 악성코드 유무 확인)을 참고하시기 바랍니다.

 

 

 

 - 이제 우리는 다음 단계로 넘어갈 준비가 되었습니다.


 

ㅁ 세부 내용
 
O 완성된 소스

 

import requests 
import sys
import os
import win32api
import time
from ast import literal_eval


api_key = '본인의 api키'


print(len(sys.argv))
if len(sys.argv) > 1 and api_key:
    print("ok")
    imsiDir = "imsi/"
    if not(os.path.isdir(imsiDir)): # imsi 디렉토리가 없으면 만든다.
        os.makedirs(os.path.join(imsiDir))

    for i in range(1, len(sys.argv)):
        filenames = os.listdir(sys.argv[i])
        print(filenames)

        for (path, dirs, filenames) in os.walk(sys.argv[i]):
            for filename in filenames:
                full_filename = os.path.join(path, filename)
                print(full_filename)
                os.system('certUtil -hashfile "' + full_filename + '" MD5 > ' + imsiDir + 'imsi_hash.txt')
                txtf = open(imsiDir+"imsi_hash.txt", "r", encoding="euckr")
                data = txtf.read()
                second_line = data.split('\n',2)[1]
                hash = second_line.replace(" ","")
                print("파일명: ", full_filename, "###########\t\t md5 해쉬: ", hash)

                params = {'api_key': api_key, 'hash': hash}
                response = requests.get('https://public.api.malwares.com/v3/file/mwsinfo', params=params)
                json_response = response.json()
                # json_response = str(json_response).replace("\'", "\"")
                print(json_response)

                if json_response['result_code'] == int(1): # or json_response['result_code'] == int(2): 
                    pass
                    print("서버에 해시정보가 있습니다. 보내주신 해시값과 서버의 해시값을 비교합니다.")
                    f = open(imsiDir+hash+".txt", "w")
                    f.write(str(json_response))
                    # f.write(json_response)
                    f.close()

                    with open(imsiDir+hash+".txt") as f:
                        # data2 = json.load(json_file)
                        # data2 = eval(f.read())
                        data2 = literal_eval(f.read()) # str -> dic로 읽는다.


                        ai_score = data2['ai_score']
                        print(ai_score)
                        if data2['signcheck']:
                            signcheck = data2['signcheck']['verified']
                        else:
                            signcheck = 'None'
                        print(signcheck)
                        check = "파일이름 : " + str(filename) + "\nai_score : " + str(ai_score) + "\nsigncheck : " + str(signcheck) +"\n\nDetection Malware!!!!!! "
                        if signcheck == "unsigned" or ai_score > int(3):
                        # if signcheck == 'None' or signcheck == "unsigned" or ai_score > int(3):
                            win32api.MessageBox(None, check, 'Warning!!!!',0)

                elif json_response['result_code'] == int(2):
                    print(full_filename, " 이 파일은 현재 분석 진행중입니다. 5분 후에 다시 시도 하시기 바랍니다.")
                
                else:
                    # json_response['result_code'] == int(0)
                    print(full_filename, "이 파일은 서버에 해시 정보가 없으므로 파일을 서버에 업로드 후 해시정보를 다시 받아와야 합니다.")


                    params = {"api_key": api_key,"filename": full_filename}
                    files = {"file": (full_filename, open(full_filename, "rb"), 'application/octet-stream')}
                    # requests.post("https://public.api.malwares.com/v3/file/upload", files=files, data=params)
                    response = requests.post("https://public.api.malwares.com/v3/file/upload", files=files, data=params)
                    json_response = response.json()
                    print(json_response)
                    # print(json_response['md5'])
                    # hash = json_response['md5']
                    print("파일 업로드 중입니다. 약 1분정도 소요됩니다.")
                    time.sleep(30)

                    os.system('certUtil -hashfile "' + full_filename + '" MD5 > ' + imsiDir + 'imsi_hash.txt')
                    txtf = open(imsiDir+"imsi_hash.txt", "r", encoding="euckr")
                    data = txtf.read()
                    second_line = data.split('\n',2)[1]
                    hash = second_line.replace(" ","")
                    print("파일명: ", full_filename, "###########\t\t md5 해쉬: ", hash)


                    params = {'api_key': api_key, 'hash': hash}
                    response = requests.get('https://public.api.malwares.com/v3/file/mwsinfo', params=params)
                    json_response = response.json()
                    print(json_response)

                    if json_response['result_code'] == int(1):
                        pass
                        print("업로드 후 서버에 해시정보가 생겼습니다. 보내주신 해시값과 서버의 해시값을 비교합니다.")
                        f = open(imsiDir+hash+".txt", "w")
                        f.write(str(json_response))
                        # f.write(json_response)
                        f.close()

                        with open(imsiDir+hash+".txt") as f:
                            # data2 = json.load(json_file)
                            # data2 = eval(f.read())
                            data2 = literal_eval(f.read()) # str -> dic로 읽는다.


                            ai_score = data2['ai_score']
                            print(ai_score)
                            if data2['signcheck']:
                                signcheck = data2['signcheck']['verified']
                            else:
                                signcheck = 'None'
                            print(signcheck)
                            check = "파일이름 : " + str(filename) + "\n\nDetection Malware!!!!!! "
                            if signcheck == "unsigned" or ai_score > int(3):
                            # if signcheck == 'None' or signcheck == "unsigned" or ai_score > int(3):
                                win32api.MessageBox(None, check, 'Warning!!!!',0)
                    
                    else:
                        print(full_filename, "이 파일은 파일업로드 후에도 서버에 해시 정보가 없습니다. 개별적으로 확인해 보시기 바랍니다.")

                
                txtf.close()
            time.sleep(1)

 

 

O 소스 다운로드 및 실행

 

 

 

 - cmd 또는 파워쉘, 비주얼스튜디오코드 등에서 아래와 같이 실행하시기 바랍니다.
>python 3.malwareCom_hash_for_pub.py "점검할 폴더"(.\checkdir)

 

 

 

 


 

O 소스 분석

 

1.from ast import literal_eval <-- 문자열을 딕셔너리로 변경하게 해주는 모듈 임포트
2.len(sys.argv) <-- 인자의 갯수(예: >python 3.malwareCom_hash_for_pub.py <--이렇게 샐행하면 인자는 1임(python 뒤의 지정된 명령 개수)
3.if not(os.path.isdir(imsiDir)): <-- imsiDir 디렉토리가 없으면 만든다.
        os.makedirs(os.path.join(imsiDir))
4.for i in range(1, len(sys.argv)): <-- 인자 갯수만큼 돈다(즉, 폴더를 여러개 지정 시 지정한 모든 폴더의 파일을 점검)
5.filenames = os.listdir(sys.argv[i]) <-- 지정한 폴더 내 파일 목록을 list로 반환
6.for (path, dirs, filenames) in os.walk(sys.argv[i]): <-- 지정한 폴더의 경로, 디렉토리, 파일목록을 반환
7.os.system('certUtil -hashfile "' + full_filename + '" MD5 > ' + imsiDir + 'imsi_hash.txt') <-- win10에 내장된 certUtil 툴은 해시값을 뽑아주는 도구이며, 여기서는 MD5 해시를 파일로 저장
8.second_line = data.split('\n',2)[1] <-- 해시값이 저장된 파일(imsi_hash.txt)에서 두번째 라인을 읽어서 second_line변수에 저장
9.hash = second_line.replace(" ","") <--해시값에 공백이 없도록 replace()함수로 대체시켜줌
10.if json_response['result_code'] == int(1): <-- 서버로 부터 API응답코드('result_code')에 데이터가 있으면(1, 즉 서버에 해시값이 있으면)
11.data2 = literal_eval(f.read()) <-- imsi+hash+".txt" 파일의 내용은 스트링으로 되어 있으므로 딕셔너리구조로 읽어오기 위해 literal_eval()함수로 형식을 변환
12.if signcheck == "unsigned" or ai_score > int(3): <-- signcheck변수가 'unsigned'이거나 'ai_score'가 3보다 크면 악성파일로 정의
13.win32api.MessageBox(None, check, 'Warning!!!!',0) <--위 12번 조건에 맞으면 경고창 띄움
14.print("파일 업로드 중입니다. 약 1분정도 소요됩니다.")
           time.sleep(30) <-- 파일 업로드 시 약간의 시간 텀을 주어야 함(텀을 주지 않으면 서버에서 Ddos공격으로 의심하여 업로드를 한동안 차단 당할 수 있음)

 

 


 

O 기본 내용

 

 

1. 위의 악성코드 점검 자동화 프로그램을 실행시키면 아래와 같이 동작합니다.

1.1. 우리가 점검하는 파일은 새로운 악성파일로 서버에 존재하지 않으므로 API 방식으로 서버에 파일을 업로드 후 분석된 해시 값을 받아야 합니다.

1.2.서버에 파일을 업로드 합니다.
1.3.업로드 후 바로 서버에서 분석된 'ai_score'와 'result_code'를 받는데, 이상하게도 'ai_score'가 0(안전)입니다. 왜 그럴까요?

 

 

2. 이전글(2022.10.31 - [파이썬 실습/유틸리티] - 파이썬 악성코드/바이러스 탐지 프로그램 만들기 - 2.해시값을 이용한 악성코드 유무 확인)에서 만들었던 해시값 비교 프로그램으로 테스트 해 봅니다.

하지만 여전히 결과는 동일('안전')합니다.('ai_score': 0)

 

 

 

 

 

3. 홈페이지에서 파일을 직접 업로드 해 봅니다.

 

 

 

 

 

3-1 파일이 업로드 되고 있습니다.

 

 

4. 업로드된 파일을 서버에서 분석 중이며, 결과가 '9/100' <-- 이것과 같이 악성파일로 강력히 의심되고 있습니다.

홈페이지에 업로드 하니 API방식으로 업로드할때 와는 달리 서버에 '즉시 반영'이 되었습니다.

 

 

 

 

 

5. 다시 프로그램(3.malwareCom_hash_for_pub.py)을 실행하면 이제 정상적으로 해당 파일을 악성파일로 인식하여 경고창을 띄워 주고 있습니다.

 

 

 

 

 

 


 

ㅁ 정리
 
O 오늘 우리가 배운 내용
 
 - 오늘 우리는 이전글에서 수동으로 점검하던 방식을 개선하여 외부에서 다운로드 받은 파일을 자동으로 검사하는 방법에 대하여 배웠습니다.
 
 - 간단히 정리해 보면 다음과 같습니다.
 > 1.API를 이용한 파일 업로드 '악성코드 자동분석 프로그램' 실행
     => 서버에 즉시 반영되지 않아 'ai_score'의 값이 0('안전')인 문제점 발생
 > 2. 멀웨어 닷컴 홈페이지를 이용하여 직접 업로드
     => 홈페이지 업로드의 경우 서버에서 멀웨어 정보 분석 후 즉시 반영됨
 > 3. 다시 프로그램 실행 시(API 요청시) 정상적으로 작동
     => 홈페이지 업로드 후에는 업로드된 파일을 정상적으로 악성파일로 인식

 

 

오늘은 여기까지 이며, 다음편에서 우리는 API방식으로 악성파일을 업로드 하는 경우 서버에 즉시 반영되지 않는 문제점을 어떻게 개선할 수 있는지에 대하여 살펴 보겠습니다.

 
 

 

위의 내용이 유익하셨다면, 좋아요와 구독 부탁드립니다.

 

 

감사합니다.

 

728x90
반응형
LIST