AI Manufacturing & Industry 4.0: Predictive Maintenance, Digital Twins, and Quality Inspection AI
- Authors: Youngju Kim (@fjvbn20031)
- The Convergence of Industry 4.0 and AI
- Predictive Maintenance
- Computer Vision-Based Quality Inspection
- Digital Twin
- Supply Chain Optimization
- Industrial Robot AI: Collaborative Robots and Vision-Based Grasping
- Edge AI Deployment: TensorRT on Jetson
- Quiz
- Conclusion
The Convergence of Industry 4.0 and AI
Industry 4.0 refers to the fourth industrial revolution, which combines Cyber-Physical Systems (CPS), IIoT, cloud computing, and AI. Every piece of equipment and process on the factory floor becomes digitized, and decisions are made in real time based on data.
The core technology stack consists of the following:
- CPS (Cyber-Physical System): An integrated system that connects the physical world with the digital world
- IIoT (Industrial Internet of Things): A network of industrial sensors, actuators, and control systems
- OPC-UA (Open Platform Communications Unified Architecture): The standard data exchange protocol between manufacturing equipment
- MQTT: A lightweight message broker protocol optimized for IoT edge devices
- Digital Twin: A real-time virtual replica of a physical asset
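To make the MQTT side concrete, the snippet below builds a sensor message for a broker topic. The `factory/<line>/<machine>/<sensor>` topic layout is an illustrative convention, not a standard, and actual publishing would go through a client library such as paho-mqtt (shown only as a comment).

```python
import json
from datetime import datetime, timezone

def build_sensor_message(line: str, machine: str, sensor: str,
                         value: float) -> tuple[str, str]:
    """Build an (MQTT topic, JSON payload) pair for one sensor reading.
    The topic layout 'factory/<line>/<machine>/<sensor>' is illustrative."""
    topic = f"factory/{line}/{machine}/{sensor}"
    payload = json.dumps({
        "value": value,
        "ts": datetime.now(timezone.utc).isoformat(),
    })
    return topic, payload

topic, payload = build_sensor_message("line1", "cnc07", "vibration", 0.42)
# with paho-mqtt this would be: client.publish(topic, payload, qos=1)
```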
OPC-UA Python Client Implementation
OPC-UA is the data integration standard between PLCs, SCADA systems, and MES in manufacturing environments. The following Python client (built on the python-opcua package) collects sensor data from an OPC-UA server.
```python
from opcua import Client
import pandas as pd
import time
from datetime import datetime

class ManufacturingDataCollector:
    def __init__(self, server_url: str):
        self.client = Client(server_url)
        self.data_buffer = []

    def connect(self):
        self.client.connect()
        print(f"OPC-UA server connected: {self.client.get_endpoints()}")

    def read_sensor_nodes(self, node_ids: list) -> dict:
        readings = {}
        for node_id in node_ids:
            node = self.client.get_node(node_id)
            value = node.get_value()
            readings[node_id] = {
                "value": value,
                "timestamp": datetime.utcnow().isoformat()
            }
        return readings

    def collect_stream(self, node_ids: list, interval_sec: float = 1.0,
                       max_samples: int = 1000):
        """Real-time streaming collection (stops after max_samples snapshots)"""
        for _ in range(max_samples):
            readings = self.read_sensor_nodes(node_ids)
            self.data_buffer.append(readings)
            time.sleep(interval_sec)

    def to_dataframe(self) -> pd.DataFrame:
        rows = []
        for snapshot in self.data_buffer:
            row = {"timestamp": list(snapshot.values())[0]["timestamp"]}
            for node_id, data in snapshot.items():
                row[node_id] = data["value"]
            rows.append(row)
        return pd.DataFrame(rows)

    def disconnect(self):
        self.client.disconnect()

# Usage example
collector = ManufacturingDataCollector("opc.tcp://factory-plc:4840/")
collector.connect()

# CNC machine sensor node IDs
sensor_nodes = [
    "ns=2;i=1001",  # Spindle speed (RPM)
    "ns=2;i=1002",  # Vibration (g)
    "ns=2;i=1003",  # Temperature (°C)
    "ns=2;i=1004",  # Current (A)
]

collector.collect_stream(sensor_nodes, interval_sec=0.5)
df = collector.to_dataframe()
df.to_parquet("sensor_data.parquet")
collector.disconnect()
```
Predictive Maintenance
Predictive maintenance is a strategy that detects early signs of equipment failure before it occurs and performs planned maintenance. It minimizes downtime and reduces costs compared to traditional corrective maintenance or time-based preventive maintenance.
Anomaly Detection vs. Fault Classification
A predictive maintenance pipeline consists of two stages:
- Anomaly Detection: Detecting data that deviates from normal patterns. Can be performed with unsupervised learning without labels
- Fault Classification: Classifying the type of fault after an anomaly is detected. Requires labeled fault data
In a real factory, fault data is extremely scarce, so the practical approach is to first build Stage 1 (anomaly detection), then train a Stage 2 classifier on the collected anomaly data.
Sensor Anomaly Detection with Isolation Forest
```python
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

def train_anomaly_detector(df: pd.DataFrame, feature_cols: list,
                           contamination: float = 0.05):
    """
    Isolation Forest-based anomaly detector training
    contamination: expected anomaly ratio (0.05 = 5%)
    """
    X = df[feature_cols].values
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    model = IsolationForest(
        n_estimators=200,
        contamination=contamination,
        max_samples="auto",
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_scaled)

    # Anomaly score: lower (more negative) means more anomalous
    scores = model.decision_function(X_scaled)
    predictions = model.predict(X_scaled)  # 1: normal, -1: anomaly
    return model, scaler, scores, predictions

def detect_anomalies_realtime(model, scaler, new_data: dict,
                              feature_cols: list) -> tuple:
    """Real-time anomaly detection: returns (is_anomaly, anomaly_score)"""
    x = np.array([[new_data[col] for col in feature_cols]])
    x_scaled = scaler.transform(x)
    score = model.decision_function(x_scaled)[0]
    prediction = model.predict(x_scaled)[0]
    return prediction == -1, score

# Load data and train
df = pd.read_parquet("sensor_data.parquet")
features = ["spindle_rpm", "vibration_g", "temperature_c", "current_a"]

# Train only on normal operation data
normal_df = df[df["timestamp"] < "2026-01-01"]
model, scaler, scores, preds = train_anomaly_detector(
    normal_df, features, contamination=0.03
)

# Score the full dataset and flag anomaly intervals
df["anomaly_score"] = model.decision_function(
    scaler.transform(df[features].values)
)
df["is_anomaly"] = model.predict(
    scaler.transform(df[features].values)
) == -1

anomaly_count = df["is_anomaly"].sum()
print(f"Detected anomaly points: {anomaly_count} / {len(df)}")
```
Autoencoder-Based Anomaly Detection
A deep learning-based Autoencoder captures complex nonlinear patterns. After training on normal data, samples with high reconstruction error are flagged as anomalies.
```python
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

class SensorAutoencoder(nn.Module):
    def __init__(self, input_dim: int, latent_dim: int = 8):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, latent_dim)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

def train_autoencoder(X_normal: np.ndarray, epochs: int = 100,
                      threshold_percentile: float = 95.0):
    X_tensor = torch.FloatTensor(X_normal)
    dataset = TensorDataset(X_tensor, X_tensor)
    loader = DataLoader(dataset, batch_size=256, shuffle=True)

    model = SensorAutoencoder(input_dim=X_normal.shape[1])
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        for x_batch, _ in loader:
            recon = model(x_batch)
            loss = criterion(recon, x_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Set threshold: 95th percentile of reconstruction error on normal data
    with torch.no_grad():
        recon = model(X_tensor)
        errors = torch.mean((recon - X_tensor) ** 2, dim=1).numpy()
    threshold = np.percentile(errors, threshold_percentile)
    return model, threshold
```
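At inference time, the trained model and threshold turn reconstruction error into an anomaly flag. A minimal scoring helper along those lines (it works with any `nn.Module` that maps inputs back to the same shape):

```python
import numpy as np
import torch
import torch.nn as nn

def reconstruction_anomalies(model: nn.Module, X: np.ndarray,
                             threshold: float):
    """Return (boolean anomaly flags, per-sample reconstruction errors)."""
    model.eval()
    with torch.no_grad():
        X_t = torch.as_tensor(X, dtype=torch.float32)
        recon = model(X_t)
        # Mean squared error per sample, same statistic as the threshold
        errors = torch.mean((recon - X_t) ** 2, dim=1).numpy()
    return errors > threshold, errors
```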
LSTM-Based Remaining Useful Life (RUL) Prediction
RUL (Remaining Useful Life) prediction estimates how much life an asset has left before failure. NASA's CMAPSS turbofan engine dataset is commonly used as a benchmark.
```python
import numpy as np
import pandas as pd
import torch
import torch.nn as nn

class RULPredictor(nn.Module):
    """LSTM-based Remaining Useful Life prediction model"""
    def __init__(self, input_size: int, hidden_size: int = 128,
                 num_layers: int = 2, dropout: float = 0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        # x: (batch, seq_len, features)
        out, _ = self.lstm(x)
        # Use only the last time step
        out = self.fc(out[:, -1, :])
        return out.squeeze(-1)

def prepare_rul_sequences(df: pd.DataFrame, seq_len: int = 30,
                          sensor_cols: list = None):
    """Generate (sequence, RUL target) pairs with a sliding window"""
    sequences, targets = [], []
    for engine_id in df["engine_id"].unique():
        engine_df = df[df["engine_id"] == engine_id].sort_values("cycle").copy()
        max_cycle = engine_df["cycle"].max()
        engine_df["rul"] = max_cycle - engine_df["cycle"]
        X = engine_df[sensor_cols].values
        y = engine_df["rul"].values
        for i in range(len(X) - seq_len):
            sequences.append(X[i:i + seq_len])
            targets.append(y[i + seq_len - 1])
    return np.array(sequences), np.array(targets)
```
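As a quick standalone sanity check of the windowing shapes, the same sliding-window logic on synthetic data (100 cycles of 4 sensors, with RUL counting down to 0):

```python
import numpy as np

def sliding_windows(X: np.ndarray, y: np.ndarray, seq_len: int):
    """Standalone version of the windowing above, for shape checks only."""
    seqs = [X[i:i + seq_len] for i in range(len(X) - seq_len)]
    targets = [y[i + seq_len - 1] for i in range(len(X) - seq_len)]
    return np.array(seqs), np.array(targets)

# 100 cycles, 4 sensors; RUL counts down from 99 to 0
X = np.random.rand(100, 4)
y = np.arange(99, -1, -1)
seqs, targets = sliding_windows(X, y, seq_len=30)
print(seqs.shape, targets.shape)  # (70, 30, 4) (70,)
```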
Computer Vision-Based Quality Inspection
MVTec AD and One-Class Classification
MVTec AD is the standard benchmark dataset for manufacturing defect detection, containing normal and various defect types across 15 industrial categories. The key characteristic is that only normal images are provided during training, and defective images appear at test time.
Why One-Class Classification is Advantageous: In real manufacturing environments, it is impossible to collect all defect types in advance. The practical approach is to model the normal distribution using only normal products and flag anything that deviates from it as a defect.
```python
import os
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from sklearn.neighbors import NearestNeighbors

class MVTecDataset(Dataset):
    def __init__(self, root: str, split: str = "train",
                 category: str = "bottle"):
        self.transform = T.Compose([
            T.Resize((256, 256)),
            T.CenterCrop(224),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
        ])
        self.image_paths = []
        self.labels = []
        base = os.path.join(root, category, split)
        for cls_name in os.listdir(base):
            label = 0 if cls_name == "good" else 1
            cls_dir = os.path.join(base, cls_name)
            for fname in os.listdir(cls_dir):
                if fname.endswith(".png"):
                    self.image_paths.append(os.path.join(cls_dir, fname))
                    self.labels.append(label)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = Image.open(self.image_paths[idx]).convert("RGB")
        return self.transform(img), self.labels[idx]

class PatchCoreDetector:
    """PatchCore-style detector: pretrained features + k-NN anomaly scoring"""
    def __init__(self, backbone: str = "resnet50", k: int = 5):
        self.model = models.__dict__[backbone](pretrained=True)
        # Drop the classification head, keep the convolutional feature maps
        self.model = nn.Sequential(*list(self.model.children())[:-2])
        self.model.eval()
        self.knn = NearestNeighbors(n_neighbors=k, metric="euclidean")
        self.memory_bank = None

    def extract_features(self, loader: DataLoader) -> np.ndarray:
        features = []
        with torch.no_grad():
            for imgs, _ in loader:
                feat = self.model(imgs)
                # Spatial average pooling
                feat = feat.mean(dim=[2, 3]).numpy()
                features.append(feat)
        return np.concatenate(features, axis=0)

    def fit(self, train_loader: DataLoader):
        self.memory_bank = self.extract_features(train_loader)
        self.knn.fit(self.memory_bank)
        print(f"Memory bank size: {self.memory_bank.shape}")

    def score(self, test_loader: DataLoader) -> np.ndarray:
        test_features = self.extract_features(test_loader)
        distances, _ = self.knn.kneighbors(test_features)
        return distances.mean(axis=1)  # higher distance = more anomalous
```
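The `score` method returns continuous anomaly scores; turning them into pass/fail decisions needs a threshold, typically a high percentile of scores on held-out normal images. A minimal sketch (the 99th-percentile default is an assumption to be tuned per category):

```python
import numpy as np

def choose_threshold(normal_scores: np.ndarray,
                     percentile: float = 99.0) -> float:
    """Pick an anomaly threshold from k-NN distances on held-out normal images."""
    return float(np.percentile(normal_scores, percentile))

def classify(scores: np.ndarray, threshold: float) -> np.ndarray:
    """True = defect candidate, False = pass."""
    return scores > threshold
```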
Digital Twin
A digital twin is a real-time virtual replica of a physical asset. NVIDIA Omniverse provides an enterprise-grade digital twin platform that combines physics-based rendering with simulation.
Hybrid Model: Physics-Based + ML
A pure physics model struggles to capture complex nonlinear phenomena, while a pure data-driven model may violate physical constraints. Physics-Informed Neural Networks (PINNs), which combine both approaches, are effective here.
```python
import torch
import torch.nn as nn

class HybridDigitalTwin(nn.Module):
    """
    Hybrid digital twin: physics equation predictions corrected by ML
    Example: thermal deformation model for a CNC machine
    """
    def __init__(self, correction_input_dim: int):
        super().__init__()
        # Physics-based parameters (learnable)
        self.thermal_coeff = nn.Parameter(torch.tensor(0.001))
        self.damping = nn.Parameter(torch.tensor(0.1))
        # ML correction network
        self.correction_net = nn.Sequential(
            nn.Linear(correction_input_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 32),
            nn.Tanh(),
            nn.Linear(32, 1)
        )

    def physics_model(self, temperature: torch.Tensor,
                      time: torch.Tensor) -> torch.Tensor:
        """Simplified thermal expansion: delta_L = alpha * delta_T * decay(t)"""
        delta_T = temperature - 20.0  # reference temperature 20°C
        return self.thermal_coeff * delta_T * torch.exp(-self.damping * time)

    def forward(self, temperature: torch.Tensor, time: torch.Tensor,
                context_features: torch.Tensor) -> torch.Tensor:
        physics_pred = self.physics_model(temperature, time)
        # squeeze so the (batch, 1) correction matches the (batch,) physics term
        correction = self.correction_net(context_features).squeeze(-1)
        return physics_pred + correction

def train_digital_twin(model, train_loader, epochs: int = 200,
                       physics_weight: float = 0.1):
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    mse = nn.MSELoss()
    for epoch in range(epochs):
        for batch in train_loader:
            temp, time_val, context, target = batch
            pred = model(temp, time_val, context)
            # Data loss
            data_loss = mse(pred, target)
            # Physical constraint: low temperature should produce low deformation
            cold_mask = temp < 15.0
            physics_loss = torch.mean(
                torch.relu(pred[cold_mask])  # penalize positive deformation at low temp
            ) if cold_mask.any() else torch.tensor(0.0)
            loss = data_loss + physics_weight * physics_loss
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
```
Supply Chain Optimization
Vehicle Route Optimization (VRP) with OR-Tools
```python
import numpy as np
from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp

def create_distance_matrix(locations: list) -> list:
    """Create a Euclidean distance matrix (scaled to integers for OR-Tools)"""
    n = len(locations)
    matrix = []
    for i in range(n):
        row = []
        for j in range(n):
            if i == j:
                row.append(0)
            else:
                dx = locations[i][0] - locations[j][0]
                dy = locations[i][1] - locations[j][1]
                row.append(int(np.sqrt(dx**2 + dy**2) * 100))
        matrix.append(row)
    return matrix

def solve_vrp(distance_matrix: list, num_vehicles: int,
              demands: list, vehicle_capacity: int) -> dict:
    """
    Solve the Capacitated Vehicle Routing Problem (CVRP)
    using OR-Tools
    """
    manager = pywrapcp.RoutingIndexManager(
        len(distance_matrix), num_vehicles, 0  # depot=0
    )
    routing = pywrapcp.RoutingModel(manager)

    def distance_callback(from_idx, to_idx):
        from_node = manager.IndexToNode(from_idx)
        to_node = manager.IndexToNode(to_idx)
        return distance_matrix[from_node][to_node]

    transit_callback_index = routing.RegisterTransitCallback(distance_callback)
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

    # Capacity constraint
    def demand_callback(idx):
        node = manager.IndexToNode(idx)
        return demands[node]

    demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
    routing.AddDimensionWithVehicleCapacity(
        demand_callback_index, 0,
        [vehicle_capacity] * num_vehicles,
        True, "Capacity"
    )

    search_params = pywrapcp.DefaultRoutingSearchParameters()
    search_params.first_solution_strategy = (
        routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
    )
    search_params.local_search_metaheuristic = (
        routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
    )
    search_params.time_limit.seconds = 30

    solution = routing.SolveWithParameters(search_params)
    routes = {}
    if solution:
        for vehicle_id in range(num_vehicles):
            index = routing.Start(vehicle_id)
            route = []
            while not routing.IsEnd(index):
                route.append(manager.IndexToNode(index))
                index = solution.Value(routing.NextVar(index))
            routes[vehicle_id] = route
    return routes
```
Industrial Robot AI: Collaborative Robots and Vision-Based Grasping
Collaborative robots (cobots) operate safely in the same space as humans and are well suited to flexible manufacturing environments. Vision-based grasping combines 3D cameras with deep learning to grasp objects in arbitrary poses.
Sim-to-Real Transfer is a technique that applies policies learned in a simulator to a real robot. Domain Randomization — randomly varying physical parameters (friction, mass, lighting) in simulation — is the key enabler.
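A minimal sketch of the domain randomization loop: one fresh parameter draw per training episode. The parameter names and ranges below are illustrative, not tuned values from any specific simulator.

```python
import random

def randomize_domain(rng: random.Random) -> dict:
    """Sample one randomized set of simulation parameters per episode.
    Ranges are illustrative, not values from any specific paper or simulator."""
    return {
        "friction": rng.uniform(0.4, 1.2),        # tabletop friction coefficient
        "object_mass_kg": rng.uniform(0.05, 0.8),
        "light_intensity": rng.uniform(0.3, 1.5), # relative to nominal lighting
        "camera_noise_std": rng.uniform(0.0, 0.02),
    }

# One parameter draw per episode; the policy never sees the same physics twice
rng = random.Random(42)
episodes = [randomize_domain(rng) for _ in range(1000)]
```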
Edge AI Deployment: TensorRT on Jetson
Manufacturing environments require real-time inference without cloud latency. NVIDIA Jetson (AGX Orin, Orin NX) uses TensorRT to accelerate AI models at the edge.
```python
import numpy as np
import pycuda.autoinit  # initializes the CUDA context
import pycuda.driver as cuda
import tensorrt as trt

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)

def build_engine_from_onnx(onnx_path: str,
                           fp16: bool = True) -> "trt.ICudaEngine":
    """Convert an ONNX model to a TensorRT engine (TensorRT 8.x-style API)"""
    builder = trt.Builder(TRT_LOGGER)
    network_flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
    network = builder.create_network(network_flags)
    parser = trt.OnnxParser(network, TRT_LOGGER)

    with open(onnx_path, "rb") as f:
        if not parser.parse(f.read()):
            raise RuntimeError(f"ONNX parse failed: {parser.get_error(0)}")

    config = builder.create_builder_config()
    config.max_workspace_size = 1 << 30  # 1 GB
    if fp16 and builder.platform_has_fast_fp16:
        config.set_flag(trt.BuilderFlag.FP16)

    return builder.build_engine(network, config)

class TRTInference:
    def __init__(self, engine: "trt.ICudaEngine"):
        self.engine = engine
        self.context = engine.create_execution_context()
        self.bindings = []
        self.host_inputs, self.device_inputs = [], []
        self.host_outputs, self.device_outputs = [], []
        for binding in engine:
            size = trt.volume(engine.get_binding_shape(binding))
            dtype = trt.nptype(engine.get_binding_dtype(binding))
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            self.bindings.append(int(device_mem))
            if engine.binding_is_input(binding):
                self.host_inputs.append(host_mem)
                self.device_inputs.append(device_mem)
            else:
                self.host_outputs.append(host_mem)
                self.device_outputs.append(device_mem)

    def infer(self, input_data: np.ndarray) -> np.ndarray:
        np.copyto(self.host_inputs[0], input_data.ravel())
        stream = cuda.Stream()
        cuda.memcpy_htod_async(
            self.device_inputs[0], self.host_inputs[0], stream
        )
        self.context.execute_async_v2(
            bindings=self.bindings, stream_handle=stream.handle
        )
        cuda.memcpy_dtoh_async(
            self.host_outputs[0], self.device_outputs[0], stream
        )
        stream.synchronize()
        return self.host_outputs[0]
```
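The engine expects its input as a flat, contiguous NCHW float32 buffer. A minimal preprocessing helper matching the ImageNet normalization used by the inspection models earlier (the 224×224 input size is an assumption from those models):

```python
import numpy as np

def preprocess_chw(img: np.ndarray) -> np.ndarray:
    """HWC uint8 image -> normalized NCHW float32 buffer for TensorRT input."""
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    x = img.astype(np.float32) / 255.0
    x = (x - mean) / std                 # per-channel ImageNet normalization
    x = np.transpose(x, (2, 0, 1))       # HWC -> CHW
    return np.ascontiguousarray(x[None]) # add batch dim -> NCHW, contiguous
```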
Quiz
Q1. What is the difference between anomaly detection and fault classification in predictive maintenance?
Answer: Anomaly detection identifies deviations from normal patterns (unsupervised); fault classification labels the type of anomaly (supervised).
Explanation: In a real factory, fault data is extremely scarce. Anomaly detection can be trained in an unsupervised manner using only normal data, so it is built first. Fault classification then adds a supervised model trained on the collected anomaly data. The roles of the two stages: anomaly detection raises an alarm (alert), while fault classification determines the type of maintenance required (diagnosis).
Q2. Why is one-class classification advantageous for MVTec AD?
Answer: Because it is impossible to collect all defect types in advance in a manufacturing environment.
Explanation: Binary classification requires data from both normal and defect classes. In real manufacturing, however, defects are unpredictable, varied, and cannot be collected before they occur. One-class classification models the normal distribution using only normal products and detects any new defect type as a deviation from that distribution. PatchCore and PaDiM on MVTec AD follow this approach.
Q3. What are the advantages of the hybrid approach to digital twins?
Answer: Physical constraint compliance + data efficiency + extrapolation capability to unobserved regions.
Explanation: Pure physics models struggle to accurately model complex nonlinear phenomena (friction, turbulence). Pure ML models can make predictions that violate physical laws and are unreliable outside the training distribution. PINNs (Physics-Informed Neural Networks) address both shortcomings by adding physics equation residuals to the loss function.
Q4. Why is OPC-UA chosen as the standard in manufacturing environments?
Answer: Platform independence, security (built-in TLS), semantic data model, and real-time subscription support.
Explanation: OPC-UA is not tied to any specific OS or hardware, making integration between PLCs, SCADA, MES, and ERP straightforward. TLS-based encryption and authentication are built in, satisfying security requirements. Beyond simple data transfer, it provides an information model including data types, relationships, and methods, and also supports real-time monitoring via a Pub/Sub mechanism.
Q5. Why is sim-to-real transfer important for cobot vision-based grasping?
Answer: Because collecting data on a real robot is dangerous and costly, learning is done in simulation first and then applied to the real robot.
Explanation: Training a real robot requires thousands of attempts and carries risks of equipment damage and safety hazards. Applying domain randomization (lighting, texture, friction, camera noise) in simulators such as Isaac Sim or PyBullet covers the diversity of real-world environments. This allows the learned grasping policy to generalize to new objects or lighting conditions.
Conclusion
AI applications in Industry 4.0 span six core areas: predictive maintenance, quality inspection, digital twins, supply chain optimization, robotic automation, and edge AI deployment. Each area can be applied independently, but synergies are maximized when they are connected into an integrated smart factory platform. OPC-UA-based data integration and an edge-cloud hybrid architecture form the foundation of it all.