- Authors

- Name
- Youngju Kim
- @fjvbn20031
- 인더스트리 4.0과 AI의 융합
- 예측 유지보수 (Predictive Maintenance)
- 컴퓨터 비전 기반 품질 검사
- 디지털 트윈
- 공급망 최적화
- 산업용 로봇 AI: 협동 로봇과 비전 기반 그리핑
- 엣지 AI 배포: TensorRT on Jetson
- 퀴즈
- 마치며
인더스트리 4.0과 AI의 융합
인더스트리 4.0(Industry 4.0)은 사이버 물리 시스템(CPS), IIoT, 클라우드 컴퓨팅, AI가 결합된 4차 산업혁명을 의미합니다. 제조 현장의 모든 장비와 프로세스가 디지털화되고, 실시간 데이터 기반으로 의사결정이 이루어집니다.
핵심 기술 스택은 다음과 같습니다.
- CPS (Cyber-Physical System): 물리 세계와 디지털 세계를 연결하는 통합 시스템
- IIoT (Industrial Internet of Things): 산업용 센서, 액추에이터, 제어 시스템의 네트워크
- OPC-UA (Open Platform Communications Unified Architecture): 제조 장비 간 표준 데이터 교환 프로토콜
- MQTT: 경량 메시지 브로커 프로토콜, IoT 엣지 디바이스에 최적화
- 디지털 트윈: 물리 자산의 실시간 가상 복제본
OPC-UA Python 클라이언트 구현
OPC-UA는 제조 현장에서 PLC, SCADA, MES 간의 데이터 통합 표준입니다. Python으로 OPC-UA 서버에서 센서 데이터를 수집하는 클라이언트를 구현합니다.
from opcua import Client
import pandas as pd
import time
from datetime import datetime
class ManufacturingDataCollector:
def __init__(self, server_url: str):
self.client = Client(server_url)
self.data_buffer = []
def connect(self):
self.client.connect()
print(f"OPC-UA 서버 연결 완료: {self.client.get_endpoints()}")
def read_sensor_nodes(self, node_ids: list) -> dict:
readings = {}
for node_id in node_ids:
node = self.client.get_node(node_id)
value = node.get_value()
readings[node_id] = {
"value": value,
"timestamp": datetime.utcnow().isoformat()
}
return readings
def collect_stream(self, node_ids: list, interval_sec: float = 1.0):
"""실시간 스트리밍 수집"""
while True:
readings = self.read_sensor_nodes(node_ids)
self.data_buffer.append(readings)
time.sleep(interval_sec)
def to_dataframe(self) -> pd.DataFrame:
rows = []
for snapshot in self.data_buffer:
row = {"timestamp": list(snapshot.values())[0]["timestamp"]}
for node_id, data in snapshot.items():
row[node_id] = data["value"]
rows.append(row)
return pd.DataFrame(rows)
def disconnect(self):
self.client.disconnect()
# 사용 예시
collector = ManufacturingDataCollector("opc.tcp://factory-plc:4840/")
collector.connect()
# CNC 머신 센서 노드 ID
sensor_nodes = [
"ns=2;i=1001", # 스핀들 속도 (RPM)
"ns=2;i=1002", # 진동 (g)
"ns=2;i=1003", # 온도 (°C)
"ns=2;i=1004", # 전류 (A)
]
collector.collect_stream(sensor_nodes, interval_sec=0.5)
df = collector.to_dataframe()
df.to_parquet("sensor_data.parquet")
예측 유지보수 (Predictive Maintenance)
예측 유지보수는 장비 고장이 발생하기 전에 이상 징후를 감지하여 계획적 유지보수를 수행하는 전략입니다. 전통적인 사후 정비(Corrective Maintenance)나 주기적 정비(Preventive Maintenance)보다 가동 중단 시간을 최소화하고 비용을 절감합니다.
이상 감지 vs 고장 분류
예측 유지보수 파이프라인은 두 단계로 구성됩니다.
- 이상 감지 (Anomaly Detection): 정상 패턴에서 벗어난 데이터를 탐지. 레이블 없이 비지도 학습으로 수행 가능
- 고장 분류 (Fault Classification): 이상이 감지된 후 고장 유형을 분류. 레이블된 고장 데이터 필요
실제 공장에서는 고장 데이터가 극히 적기 때문에 1단계(이상 감지)를 먼저 구축하고, 이후 수집된 이상 데이터로 2단계 분류기를 학습시키는 방식이 현실적입니다.
Isolation Forest를 이용한 센서 이상 감지
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
def train_anomaly_detector(df: pd.DataFrame, feature_cols: list,
contamination: float = 0.05):
"""
Isolation Forest 기반 이상 감지기 학습
contamination: 예상 이상 비율 (0.05 = 5%)
"""
X = df[feature_cols].values
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
model = IsolationForest(
n_estimators=200,
contamination=contamination,
max_samples="auto",
random_state=42,
n_jobs=-1
)
model.fit(X_scaled)
# 이상 점수: 낮을수록 이상 (음수)
scores = model.decision_function(X_scaled)
predictions = model.predict(X_scaled) # 1: 정상, -1: 이상
return model, scaler, scores, predictions
def detect_anomalies_realtime(model, scaler, new_data: dict,
feature_cols: list) -> bool:
"""실시간 이상 감지"""
x = np.array([[new_data[col] for col in feature_cols]])
x_scaled = scaler.transform(x)
score = model.decision_function(x_scaled)[0]
prediction = model.predict(x_scaled)[0]
return prediction == -1, score
# 데이터 로드 및 학습
df = pd.read_parquet("sensor_data.parquet")
features = ["spindle_rpm", "vibration_g", "temperature_c", "current_a"]
# 정상 운영 데이터로만 학습
normal_df = df[df["timestamp"] < "2026-01-01"]
model, scaler, scores, preds = train_anomaly_detector(
normal_df, features, contamination=0.03
)
# 이상 구간 시각화
df["anomaly_score"] = model.decision_function(
scaler.transform(df[features].values)
)
df["is_anomaly"] = model.predict(
scaler.transform(df[features].values)
) == -1
anomaly_count = df["is_anomaly"].sum()
print(f"감지된 이상 포인트: {anomaly_count} / {len(df)}")
Autoencoder 기반 이상 감지
딥러닝 기반의 Autoencoder는 복잡한 비선형 패턴을 포착합니다. 정상 데이터로 학습한 뒤, 재구성 오류(Reconstruction Error)가 높은 샘플을 이상으로 판단합니다.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
class SensorAutoencoder(nn.Module):
def __init__(self, input_dim: int, latent_dim: int = 8):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, latent_dim)
)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 32),
nn.ReLU(),
nn.Linear(32, 64),
nn.ReLU(),
nn.Linear(64, input_dim)
)
def forward(self, x):
z = self.encoder(x)
return self.decoder(z)
def train_autoencoder(X_normal: np.ndarray, epochs: int = 100,
threshold_percentile: float = 95.0):
X_tensor = torch.FloatTensor(X_normal)
dataset = TensorDataset(X_tensor, X_tensor)
loader = DataLoader(dataset, batch_size=256, shuffle=True)
model = SensorAutoencoder(input_dim=X_normal.shape[1])
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(epochs):
for x_batch, _ in loader:
recon = model(x_batch)
loss = criterion(recon, x_batch)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 임계값 설정: 정상 데이터 재구성 오류의 95 퍼센타일
with torch.no_grad():
recon = model(X_tensor)
errors = torch.mean((recon - X_tensor) ** 2, dim=1).numpy()
threshold = np.percentile(errors, threshold_percentile)
return model, threshold
LSTM 기반 잔여 유효 수명 (RUL) 예측
RUL(Remaining Useful Life) 예측은 장비가 고장 나기까지 남은 수명을 예측합니다. NASA의 CMAPSS 터보팬 엔진 데이터셋이 벤치마크로 자주 사용됩니다.
import torch
import torch.nn as nn
import numpy as np
class RULPredictor(nn.Module):
"""LSTM 기반 잔여 유효 수명 예측 모델"""
def __init__(self, input_size: int, hidden_size: int = 128,
num_layers: int = 2, dropout: float = 0.2):
super().__init__()
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout
)
self.fc = nn.Sequential(
nn.Linear(hidden_size, 64),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(64, 1)
)
def forward(self, x):
# x: (batch, seq_len, features)
out, _ = self.lstm(x)
# 마지막 타임스텝만 사용
out = self.fc(out[:, -1, :])
return out.squeeze(-1)
def prepare_rul_sequences(df: pd.DataFrame, seq_len: int = 30,
sensor_cols: list = None):
"""슬라이딩 윈도우로 시퀀스 생성"""
sequences, targets = [], []
for engine_id in df["engine_id"].unique():
engine_df = df[df["engine_id"] == engine_id].sort_values("cycle")
max_cycle = engine_df["cycle"].max()
engine_df["rul"] = max_cycle - engine_df["cycle"]
X = engine_df[sensor_cols].values
y = engine_df["rul"].values
for i in range(len(X) - seq_len):
sequences.append(X[i:i + seq_len])
targets.append(y[i + seq_len - 1])
return np.array(sequences), np.array(targets)
컴퓨터 비전 기반 품질 검사
MVTec AD와 One-Class Classification
MVTec AD는 제조 결함 탐지의 표준 벤치마크 데이터셋으로, 15개 산업 카테고리에서 정상과 다양한 결함 유형을 포함합니다. 핵심 특징은 학습 시 정상 이미지만 제공되고, 테스트 시 결함 이미지가 나온다는 점입니다.
One-Class Classification이 유리한 이유: 실제 제조 환경에서 모든 결함 유형을 미리 수집하는 것은 불가능합니다. 정상 제품만으로 학습하고 정상 분포에서 벗어난 것을 결함으로 판단하는 방식이 현실적입니다.
import torch
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os
import numpy as np
from sklearn.neighbors import NearestNeighbors
class MVTecDataset(Dataset):
def __init__(self, root: str, split: str = "train",
category: str = "bottle"):
self.transform = T.Compose([
T.Resize((256, 256)),
T.CenterCrop(224),
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
self.image_paths = []
self.labels = []
base = os.path.join(root, category, split)
for cls_name in os.listdir(base):
label = 0 if cls_name == "good" else 1
cls_dir = os.path.join(base, cls_name)
for fname in os.listdir(cls_dir):
if fname.endswith(".png"):
self.image_paths.append(os.path.join(cls_dir, fname))
self.labels.append(label)
def __len__(self):
return len(self.image_paths)
def __getitem__(self, idx):
img = Image.open(self.image_paths[idx]).convert("RGB")
return self.transform(img), self.labels[idx]
class PatchCoreDetector:
"""PatchCore: 사전학습 특징 + k-NN 기반 이상 감지"""
def __init__(self, backbone: str = "resnet50", k: int = 5):
self.model = models.__dict__[backbone](pretrained=True)
# 중간 레이어 특징 추출
self.model = nn.Sequential(*list(self.model.children())[:-2])
self.model.eval()
self.knn = NearestNeighbors(n_neighbors=k, metric="euclidean")
self.memory_bank = None
def extract_features(self, loader: DataLoader) -> np.ndarray:
features = []
with torch.no_grad():
for imgs, _ in loader:
feat = self.model(imgs)
# 공간 평균 풀링
feat = feat.mean(dim=[2, 3]).numpy()
features.append(feat)
return np.concatenate(features, axis=0)
def fit(self, train_loader: DataLoader):
self.memory_bank = self.extract_features(train_loader)
self.knn.fit(self.memory_bank)
print(f"메모리 뱅크 크기: {self.memory_bank.shape}")
def score(self, test_loader: DataLoader) -> np.ndarray:
test_features = self.extract_features(test_loader)
distances, _ = self.knn.kneighbors(test_features)
return distances.mean(axis=1) # 이상 점수
디지털 트윈
디지털 트윈은 물리 자산의 실시간 가상 복제본입니다. NVIDIA Omniverse는 물리 기반 렌더링과 시뮬레이션을 결합한 엔터프라이즈급 디지털 트윈 플랫폼을 제공합니다.
하이브리드 모델: 물리 기반 + ML
순수 물리 모델은 복잡한 비선형 현상을 포착하기 어렵고, 순수 데이터 기반 모델은 물리적 제약을 위반할 수 있습니다. 두 접근법을 결합한 Physics-Informed Neural Network(PINN)이 효과적입니다.
import torch
import torch.nn as nn
class HybridDigitalTwin(nn.Module):
"""
하이브리드 디지털 트윈: 물리 방정식 잔차를 ML로 보정
예: CNC 머신의 열 변형 모델
"""
def __init__(self, physics_input_dim: int, correction_input_dim: int):
super().__init__()
# 물리 기반 파라미터 (학습 가능)
self.thermal_coeff = nn.Parameter(torch.tensor(0.001))
self.damping = nn.Parameter(torch.tensor(0.1))
# ML 보정 네트워크
self.correction_net = nn.Sequential(
nn.Linear(correction_input_dim, 64),
nn.Tanh(),
nn.Linear(64, 32),
nn.Tanh(),
nn.Linear(32, 1)
)
def physics_model(self, temperature: torch.Tensor,
time: torch.Tensor) -> torch.Tensor:
"""단순화된 열 팽창 모델: delta_L = alpha * L0 * delta_T"""
delta_T = temperature - 20.0 # 기준 온도 20°C
return self.thermal_coeff * delta_T * torch.exp(-self.damping * time)
def forward(self, temperature: torch.Tensor, time: torch.Tensor,
context_features: torch.Tensor) -> torch.Tensor:
physics_pred = self.physics_model(temperature, time)
correction = self.correction_net(context_features)
return physics_pred + correction
def train_digital_twin(model, train_loader, epochs: int = 200,
physics_weight: float = 0.1):
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
mse = nn.MSELoss()
for epoch in range(epochs):
for batch in train_loader:
temp, time_val, context, target = batch
pred = model(temp, time_val, context)
# 데이터 손실
data_loss = mse(pred, target)
# 물리적 제약: 온도가 낮으면 변형도 낮아야 함
cold_mask = temp < 15.0
physics_loss = torch.mean(
torch.relu(pred[cold_mask]) # 낮은 온도에서 양의 변형 패널티
) if cold_mask.any() else torch.tensor(0.0)
loss = data_loss + physics_weight * physics_loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
공급망 최적화
OR-Tools를 이용한 차량 경로 최적화 (VRP)
from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp
import numpy as np
def create_distance_matrix(locations: list) -> list:
"""유클리드 거리 행렬 생성"""
n = len(locations)
matrix = []
for i in range(n):
row = []
for j in range(n):
if i == j:
row.append(0)
else:
dx = locations[i][0] - locations[j][0]
dy = locations[i][1] - locations[j][1]
row.append(int(np.sqrt(dx**2 + dy**2) * 100))
matrix.append(row)
return matrix
def solve_vrp(distance_matrix: list, num_vehicles: int,
demands: list, vehicle_capacity: int) -> dict:
"""
용량 제약 차량 경로 문제 (CVRP) 풀기
OR-Tools 사용
"""
manager = pywrapcp.RoutingIndexManager(
len(distance_matrix), num_vehicles, 0 # depot=0
)
routing = pywrapcp.RoutingModel(manager)
def distance_callback(from_idx, to_idx):
from_node = manager.IndexToNode(from_idx)
to_node = manager.IndexToNode(to_idx)
return distance_matrix[from_node][to_node]
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
# 용량 제약
def demand_callback(idx):
node = manager.IndexToNode(idx)
return demands[node]
demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
routing.AddDimensionWithVehicleCapacity(
demand_callback_index, 0,
[vehicle_capacity] * num_vehicles,
True, "Capacity"
)
search_params = pywrapcp.DefaultRoutingSearchParameters()
search_params.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
)
search_params.local_search_metaheuristic = (
routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
)
search_params.time_limit.seconds = 30
solution = routing.SolveWithParameters(search_params)
routes = {}
if solution:
for vehicle_id in range(num_vehicles):
index = routing.Start(vehicle_id)
route = []
while not routing.IsEnd(index):
route.append(manager.IndexToNode(index))
index = solution.Value(routing.NextVar(index))
routes[vehicle_id] = route
return routes
산업용 로봇 AI: 협동 로봇과 비전 기반 그리핑
협동 로봇(코봇, Collaborative Robot)은 사람과 같은 공간에서 안전하게 작동하며, 유연한 제조 환경에 적합합니다. 비전 기반 그리핑은 3D 카메라와 딥러닝을 결합하여 임의 자세의 물체를 파지합니다.
Sim-to-Real Transfer는 시뮬레이터에서 학습한 정책을 실제 로봇에 적용하는 기법입니다. 시뮬레이션에서 물리 파라미터(마찰, 질량, 조명)를 무작위로 변화시키는 도메인 랜덤화(Domain Randomization)가 핵심입니다.
엣지 AI 배포: TensorRT on Jetson
제조 현장에서는 클라우드 레이턴시 없이 실시간 추론이 필요합니다. NVIDIA Jetson(AGX Orin, Orin NX)은 TensorRT를 사용하여 AI 모델을 엣지에서 가속화합니다.
import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
def build_engine_from_onnx(onnx_path: str,
max_batch_size: int = 1,
fp16: bool = True) -> trt.ICudaEngine:
"""ONNX 모델을 TensorRT 엔진으로 변환"""
with trt.Builder(TRT_LOGGER) as builder:
network_flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
with builder.create_network(network_flags) as network:
with trt.OnnxParser(network, TRT_LOGGER) as parser:
with open(onnx_path, "rb") as f:
parser.parse(f.read())
config = builder.create_builder_config()
config.max_workspace_size = 1 << 30 # 1GB
if fp16 and builder.platform_has_fast_fp16:
config.set_flag(trt.BuilderFlag.FP16)
engine = builder.build_engine(network, config)
return engine
class TRTInference:
def __init__(self, engine: trt.ICudaEngine):
self.engine = engine
self.context = engine.create_execution_context()
self.bindings = []
self.host_inputs, self.device_inputs = [], []
self.host_outputs, self.device_outputs = [], []
for binding in engine:
size = trt.volume(engine.get_binding_shape(binding))
dtype = trt.nptype(engine.get_binding_dtype(binding))
host_mem = cuda.pagelocked_empty(size, dtype)
device_mem = cuda.mem_alloc(host_mem.nbytes)
self.bindings.append(int(device_mem))
if engine.binding_is_input(binding):
self.host_inputs.append(host_mem)
self.device_inputs.append(device_mem)
else:
self.host_outputs.append(host_mem)
self.device_outputs.append(device_mem)
def infer(self, input_data: np.ndarray) -> np.ndarray:
np.copyto(self.host_inputs[0], input_data.ravel())
stream = cuda.Stream()
cuda.memcpy_htod_async(
self.device_inputs[0], self.host_inputs[0], stream
)
self.context.execute_async_v2(
bindings=self.bindings, stream_handle=stream.handle
)
cuda.memcpy_dtoh_async(
self.host_outputs[0], self.device_outputs[0], stream
)
stream.synchronize()
return self.host_outputs[0]
퀴즈
Q1. 예측 유지보수에서 이상 감지와 고장 분류의 차이는?
정답: 이상 감지는 정상 패턴 이탈 탐지(비지도), 고장 분류는 이상 유형 레이블링(지도)
설명: 실제 공장에서 고장 데이터는 극히 희소합니다. 이상 감지는 정상 데이터만으로 비지도 학습이 가능하여 먼저 구축되고, 고장 분류는 이후 수집된 이상 데이터로 지도 학습 모델을 추가합니다. 두 단계의 역할: 이상 감지는 알람(경보), 고장 분류는 유지보수 유형 결정(진단)입니다.
Q2. MVTec AD에서 one-class classification이 유리한 이유는?
정답: 제조 현장에서 모든 결함 유형을 사전에 수집하는 것이 불가능하기 때문
설명: Binary classification은 정상과 결함 양 클래스 데이터가 필요합니다. 하지만 실제 제조에서 결함은 예측 불가능하고 다양하며, 발생 전 수집이 불가능합니다. One-class classification은 정상 제품만으로 정상 분포를 모델링하고, 새로운 결함 유형도 정상 분포 이탈로 탐지합니다. MVTec AD의 PatchCore, PaDiM 등이 이 방식을 따릅니다.
Q3. 디지털 트윈의 하이브리드 접근법 장점은?
정답: 물리적 제약 준수 + 데이터 효율성 + 미관측 영역 외삽 능력
설명: 순수 물리 모델은 복잡한 비선형 현상(마찰, 난류)을 정확히 모델링하기 어렵습니다. 순수 ML 모델은 물리적 법칙을 위반하는 예측을 할 수 있고, 학습 분포 밖에서 신뢰성이 낮습니다. PINN(Physics-Informed Neural Network)은 손실 함수에 물리 방정식 잔차를 추가하여 두 단점을 보완합니다.
Q4. OPC-UA가 제조 현장 표준으로 선택되는 이유는?
정답: 플랫폼 독립성, 보안(TLS 내장), 의미론적 데이터 모델, 실시간 구독 지원
설명: OPC-UA는 특정 OS나 하드웨어에 종속되지 않아 PLC, SCADA, MES, ERP 간 통합이 용이합니다. TLS 기반 암호화와 인증이 내장되어 있어 보안 요구사항을 충족합니다. 단순 데이터 전송을 넘어 데이터 타입, 관계, 메서드를 포함한 정보 모델을 제공하며, Pub/Sub 메커니즘으로 실시간 모니터링도 지원합니다.
Q5. 코봇 비전 기반 그리핑에서 sim-to-real transfer가 중요한 이유는?
정답: 실제 로봇에서의 데이터 수집이 위험하고 비용이 크기 때문에 시뮬레이션 학습 후 실제 적용
설명: 실제 로봇 학습에는 수천 번의 시도가 필요하고, 장비 손상과 안전 위험이 따릅니다. Isaac Sim이나 PyBullet 같은 시뮬레이터에서 도메인 랜덤화(조명, 질감, 마찰, 카메라 노이즈)를 적용하면 실제 환경의 다양성을 커버합니다. 이를 통해 학습된 그리핑 정책이 새로운 물체나 조명 조건에도 일반화됩니다.
마치며
인더스트리 4.0의 AI 적용은 예측 유지보수, 품질 검사, 디지털 트윈, 공급망 최적화, 로봇 자동화, 엣지 AI 배포의 6개 핵심 영역으로 구성됩니다. 각 영역은 독립적으로 적용할 수 있지만, 통합 스마트 팩토리 플랫폼으로 연결될 때 시너지 효과가 극대화됩니다. OPC-UA 기반 데이터 통합과 엣지-클라우드 하이브리드 아키텍처가 이 모든 것의 기반입니다.