티스토리 뷰
참고 : https://blog.naver.com/mbmb7777/222356672936
https://post.naver.com/viewer/postView.naver?volumeNo=28850610&memberNo=21815
PCA로 피쳐를 합성하고, 합성된 피쳐를 이용해 이상탐지를 하는 모델을 구현했다. 여기에 필요한 개념은 마할로노비스 거리이다. 이는 축으로부터 떨어진 거리가 threshold를 넘을 정도로 멀면 이상으로 탐지하는 방법이다.
import pandas as pd
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
# import torch
#TRAIN_DATASET = sorted([x for x in Path("csv 파일 경로").glob("*.csv")])
TRAIN_DATASET = "CSV 파일 경로\\파일.csv"
def dataframe_from_csv(target):
return pd.read_csv(target).rename(columns=lambda x:x.strip())
def dataframe_from_csvs(targets):
return pd.concat([dataframe_from_csv(x) for x in targets])
TRAIN_DF_RAW = dataframe_from_csv(TRAIN_DATASET)
TIMEFIELD = "timestamp"
TRAIN_DF_RAW = TRAIN_DF_RAW.drop(TIMEFIELD,axis=1)
TRAIN_TAG_MIN = TRAIN_DF_RAW[TRAIN_DF_RAW.columns].min()
TRAIN_TAG_MAX = TRAIN_DF_RAW[TRAIN_DF_RAW.columns].max()
def normalize(df, min, max):
ndf = df.copy()
for c in df.columns:
if min[c] == max[c]:
ndf[c] = df[c] - min[c]
else:
ndf[c] = (df[c] - min[c]) / (max[c] - min[c])
return ndf
TRAIN_DF_RAW = normalize(TRAIN_DF_RAW,TRAIN_TAG_MIN,TRAIN_TAG_MAX)
처음 주석된 sorted는 경로 내에 여러 파일을 불러와야할 경우에 사용한다. 그리고 dataframe_from_csvs함수를 사용한다.
지금은 한 파일만으로 사용하니까 dataframe_from_csv함수를 사용한다. csv 내용을 데이터프레임으로 만들고, 필요없는 timefield를 제거하고 정규화를 한다.
from sklearn.decomposition import PCA
import numpy as np
pca = PCA(n_components=5)
pca.fit(TRAIN_DF_RAW)
TRAIN_PCA = pca.transform(TRAIN_DF_RAW)
TRAIN_PCA
PCA로 합성된 피쳐 5개로 만든다.
cov_mat = np.cov(TRAIN_PCA,rowvar=False)
공분산을 만들기 위해 np.cov를 이용해 계산을 한다.
import numpy.linalg as lin
lin.eig(cov_mat)
inv_cov_matrix = np.linalg.inv(cov_mat)
eigenvalue, eigenvector = np.linalg.eig(cov_mat)
mat = np.zeros((5,5))
mat[0][0] = eigenvalue[0]
mat[1][1] = eigenvalue[1]
mat[2][2] = eigenvalue[2]
mat[3][3] = eigenvalue[3]
mat[4][4] = eigenvalue[4]
np.dot(np.dot(eigenvector,mat),eigenvector.T)
eigenvector.shape
def new_coordinates(X, eigenvector):
for i in range(eigenvector.shape[0]):
if i == 0:
new = [X.dot(eigenvector.T[i])]
else:
new = np.concatenate((new,[X.dot(eigenvector.T[i])]),axis=0)
return new.T
new_coordinates(TRAIN_PCA,eigenvector)
np.cov로 만든 공분산을 이용해 eigenvalue와 eigenvector를 계산한다.
def MahalanobisDist(inv_cov_matrix, mean_distr, data, verbose=False):
inv_covariance_matrix = inv_cov_matrix
vars_mean = mean_distr
diff = data - vars_mean
md = []
for i in range(len(diff)):
md.append(np.sqrt(diff[i].dot(inv_covariance_matrix).dot(diff[i])))
return md
마할로노비스 함수를 구현한다.
def MD_threshold(dist,extreme=False, verbose=False):
k=2 if extreme else 2.
threshold = np.mean(dist)*k
return threshold
threshold를 구현한다. threshold가 너무 높거나 낮으면 k 값을 조정해 threshold를 조절할 수 있다.
data_train = np.array(TRAIN_PCA)
inv_cov_matrix = np.linalg.inv(cov_mat)
mean_distr = data_train.mean(axis=0)
dist_train = MahalanobisDist(inv_cov_matrix, mean_distr, data_train, verbose=False)
역공분산을 만들고 train 샘플을 이용해 거리를 계산한다.
threshold = MD_threshold(dist_train,extreme=True)
거리를 기반으로 treshold를 계산한다.
anomaly = pd.DataFrame()
anomaly['Mob dist'] = dist_train
anomaly['Thresh'] = threshold
anomaly['Anomaly'] = anomaly['Mob dist'] > anomaly['Thresh']
이제 위 값들을 기반으로 anomlay 세트를 만든다.
anomaly.plot(figsize=(14,6),color=['green','red'])
시각화한다.
이 때 threshold값이 너무 높거나 낮으면 k값을 조절한다. train 샘플이 모두 정상 데이터 셋이기 때문에 threshold가 높게 있는게 정상이다.
여기까지 train 셋을 이용해 threshold를 잡았다. 이제 test 셋을 이용해 threshold가 적당한지, 적당하면 이상치들을 잘 잡아내는지를 확인해야 한다.
test용 코드들을 똑같은 파일에 작성하고, threshold에 관련된 코드들을 제외하고 test셋이 train셋을 대체하는 코드만 실행한다.
import pandas as pd
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
#TEST_DATASET = sorted([x for x in Path("test csv 경로").glob("*.csv")])
TEST_DATASET = "test csv 경로\\test1.csv"
def dataframe_from_csv(target):
return pd.read_csv(target).rename(columns=lambda x:x.strip())
def dataframe_from_csvs(targets):
return pd.concat([dataframe_from_csv(x) for x in targets])
TEST_DF_RAW = dataframe_from_csv(TEST_DATASET)
TIMEFIELD = "timestamp"
TEST_DF_RAW = TEST_DF_RAW.drop(TIMEFIELD,axis=1)
TEST_TAG_MIN = TEST_DF_RAW[TEST_DF_RAW.columns].min()
TEST_TAG_MAX = TEST_DF_RAW[TEST_DF_RAW.columns].max()
def normalize(df, min, max):
ndf = df.copy()
for c in df.columns:
if min[c] == max[c]:
ndf[c] = df[c] - min[c]
else:
ndf[c] = (df[c] - min[c]) / (max[c] - min[c])
return ndf
TEST_DF_RAW = normalize(TEST_DF_RAW,TEST_TAG_MIN,TEST_TAG_MIN)
from sklearn.decomposition import PCA
import numpy as np
pca = PCA(n_components=5)
pca.fit(TEST_DF_RAW)
TEST_PCA = pca.transform(TEST_DF_RAW)
TEST_PCA
cov_mat = np.cov(TEST_PCA,rowvar=False)
mat = np.zeros((5,5))
mat[0][0] = eigenvalue[0]
mat[1][1] = eigenvalue[1]
mat[2][2] = eigenvalue[2]
mat[3][3] = eigenvalue[3]
mat[4][4] = eigenvalue[4]
np.dot(np.dot(eigenvector,mat),eigenvector.T)
eigenvector.shape
def new_coordinates(X, eigenvector):
for i in range(eigenvector.shape[0]):
if i == 0:
new = [X.dot(eigenvector.T[i])]
else:
new = np.concatenate((new,[X.dot(eigenvector.T[i])]),axis=0)
return new.T
new_coordinates(TEST_PCA,eigenvector)
data_test = np.array(TEST_PCA)
inv_cov_matrix = np.linalg.inv(cov_mat)
mean_distr = data_test.mean(axis=0)
dist_test = MahalanobisDist(inv_cov_matrix, mean_distr, data_test, verbose=False)
anomaly = pd.DataFrame()
anomaly['Mob dist'] = dist_test
anomaly['Thresh'] = threshold
anomaly['Anomaly'] = anomaly['Mob dist'] > anomaly['Thresh']
anomaly.plot(figsize=(14,6),color=['green','red'])
적당하게 이상치를 잡았다.
'머신러닝' 카테고리의 다른 글
머신러닝 / ROC, AUC (0) | 2023.05.18 |
---|---|
머신러닝 / 공분산 행렬 PCA - 1 (1) | 2023.05.14 |
머신러닝 / HAI Baseline (0) | 2023.03.27 |
머신러닝 / 표준화, 정규화, 스케일링 (0) | 2023.01.16 |
머신러닝 / 차원 축소 1편 (0) | 2023.01.14 |
- Total
- Today
- Yesterday
- automotive
- json2html
- many-to-many
- 딥러닝
- SOME/IP
- one-to-many
- Python
- porks
- many-to-one
- 머신러닝
- HTML
- 크로스 엔트로피
- AE
- AVTP
- problem statement
- 회귀
- PCA
- 차량 네트워크
- 이상탐지
- 로지스틱회귀
- 차량용 이더넷
- SVM
- 케라스
- automotive ethernet
- 단순선형회귀
- AVB
- CAN-FD
- Ethernet
- 논문 잘 쓰는법
- cuckoo
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |