Split View: 그래프 신경망(GNN) 완전 정복: GCN, GAT, GraphSAGE부터 분자 설계까지
그래프 신경망(GNN) 완전 정복: GCN, GAT, GraphSAGE부터 분자 설계까지
그래프 신경망(GNN) 완전 정복 가이드
소셜 네트워크, 분자 구조, 지식 그래프, 추천 시스템 — 현실 세계의 수많은 데이터는 그래프 형태로 표현됩니다. 그래프 신경망(Graph Neural Network, GNN)은 이런 비유클리드적 데이터를 딥러닝으로 처리하는 핵심 도구입니다. 이 가이드에서는 그래프 이론의 기초부터 최신 GNN 아키텍처, PyTorch Geometric을 이용한 실전 구현까지 체계적으로 다룹니다.
1. 그래프 이론 기초
그래프의 정의
그래프 G는 노드(Node) 집합 V와 엣지(Edge) 집합 E로 구성됩니다. 즉 G = (V, E)로 표현합니다. 노드는 개체(entity)를 나타내고, 엣지는 개체 간의 관계를 나타냅니다.
- 노드(Vertex/Node): 개체를 표현. 예: 사용자, 원자, 논문
- 엣지(Edge): 관계를 표현. 예: 친구 관계, 화학 결합, 인용 관계
- 노드 특성(Node Feature): 각 노드에 연결된 특성 벡터
- 엣지 특성(Edge Feature): 각 엣지에 연결된 특성 벡터
방향/무방향 그래프
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
# 무방향 그래프 (Undirected Graph)
G_undirected = nx.Graph()
G_undirected.add_edges_from([(0, 1), (1, 2), (2, 3), (3, 0), (0, 2)])
# 방향 그래프 (Directed Graph)
G_directed = nx.DiGraph()
G_directed.add_edges_from([(0, 1), (1, 2), (2, 0), (0, 3)])
print(f"무방향 그래프 - 노드: {G_undirected.number_of_nodes()}, 엣지: {G_undirected.number_of_edges()}")
print(f"방향 그래프 - 노드: {G_directed.number_of_nodes()}, 엣지: {G_directed.number_of_edges()}")
인접 행렬과 엣지 리스트
import torch
import numpy as np
# 인접 행렬 (Adjacency Matrix)
# A[i][j] = 1이면 노드 i와 j 사이에 엣지 존재
adj_matrix = torch.tensor([
[0, 1, 1, 0],
[1, 0, 1, 0],
[1, 1, 0, 1],
[0, 0, 1, 0]
], dtype=torch.float32)
# 엣지 리스트 (Edge Index) - PyG에서 사용하는 형식
# shape: (2, num_edges) - 첫 번째 행: 소스 노드, 두 번째 행: 타겟 노드
edge_index = torch.tensor([
[0, 0, 1, 1, 2, 2, 2, 3], # 소스 노드
[1, 2, 0, 2, 0, 1, 3, 2] # 타겟 노드
], dtype=torch.long)
print(f"인접 행렬 크기: {adj_matrix.shape}") # (4, 4)
print(f"엣지 리스트 크기: {edge_index.shape}") # (2, 8)
# 인접 행렬 -> 엣지 리스트 변환
def adj_to_edge_index(adj):
"""인접 행렬을 엣지 인덱스로 변환"""
row, col = torch.where(adj > 0)
return torch.stack([row, col], dim=0)
converted = adj_to_edge_index(adj_matrix)
print(f"변환된 엣지 리스트:\n{converted}")
그래프 특성
import networkx as nx
import numpy as np
def analyze_graph(G):
"""그래프의 주요 특성 분석"""
# 차수 (Degree)
degrees = dict(G.degree())
avg_degree = np.mean(list(degrees.values()))
# 클러스터링 계수 (Clustering Coefficient)
clustering = nx.average_clustering(G)
# 평균 경로 길이 (Average Path Length)
if nx.is_connected(G):
avg_path = nx.average_shortest_path_length(G)
else:
# 연결된 컴포넌트 중 가장 큰 것 사용
largest_cc = max(nx.connected_components(G), key=len)
subgraph = G.subgraph(largest_cc)
avg_path = nx.average_shortest_path_length(subgraph)
# 중심성 (Centrality)
betweenness = nx.betweenness_centrality(G)
pagerank = nx.pagerank(G)
print(f"노드 수: {G.number_of_nodes()}")
print(f"엣지 수: {G.number_of_edges()}")
print(f"평균 차수: {avg_degree:.2f}")
print(f"클러스터링 계수: {clustering:.3f}")
print(f"평균 경로 길이: {avg_path:.2f}")
return {
"degrees": degrees,
"clustering": clustering,
"avg_path": avg_path,
"betweenness": betweenness,
"pagerank": pagerank
}
# 소셜 네트워크 예시 (Karate Club)
G = nx.karate_club_graph()
stats = analyze_graph(G)
현실 세계의 그래프
| 도메인 | 노드 | 엣지 | 태스크 |
|---|---|---|---|
| 소셜 네트워크 | 사용자 | 친구 관계 | 커뮤니티 탐지 |
| 분자 구조 | 원자 | 화학 결합 | 분자 특성 예측 |
| 지식 그래프 | 개체 | 관계 | 링크 예측 |
| 인용 네트워크 | 논문 | 인용 관계 | 노드 분류 |
| 교통 네트워크 | 교차로 | 도로 | 경로 예측 |
| 추천 시스템 | 사용자/아이템 | 상호작용 | 추천 |
2. 그래프 머신러닝 동기
왜 CNN/RNN이 부족한가?
기존의 CNN은 격자(grid) 구조를 전제로 합니다. 이미지는 픽셀이 규칙적인 2D 격자에 배치되어 있어서 합성곱 연산이 자연스럽게 작동합니다. RNN은 시퀀스(sequence) 구조를 가정합니다.
하지만 그래프는:
- 비규칙적 구조: 각 노드의 이웃 수가 다름
- 순서가 없음: 노드의 순열 불변성(Permutation Invariance)
- 전역적 의존성: 멀리 떨어진 노드도 영향을 줄 수 있음
# 그래프 데이터의 특성 설명
# 이미지: 고정 크기 격자
image = torch.randn(3, 224, 224) # 채널, 높이, 너비
# 시퀀스: 순서 있는 데이터
sequence = torch.randn(100, 512) # 시퀀스 길이, 특성 차원
# 그래프: 가변적 이웃 구조
# 노드 특성: (num_nodes, feature_dim)
node_features = torch.randn(34, 16) # 34개 노드, 16차원 특성
# 엣지: (2, num_edges) - 희소 연결
edge_index = torch.randint(0, 34, (2, 78))
메시지 패싱 패러다임
모든 GNN의 기본 원리는 메시지 패싱(Message Passing)입니다. 각 노드는 이웃 노드로부터 메시지를 받아 자신의 표현을 업데이트합니다.
메시지 패싱 신경망(MPNN) 프레임워크:
- 메시지 계산: 엣지 (u, v)에 대해 노드 u에서 v로 전달할 메시지 계산
- 집계: 각 노드가 모든 이웃 메시지를 합산
- 업데이트: 집계된 메시지로 노드 표현 업데이트
m_v^(l) = AGGREGATE({h_u^(l-1) : u in N(v)})
h_v^(l) = UPDATE(h_v^(l-1), m_v^(l))
여기서 N(v)는 노드 v의 이웃 집합입니다.
3. GNN 기초 수식
집계(Aggregation)와 업데이트(Update)
import torch
import torch.nn as nn
from torch_scatter import scatter_mean, scatter_sum, scatter_max
def manual_message_passing(node_features, edge_index, aggregation="mean"):
"""
수동으로 구현한 메시지 패싱
node_features: (N, F) - N개 노드, F차원 특성
edge_index: (2, E) - E개 엣지
"""
src, dst = edge_index[0], edge_index[1]
num_nodes = node_features.size(0)
# 소스 노드의 특성을 메시지로 사용
messages = node_features[src] # (E, F)
if aggregation == "mean":
# 목적 노드별 평균
aggregated = scatter_mean(messages, dst, dim=0, dim_size=num_nodes)
elif aggregation == "sum":
aggregated = scatter_sum(messages, dst, dim=0, dim_size=num_nodes)
elif aggregation == "max":
aggregated, _ = scatter_max(messages, dst, dim=0, dim_size=num_nodes)
# 업데이트: 원래 특성 + 집계된 메시지
updated = node_features + aggregated
return updated
# 예시
N, F = 6, 8
node_features = torch.randn(N, F)
edge_index = torch.tensor([[0,1,2,3,4,0,1], [1,2,3,4,0,3,4]])
output = manual_message_passing(node_features, edge_index, "mean")
print(f"Input shape: {node_features.shape}")
print(f"Output shape: {output.shape}")
4. 주요 GNN 아키텍처
GCN (Graph Convolutional Network)
Kipf & Welling (2017)이 제안한 GCN은 스펙트럼 그래프 이론에서 출발하여 효율적인 레이어별 전파 규칙을 도출했습니다.
레이어별 전파 규칙:
정규화된 인접 행렬을 사용한 전파: 틸데 A = D^(-1/2) _ (A + I) _ D^(-1/2)
H^(l+1) = sigma(틸데 A _ H^(l) _ W^(l))
여기서 D는 차수 행렬(degree matrix), I는 항등 행렬, W는 학습 가능한 가중치입니다.
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
# 데이터셋 로드
dataset = Planetoid(root='/tmp/Cora', name='Cora', transform=NormalizeFeatures())
data = dataset[0]
print(f"노드 수: {data.num_nodes}")
print(f"엣지 수: {data.num_edges}")
print(f"노드 특성 차원: {data.num_node_features}")
print(f"클래스 수: {dataset.num_classes}")
print(f"훈련 노드: {data.train_mask.sum().item()}")
print(f"검증 노드: {data.val_mask.sum().item()}")
print(f"테스트 노드: {data.test_mask.sum().item()}")
class GCN(nn.Module):
"""Graph Convolutional Network"""
def __init__(self, in_channels, hidden_channels, out_channels, dropout=0.5):
super().__init__()
self.conv1 = GCNConv(in_channels, hidden_channels)
self.conv2 = GCNConv(hidden_channels, out_channels)
self.dropout = dropout
def forward(self, x, edge_index):
# 첫 번째 GCN 레이어
x = self.conv1(x, edge_index)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
# 두 번째 GCN 레이어
x = self.conv2(x, edge_index)
return x
# 모델, 옵티마이저 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GCN(
in_channels=dataset.num_features,
hidden_channels=64,
out_channels=dataset.num_classes
).to(device)
data = data.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
def train_gcn():
model.train()
optimizer.zero_grad()
out = model(data.x, data.edge_index)
loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask])
loss.backward()
optimizer.step()
return loss.item()
def test_gcn():
model.eval()
with torch.no_grad():
out = model(data.x, data.edge_index)
pred = out.argmax(dim=1)
results = {}
for split, mask in [("train", data.train_mask),
("val", data.val_mask),
("test", data.test_mask)]:
correct = pred[mask].eq(data.y[mask]).sum().item()
results[split] = correct / mask.sum().item()
return results
# 훈련 루프
best_val_acc = 0
for epoch in range(200):
loss = train_gcn()
accs = test_gcn()
if accs["val"] > best_val_acc:
best_val_acc = accs["val"]
if (epoch + 1) % 50 == 0:
print(f"Epoch {epoch+1:03d} | Loss: {loss:.4f} | "
f"Train: {accs['train']:.4f} | Val: {accs['val']:.4f} | "
f"Test: {accs['test']:.4f}")
GCN 수동 구현
import torch
import torch.nn as nn
import torch.nn.functional as F
class ManualGCNLayer(nn.Module):
"""GCN 레이어 수동 구현 - 내부 동작 이해용"""
def __init__(self, in_features, out_features):
super().__init__()
self.weight = nn.Parameter(torch.FloatTensor(in_features, out_features))
self.bias = nn.Parameter(torch.FloatTensor(out_features))
self.reset_parameters()
def reset_parameters(self):
nn.init.xavier_uniform_(self.weight)
nn.init.zeros_(self.bias)
def forward(self, x, adj):
"""
x: 노드 특성 (N, F_in)
adj: 정규화된 인접 행렬 (N, N)
"""
# 선형 변환: X * W
support = x @ self.weight
# 그래프 합성곱: A_hat * X * W
output = adj @ support + self.bias
return output
@staticmethod
def normalize_adjacency(adj):
"""D^(-1/2) * A * D^(-1/2) 정규화"""
# 자기 루프 추가
N = adj.size(0)
adj_hat = adj + torch.eye(N, device=adj.device)
# 차수 행렬 계산
deg = adj_hat.sum(dim=1)
d_inv_sqrt = torch.diag(deg.pow(-0.5))
# 정규화
adj_normalized = d_inv_sqrt @ adj_hat @ d_inv_sqrt
return adj_normalized
GraphSAGE (Inductive Learning)
GraphSAGE는 귀납적(inductive) 학습을 위해 설계되었습니다. 전체 그래프 대신 이웃을 샘플링하여 미니배치 학습이 가능합니다.
from torch_geometric.nn import SAGEConv
import torch
import torch.nn as nn
import torch.nn.functional as F
class GraphSAGE(nn.Module):
"""GraphSAGE - 귀납적 표현 학습"""
def __init__(self, in_channels, hidden_channels, out_channels,
num_layers=3, dropout=0.5, aggr="mean"):
super().__init__()
self.dropout = dropout
self.convs = nn.ModuleList()
self.convs.append(SAGEConv(in_channels, hidden_channels, aggr=aggr))
for _ in range(num_layers - 2):
self.convs.append(SAGEConv(hidden_channels, hidden_channels, aggr=aggr))
self.convs.append(SAGEConv(hidden_channels, out_channels, aggr=aggr))
self.bns = nn.ModuleList([
nn.BatchNorm1d(hidden_channels)
for _ in range(num_layers - 1)
])
def forward(self, x, edge_index):
for i, conv in enumerate(self.convs[:-1]):
x = conv(x, edge_index)
x = self.bns[i](x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.convs[-1](x, edge_index)
return x
# 이웃 샘플링을 이용한 미니배치 훈련
from torch_geometric.loader import NeighborLoader
# NeighborLoader: 각 레이어마다 num_neighbors개의 이웃 샘플링
train_loader = NeighborLoader(
data,
num_neighbors=[25, 10], # 2-hop: 첫 hop에서 25개, 두 번째 hop에서 10개
batch_size=256,
input_nodes=data.train_mask,
shuffle=True
)
model_sage = GraphSAGE(
in_channels=dataset.num_features,
hidden_channels=64,
out_channels=dataset.num_classes
).to(device)
optimizer_sage = torch.optim.Adam(model_sage.parameters(), lr=0.001)
def train_sage():
model_sage.train()
total_loss = 0
for batch in train_loader:
batch = batch.to(device)
optimizer_sage.zero_grad()
out = model_sage(batch.x, batch.edge_index)
# 배치의 앞 batch_size개 노드만 훈련 노드
loss = F.cross_entropy(out[:batch.batch_size], batch.y[:batch.batch_size])
loss.backward()
optimizer_sage.step()
total_loss += loss.item()
return total_loss / len(train_loader)
GAT (Graph Attention Network)
GAT는 각 이웃에 다른 가중치를 부여하기 위해 어텐션 메커니즘을 사용합니다. "모든 이웃이 동등하게 중요하지 않다"는 직관을 구현합니다.
어텐션 계수 계산:
어텐션 점수: e_ij = LeakyReLU(a^T [Wh_i || Wh_j])
소프트맥스 정규화: alpha_ij = exp(e_ij) / sum_k(exp(e_ik))
업데이트: h_i' = sigma(sum_j alpha_ij _ W _ h_j)
from torch_geometric.nn import GATConv, GATv2Conv
import torch
import torch.nn as nn
import torch.nn.functional as F
class GAT(nn.Module):
"""Graph Attention Network"""
def __init__(self, in_channels, hidden_channels, out_channels,
heads=8, dropout=0.6):
super().__init__()
self.dropout = dropout
# 첫 번째 레이어: 멀티헤드 어텐션
self.conv1 = GATConv(
in_channels,
hidden_channels,
heads=heads,
dropout=dropout,
concat=True # 헤드를 연결(concatenate)
)
# 두 번째 레이어: 평균 헤드
self.conv2 = GATConv(
hidden_channels * heads,
out_channels,
heads=1,
dropout=dropout,
concat=False # 헤드를 평균
)
def forward(self, x, edge_index):
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.conv1(x, edge_index)
x = F.elu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.conv2(x, edge_index)
return x
class GATv2(nn.Module):
"""
GATv2 - 개선된 어텐션 메커니즘
GATv2는 동적 어텐션을 계산하여 표현력이 더 높음
"""
def __init__(self, in_channels, hidden_channels, out_channels,
heads=8, dropout=0.6):
super().__init__()
self.conv1 = GATv2Conv(
in_channels,
hidden_channels,
heads=heads,
dropout=dropout,
concat=True
)
self.conv2 = GATv2Conv(
hidden_channels * heads,
out_channels,
heads=1,
dropout=dropout,
concat=False
)
self.dropout = dropout
def forward(self, x, edge_index):
x = F.dropout(x, p=self.dropout, training=self.training)
x = F.elu(self.conv1(x, edge_index))
x = F.dropout(x, p=self.dropout, training=self.training)
return self.conv2(x, edge_index)
# GAT 훈련
model_gat = GAT(
in_channels=dataset.num_features,
hidden_channels=8,
out_channels=dataset.num_classes,
heads=8
).to(device)
optimizer_gat = torch.optim.Adam(model_gat.parameters(), lr=0.005, weight_decay=5e-4)
Graph Transformer
Graph Transformer는 Transformer의 전역 어텐션을 그래프에 적용합니다.
from torch_geometric.nn import TransformerConv
import torch
import torch.nn as nn
import torch.nn.functional as F
class GraphTransformer(nn.Module):
"""Graph Transformer Layer"""
def __init__(self, in_channels, hidden_channels, out_channels,
heads=4, num_layers=3, dropout=0.3):
super().__init__()
self.dropout = dropout
self.convs = nn.ModuleList()
self.convs.append(
TransformerConv(in_channels, hidden_channels // heads, heads=heads,
dropout=dropout, beta=True)
)
for _ in range(num_layers - 2):
self.convs.append(
TransformerConv(hidden_channels, hidden_channels // heads,
heads=heads, dropout=dropout, beta=True)
)
self.convs.append(
TransformerConv(hidden_channels, out_channels // heads,
heads=heads, dropout=dropout, beta=True)
)
self.norms = nn.ModuleList([
nn.LayerNorm(hidden_channels) for _ in range(num_layers - 1)
])
def forward(self, x, edge_index):
for i, conv in enumerate(self.convs[:-1]):
x = conv(x, edge_index)
x = self.norms[i](x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
return self.convs[-1](x, edge_index)
5. 그래프 수준 예측
노드 분류가 개별 노드에 대한 예측이라면, 그래프 분류는 전체 그래프에 대한 예측입니다. 예: 분자가 독성인지 예측.
Global Pooling
from torch_geometric.nn import (
global_mean_pool,
global_max_pool,
global_add_pool
)
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
class GraphClassifier(nn.Module):
"""그래프 분류 모델"""
def __init__(self, in_channels, hidden_channels, out_channels,
num_layers=3, dropout=0.5, pooling="mean"):
super().__init__()
self.dropout = dropout
self.pooling = pooling
self.convs = nn.ModuleList()
self.convs.append(GCNConv(in_channels, hidden_channels))
for _ in range(num_layers - 1):
self.convs.append(GCNConv(hidden_channels, hidden_channels))
self.bns = nn.ModuleList([
nn.BatchNorm1d(hidden_channels) for _ in range(num_layers)
])
# 그래프 레벨 분류기
self.classifier = nn.Sequential(
nn.Linear(hidden_channels, hidden_channels),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_channels, out_channels)
)
def forward(self, x, edge_index, batch):
"""
batch: 각 노드가 어느 그래프에 속하는지 나타내는 인덱스 벡터
"""
# 노드 임베딩
for conv, bn in zip(self.convs, self.bns):
x = conv(x, edge_index)
x = bn(x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
# 그래프 수준 풀링
if self.pooling == "mean":
x = global_mean_pool(x, batch)
elif self.pooling == "max":
x = global_max_pool(x, batch)
elif self.pooling == "sum":
x = global_add_pool(x, batch)
# 분류
return self.classifier(x)
DiffPool (Differentiable Pooling)
from torch_geometric.nn import dense_diff_pool
import torch
import torch.nn as nn
import torch.nn.functional as F
class DiffPoolLayer(nn.Module):
"""계층적 그래프 풀링"""
def __init__(self, in_channels, hidden_channels, num_clusters):
super().__init__()
# GNN for node embedding
self.gnn_embed = nn.Sequential(
nn.Linear(in_channels, hidden_channels),
nn.ReLU()
)
# GNN for cluster assignment
self.gnn_pool = nn.Sequential(
nn.Linear(in_channels, num_clusters),
)
def forward(self, x, adj, mask=None):
embed = self.gnn_embed(x)
# Cluster assignment matrix
s = torch.softmax(self.gnn_pool(x), dim=-1)
# DiffPool
out, out_adj, link_loss, entropy_loss = dense_diff_pool(embed, adj, s, mask)
return out, out_adj, link_loss, entropy_loss
6. 링크 예측
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.utils import negative_sampling
from torch_geometric.transforms import RandomLinkSplit
class LinkPredictor(nn.Module):
"""링크 예측 모델"""
def __init__(self, in_channels, hidden_channels, out_channels):
super().__init__()
# 노드 임베딩 인코더
self.encoder = nn.ModuleList([
GCNConv(in_channels, hidden_channels),
GCNConv(hidden_channels, out_channels)
])
# 엣지 디코더
self.decoder = nn.Sequential(
nn.Linear(out_channels * 2, out_channels),
nn.ReLU(),
nn.Linear(out_channels, 1)
)
def encode(self, x, edge_index):
for i, conv in enumerate(self.encoder):
x = conv(x, edge_index)
if i < len(self.encoder) - 1:
x = F.relu(x)
return x
def decode(self, z, edge_index):
# 소스/타겟 노드 임베딩 연결
src, dst = edge_index
edge_feat = torch.cat([z[src], z[dst]], dim=1)
return self.decoder(edge_feat).squeeze()
def forward(self, x, edge_index, pos_edge_index, neg_edge_index):
z = self.encode(x, edge_index)
pos_pred = self.decode(z, pos_edge_index)
neg_pred = self.decode(z, neg_edge_index)
return pos_pred, neg_pred
def train_link_prediction(model, data, optimizer):
model.train()
optimizer.zero_grad()
# 노드 임베딩
z = model.encode(data.x, data.edge_index)
# 양성 엣지
pos_edge = data.train_pos_edge_index
# 음성 엣지 샘플링
neg_edge = negative_sampling(
edge_index=pos_edge,
num_nodes=data.num_nodes,
num_neg_samples=pos_edge.size(1)
)
pos_pred = model.decode(z, pos_edge)
neg_pred = model.decode(z, neg_edge)
# Binary cross-entropy loss
pred = torch.cat([pos_pred, neg_pred])
labels = torch.cat([
torch.ones(pos_pred.size(0)),
torch.zeros(neg_pred.size(0))
]).to(pred.device)
loss = F.binary_cross_entropy_with_logits(pred, labels)
loss.backward()
optimizer.step()
return loss.item()
7. PyTorch Geometric (PyG) 완전 가이드
설치
pip install torch-geometric
pip install torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-2.1.0+cu121.html
Data 객체
from torch_geometric.data import Data
import torch
# 그래프 데이터 생성
x = torch.randn(6, 3) # 6개 노드, 3차원 특성
edge_index = torch.tensor([
[0, 1, 2, 3, 4, 0],
[1, 2, 3, 4, 0, 3]
], dtype=torch.long)
y = torch.tensor([0, 1, 0, 1, 0, 1]) # 노드 레이블
edge_attr = torch.randn(6, 2) # 엣지 특성
data = Data(
x=x,
edge_index=edge_index,
y=y,
edge_attr=edge_attr
)
print(data)
print(f"노드 수: {data.num_nodes}")
print(f"엣지 수: {data.num_edges}")
print(f"노드 특성 차원: {data.num_node_features}")
print(f"엣지 특성 차원: {data.num_edge_features}")
print(f"self-loop 여부: {data.has_self_loops()}")
print(f"방향 그래프 여부: {data.is_directed()}")
# 유효성 검사
print(f"유효한 데이터: {data.validate()}")
DataLoader와 미니배치
from torch_geometric.data import Data, DataLoader
import torch
# 그래프 데이터셋 생성
dataset = []
for _ in range(100):
n = torch.randint(5, 20, (1,)).item() # 5~20개 노드
e = torch.randint(10, 40, (1,)).item() # 10~40개 엣지
data = Data(
x=torch.randn(n, 8),
edge_index=torch.randint(0, n, (2, e)),
y=torch.randint(0, 3, (1,)) # 그래프 레이블
)
dataset.append(data)
# DataLoader: 여러 그래프를 하나의 불연속 그래프로 배치
loader = DataLoader(dataset, batch_size=32, shuffle=True)
for batch in loader:
print(f"배치 그래프 수: {batch.num_graphs}")
print(f"전체 노드 수: {batch.num_nodes}")
print(f"전체 엣지 수: {batch.num_edges}")
print(f"batch 벡터: {batch.batch.shape}") # 각 노드의 그래프 인덱스
break
내장 데이터셋
from torch_geometric.datasets import (
Planetoid, # Cora, Citeseer, PubMed
TUDataset, # 분자 데이터셋 (MUTAG, ENZYMES 등)
OGB, # Open Graph Benchmark
)
from torch_geometric.transforms import NormalizeFeatures, RandomNodeSplit
# Cora 인용 네트워크
cora = Planetoid(root='/tmp/Cora', name='Cora', transform=NormalizeFeatures())
print(f"Cora - 노드: {cora[0].num_nodes}, 엣지: {cora[0].num_edges}")
# MUTAG 분자 데이터셋
mutag = TUDataset(root='/tmp/TUDataset', name='MUTAG')
print(f"MUTAG - 그래프 수: {len(mutag)}, 클래스: {mutag.num_classes}")
# Open Graph Benchmark (대규모)
try:
from ogb.nodeproppred import PygNodePropPredDataset
dataset_ogb = PygNodePropPredDataset(name='ogbn-arxiv')
split_idx = dataset_ogb.get_idx_split()
data_ogb = dataset_ogb[0]
print(f"OGB-Arxiv - 노드: {data_ogb.num_nodes}, 엣지: {data_ogb.num_edges}")
except ImportError:
print("ogb 패키지가 없습니다. pip install ogb")
완전한 노드 분류 파이프라인
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GATConv, SAGEConv
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
import matplotlib.pyplot as plt
# 데이터 로드
dataset = Planetoid(root='/tmp/Cora', name='Cora', transform=NormalizeFeatures())
data = dataset[0]
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
data = data.to(device)
class MultiLayerGNN(nn.Module):
"""여러 GNN 레이어를 조합한 모델"""
def __init__(self, in_channels, hidden_channels, out_channels,
gnn_type="gcn", num_layers=3, dropout=0.5):
super().__init__()
self.dropout = dropout
self.gnn_type = gnn_type
self.convs = nn.ModuleList()
self.bns = nn.ModuleList()
# 입력 레이어
self.convs.append(self._make_conv(in_channels, hidden_channels, gnn_type))
self.bns.append(nn.BatchNorm1d(hidden_channels))
# 중간 레이어
for _ in range(num_layers - 2):
self.convs.append(self._make_conv(hidden_channels, hidden_channels, gnn_type))
self.bns.append(nn.BatchNorm1d(hidden_channels))
# 출력 레이어
self.convs.append(self._make_conv(hidden_channels, out_channels, gnn_type))
def _make_conv(self, in_ch, out_ch, gnn_type):
if gnn_type == "gcn":
return GCNConv(in_ch, out_ch)
elif gnn_type == "sage":
return SAGEConv(in_ch, out_ch)
elif gnn_type == "gat":
return GATConv(in_ch, out_ch, heads=1)
else:
raise ValueError(f"Unknown GNN type: {gnn_type}")
def forward(self, x, edge_index):
for i, conv in enumerate(self.convs[:-1]):
x = conv(x, edge_index)
x = self.bns[i](x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
return self.convs[-1](x, edge_index)
def run_experiment(gnn_type, epochs=200):
model = MultiLayerGNN(
in_channels=dataset.num_features,
hidden_channels=64,
out_channels=dataset.num_classes,
gnn_type=gnn_type,
num_layers=3
).to(device)
optimizer = torch.optim.Adam(
model.parameters(), lr=0.01, weight_decay=5e-4
)
train_losses = []
val_accs = []
for epoch in range(epochs):
# 훈련
model.train()
optimizer.zero_grad()
out = model(data.x, data.edge_index)
loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask])
loss.backward()
optimizer.step()
train_losses.append(loss.item())
# 평가
model.eval()
with torch.no_grad():
out = model(data.x, data.edge_index)
pred = out.argmax(dim=1)
val_acc = pred[data.val_mask].eq(data.y[data.val_mask]).sum().item()
val_acc /= data.val_mask.sum().item()
val_accs.append(val_acc)
# 최종 테스트
model.eval()
with torch.no_grad():
out = model(data.x, data.edge_index)
pred = out.argmax(dim=1)
test_acc = pred[data.test_mask].eq(data.y[data.test_mask]).sum().item()
test_acc /= data.test_mask.sum().item()
return test_acc, train_losses, val_accs
# 다양한 GNN 비교
results = {}
for gnn_type in ["gcn", "sage", "gat"]:
test_acc, losses, val_accs = run_experiment(gnn_type)
results[gnn_type] = test_acc
print(f"{gnn_type.upper():10s}: Test Accuracy = {test_acc:.4f}")
그래프 분류 완전 예제
from torch_geometric.datasets import TUDataset
from torch_geometric.loader import DataLoader
from torch_geometric.nn import (
GINConv, global_mean_pool, global_add_pool
)
import torch
import torch.nn as nn
import torch.nn.functional as F
# MUTAG 데이터셋 로드
dataset = TUDataset(root='/tmp/TUDataset', name='MUTAG')
dataset = dataset.shuffle()
# 훈련/테스트 분할
n = len(dataset)
train_dataset = dataset[:int(0.8 * n)]
test_dataset = dataset[int(0.8 * n):]
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)
class GIN(nn.Module):
"""
Graph Isomorphism Network (GIN) - 최대 표현력을 가진 GNN
GCN보다 더 강력한 구별 능력을 가짐
"""
def __init__(self, in_channels, hidden_channels, out_channels,
num_layers=5, dropout=0.5):
super().__init__()
self.dropout = dropout
self.convs = nn.ModuleList()
self.bns = nn.ModuleList()
for i in range(num_layers):
in_ch = in_channels if i == 0 else hidden_channels
# GIN의 MLP
mlp = nn.Sequential(
nn.Linear(in_ch, hidden_channels),
nn.BatchNorm1d(hidden_channels),
nn.ReLU(),
nn.Linear(hidden_channels, hidden_channels)
)
self.convs.append(GINConv(mlp, train_eps=True))
self.bns.append(nn.BatchNorm1d(hidden_channels))
# 그래프 분류기
self.classifier = nn.Sequential(
nn.Linear(hidden_channels * num_layers, hidden_channels),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_channels, out_channels)
)
def forward(self, x, edge_index, batch):
# 각 레이어의 출력을 저장
xs = []
for conv, bn in zip(self.convs, self.bns):
x = conv(x, edge_index)
x = bn(x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
xs.append(global_add_pool(x, batch)) # 그래프 수준 집계
# 모든 레이어의 그래프 표현 연결
out = torch.cat(xs, dim=1)
return self.classifier(out)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_gin = GIN(
in_channels=dataset.num_features,
hidden_channels=64,
out_channels=dataset.num_classes
).to(device)
optimizer = torch.optim.Adam(model_gin.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)
def train_gin():
model_gin.train()
total_loss = 0
for batch in train_loader:
batch = batch.to(device)
optimizer.zero_grad()
out = model_gin(batch.x, batch.edge_index, batch.batch)
loss = F.cross_entropy(out, batch.y)
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(train_loader)
def test_gin(loader):
model_gin.eval()
correct = 0
for batch in loader:
batch = batch.to(device)
with torch.no_grad():
pred = model_gin(batch.x, batch.edge_index, batch.batch).argmax(dim=1)
correct += pred.eq(batch.y).sum().item()
return correct / len(loader.dataset)
for epoch in range(1, 201):
loss = train_gin()
train_acc = test_gin(train_loader)
test_acc = test_gin(test_loader)
scheduler.step()
if epoch % 20 == 0:
print(f"Epoch {epoch:03d} | Loss: {loss:.4f} | "
f"Train: {train_acc:.4f} | Test: {test_acc:.4f}")
8. DGL (Deep Graph Library) 비교
# DGL 예시 - PyG와 비교
# pip install dgl
try:
import dgl
import dgl.nn as dglnn
import torch
import torch.nn as nn
import torch.nn.functional as F
class DGLGCN(nn.Module):
"""DGL로 구현한 GCN"""
def __init__(self, in_feats, hidden_size, num_classes):
super().__init__()
self.conv1 = dglnn.GraphConv(in_feats, hidden_size)
self.conv2 = dglnn.GraphConv(hidden_size, num_classes)
def forward(self, g, features):
x = F.relu(self.conv1(g, features))
x = F.dropout(x, training=self.training)
return self.conv2(g, x)
# DGL 그래프 생성
src = torch.tensor([0, 1, 2, 3, 4])
dst = torch.tensor([1, 2, 3, 4, 0])
g = dgl.graph((src, dst))
g.ndata['feat'] = torch.randn(5, 16)
model_dgl = DGLGCN(16, 32, 4)
out = model_dgl(g, g.ndata['feat'])
print(f"DGL GCN output: {out.shape}")
except ImportError:
print("DGL이 설치되지 않았습니다. pip install dgl")
PyG vs DGL 비교:
| 특성 | PyTorch Geometric (PyG) | Deep Graph Library (DGL) |
|---|---|---|
| API 스타일 | PyTorch-native | 프레임워크 독립 |
| 데이터 표현 | edge_index (COO) | DGLGraph 객체 |
| 속도 | 매우 빠름 | 빠름 |
| 커뮤니티 | 대규모 | 대규모 |
| 모델 수 | 매우 많음 | 많음 |
| 학습 곡선 | 낮음 | 중간 |
9. 실전 응용
분자 특성 예측 (OGB)
try:
from ogb.graphproppred import PygGraphPropPredDataset, Evaluator
from torch_geometric.loader import DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GINEConv, global_mean_pool
# HIV 분자 데이터셋 로드
dataset_mol = PygGraphPropPredDataset(name='ogbg-molhiv')
split_idx = dataset_mol.get_idx_split()
train_loader_mol = DataLoader(
dataset_mol[split_idx["train"]],
batch_size=32,
shuffle=True
)
class MoleculeGNN(nn.Module):
"""분자 특성 예측 모델"""
def __init__(self, hidden_channels=300, num_layers=5):
super().__init__()
self.atom_encoder = nn.Embedding(100, hidden_channels)
self.bond_encoder = nn.Embedding(10, hidden_channels)
self.convs = nn.ModuleList()
for _ in range(num_layers):
mlp = nn.Sequential(
nn.Linear(hidden_channels, hidden_channels * 2),
nn.BatchNorm1d(hidden_channels * 2),
nn.ReLU(),
nn.Linear(hidden_channels * 2, hidden_channels)
)
self.convs.append(GINEConv(mlp))
self.pool = global_mean_pool
self.predictor = nn.Linear(hidden_channels, 1)
def forward(self, x, edge_index, edge_attr, batch):
x = self.atom_encoder(x.squeeze())
edge_attr = self.bond_encoder(edge_attr.squeeze())
for conv in self.convs:
x = conv(x, edge_index, edge_attr)
x = F.relu(x)
graph_embed = self.pool(x, batch)
return self.predictor(graph_embed)
print("OGB 분자 데이터셋 로드 성공")
except ImportError:
print("ogb 패키지가 없습니다. pip install ogb")
추천 시스템
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import LightGCN
class RecommendationSystem(nn.Module):
"""
LightGCN 기반 협업 필터링
사용자-아이템 이분 그래프에서 임베딩 학습
"""
def __init__(self, num_users, num_items, embedding_dim=64, num_layers=3):
super().__init__()
self.num_users = num_users
self.num_items = num_items
self.embedding_dim = embedding_dim
self.num_layers = num_layers
# 사용자/아이템 임베딩
self.user_emb = nn.Embedding(num_users, embedding_dim)
self.item_emb = nn.Embedding(num_items, embedding_dim)
# LightGCN: 비선형 변환 없이 단순 집계
self.lightgcn = LightGCN(
num_nodes=num_users + num_items,
embedding_dim=embedding_dim,
num_layers=num_layers
)
self._init_weights()
def _init_weights(self):
nn.init.normal_(self.user_emb.weight, std=0.01)
nn.init.normal_(self.item_emb.weight, std=0.01)
def forward(self, edge_index):
# 전체 노드 임베딩
x = torch.cat([self.user_emb.weight, self.item_emb.weight], dim=0)
# LightGCN 전파
embeddings = self.lightgcn(x, edge_index)
return embeddings[:self.num_users], embeddings[self.num_users:]
def predict(self, user_ids, item_ids, edge_index):
user_embs, item_embs = self(edge_index)
u = user_embs[user_ids]
i = item_embs[item_ids]
return (u * i).sum(dim=1)
# BPR 손실 함수
def bpr_loss(pos_scores, neg_scores):
"""Bayesian Personalized Ranking Loss"""
return -F.logsigmoid(pos_scores - neg_scores).mean()
10. 그래프 생성 모델
GraphVAE
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
class GraphVAE(nn.Module):
"""그래프 변분 오토인코더"""
def __init__(self, in_channels, hidden_channels, latent_dim):
super().__init__()
# 인코더 (GNN)
self.conv1 = GCNConv(in_channels, hidden_channels)
self.conv_mu = GCNConv(hidden_channels, latent_dim)
self.conv_logvar = GCNConv(hidden_channels, latent_dim)
def encode(self, x, edge_index):
h = F.relu(self.conv1(x, edge_index))
mu = self.conv_mu(h, edge_index)
logvar = self.conv_logvar(h, edge_index)
return mu, logvar
def reparameterize(self, mu, logvar):
if self.training:
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
return mu + eps * std
return mu
def decode(self, z):
# 내적으로 엣지 확률 계산
adj_pred = torch.sigmoid(z @ z.t())
return adj_pred
def forward(self, x, edge_index):
mu, logvar = self.encode(x, edge_index)
z = self.reparameterize(mu, logvar)
adj_pred = self.decode(z)
return adj_pred, mu, logvar
def loss(self, adj_pred, adj_target, mu, logvar):
# 재구성 손실
recon_loss = F.binary_cross_entropy(adj_pred, adj_target)
# KL 발산
kl_loss = -0.5 * torch.mean(
1 + logvar - mu.pow(2) - logvar.exp()
)
return recon_loss + kl_loss
퀴즈
Q1. GCN과 GAT의 가장 큰 차이점은 무엇인가요?
정답: GCN은 모든 이웃 노드를 고정된 가중치(차수 기반 정규화)로 집계하지만, GAT는 어텐션 메커니즘을 통해 각 이웃에 서로 다른 가중치를 동적으로 학습합니다.
설명: GCN의 집계 가중치는 노드의 차수(degree)에 의해 고정됩니다. 반면 GAT의 어텐션 계수는 연결된 두 노드의 특성 벡터에 기반해 동적으로 계산되므로, 더 중요한 이웃에 더 많은 주의를 기울일 수 있습니다. 멀티헤드 어텐션으로 안정성을 높이는 것도 GAT의 장점입니다.
Q2. GraphSAGE가 GCN보다 인덕티브 학습에 유리한 이유는 무엇인가요?
정답: GraphSAGE는 집계 함수(aggregator)를 학습하여 새로운 노드의 이웃으로부터 임베딩을 생성할 수 있기 때문입니다.
설명: GCN은 학습 시 전체 그래프의 인접 행렬을 필요로 하므로, 새로운 노드가 추가되면 재학습이 필요한 트랜스덕티브(transductive) 방식입니다. GraphSAGE는 이웃을 샘플링하고 집계하는 함수를 학습하므로, 학습 시 보지 못한 새로운 노드에도 이 함수를 적용해 임베딩을 생성할 수 있습니다. Pinterest, LinkedIn 등 동적으로 변하는 대규모 그래프에서 실제로 활용됩니다.
Q3. 메시지 패싱(MPNN) 프레임워크의 세 단계는 무엇인가요?
정답: Message(메시지 계산), Aggregate(집계), Update(업데이트) 세 단계입니다.
설명: Message 단계에서는 각 엣지에 대해 이웃 노드로부터 전달할 메시지를 계산합니다. Aggregate 단계에서는 노드가 수신한 모든 이웃 메시지를 합산, 평균, 최댓값 등으로 집계합니다. Update 단계에서는 집계된 메시지와 현재 노드 임베딩을 결합해 새로운 노드 임베딩을 생성합니다. GCN, GAT, GraphSAGE, GIN 등 대부분의 GNN이 이 프레임워크로 통일될 수 있습니다.
Q4. 과평활화(Over-smoothing) 문제란 무엇이며, 어떻게 해결하나요?
정답: 레이어가 깊어질수록 모든 노드의 임베딩이 유사해지는 현상입니다. 잔차 연결, JK-Net, DropEdge 등으로 완화할 수 있습니다.
설명: K 레이어 GNN은 K-hop 이웃의 정보를 집계합니다. 레이어가 많아질수록 더 넓은 이웃을 포함하게 되고, 결국 모든 노드가 동일한 글로벌 평균에 수렴합니다. 잔차 연결(Residual connections)은 이전 레이어의 정보를 직접 전달해 고유 정보를 보존합니다. JK-Net(Jumping Knowledge Networks)은 모든 레이어의 임베딩을 최종 표현에 활용합니다. DropEdge는 학습 시 일부 엣지를 무작위로 제거합니다.
Q5. GNN의 표현력이 WL 테스트와 동등하다는 것은 무엇을 의미하나요?
정답: 표준 GNN은 Weisfeiler-Leman(WL) 그래프 동형 테스트로 구별할 수 없는 두 그래프를 동일하게 임베딩한다는 의미입니다.
설명: WL 테스트는 두 그래프가 동형(isomorphic)인지 판별하는 알고리즘으로, 반복적으로 이웃의 레이블을 집계하고 해싱합니다. Xu et al. (2019)은 GIN(Graph Isomorphism Network)을 통해 표준 GNN의 표현력이 1-WL 테스트와 동등하다는 것을 증명했습니다. WL 테스트가 구별하지 못하는 그래프 쌍에서 GNN도 두 그래프를 구별하지 못합니다. 이를 극복하기 위해 더 강력한 k-차 WL 테스트에 해당하는 고차원 GNN 연구가 진행 중입니다.
마치며
이 가이드에서는 그래프 신경망의 전체 생태계를 다루었습니다:
- 그래프 이론 기초: 노드, 엣지, 인접 행렬, 그래프 특성
- 메시지 패싱 패러다임: GNN의 핵심 원리
- 주요 아키텍처: GCN, GraphSAGE, GAT, Graph Transformer, GIN
- 그래프 수준 예측: Global Pooling, DiffPool
- 링크 예측: 지식 그래프, 추천 시스템
- PyTorch Geometric: 노드 분류, 그래프 분류 완전 예제
- 실전 응용: 분자 설계, 추천 시스템, 사기 탐지
- 그래프 생성 모델: GraphVAE
GNN은 분자 설계, 약물 발견, 소셜 네트워크 분석, 교통 예측, 추천 시스템 등 다양한 분야에서 혁신적인 성과를 내고 있습니다. PyTorch Geometric과 DGL 같은 라이브러리 덕분에 구현이 점점 쉬워지고 있으며, OGB 같은 벤치마크를 통해 공정한 비교도 가능해졌습니다.
참고 자료
Graph Neural Networks Complete Guide: GCN, GAT, GraphSAGE to Molecular Design
Graph Neural Networks Complete Guide
Social networks, molecular structures, knowledge graphs, recommendation systems — countless real-world datasets are naturally represented as graphs. Graph Neural Networks (GNNs) are the core tool for applying deep learning to this non-Euclidean data. This guide systematically covers everything from graph theory fundamentals to the latest GNN architectures and hands-on implementations with PyTorch Geometric.
1. Graph Theory Fundamentals
Graph Definition
A graph G consists of a set of nodes V and a set of edges E, expressed as G = (V, E). Nodes represent entities, while edges represent relationships between entities.
- Nodes (Vertices): Represent entities. Examples: users, atoms, papers
- Edges: Represent relationships. Examples: friendships, chemical bonds, citations
- Node Features: Feature vectors attached to each node
- Edge Features: Feature vectors attached to each edge
Directed vs Undirected Graphs
import networkx as nx
import numpy as np
# Undirected Graph
G_undirected = nx.Graph()
G_undirected.add_edges_from([(0, 1), (1, 2), (2, 3), (3, 0), (0, 2)])
# Directed Graph
G_directed = nx.DiGraph()
G_directed.add_edges_from([(0, 1), (1, 2), (2, 0), (0, 3)])
print(f"Undirected - Nodes: {G_undirected.number_of_nodes()}, Edges: {G_undirected.number_of_edges()}")
print(f"Directed - Nodes: {G_directed.number_of_nodes()}, Edges: {G_directed.number_of_edges()}")
Adjacency Matrix and Edge List
import torch
import numpy as np
# Adjacency Matrix
# A[i][j] = 1 means an edge exists between nodes i and j
adj_matrix = torch.tensor([
[0, 1, 1, 0],
[1, 0, 1, 0],
[1, 1, 0, 1],
[0, 0, 1, 0]
], dtype=torch.float32)
# Edge Index (used by PyG)
# shape: (2, num_edges) - first row: source nodes, second row: target nodes
edge_index = torch.tensor([
[0, 0, 1, 1, 2, 2, 2, 3], # Source nodes
[1, 2, 0, 2, 0, 1, 3, 2] # Target nodes
], dtype=torch.long)
print(f"Adjacency matrix shape: {adj_matrix.shape}") # (4, 4)
print(f"Edge list shape: {edge_index.shape}") # (2, 8)
# Convert adjacency matrix to edge index
def adj_to_edge_index(adj):
"""Convert adjacency matrix to edge index"""
row, col = torch.where(adj > 0)
return torch.stack([row, col], dim=0)
converted = adj_to_edge_index(adj_matrix)
print(f"Converted edge list:\n{converted}")
Graph Properties
import networkx as nx
import numpy as np
def analyze_graph(G):
"""Analyze key properties of a graph"""
# Degree
degrees = dict(G.degree())
avg_degree = np.mean(list(degrees.values()))
# Clustering Coefficient
clustering = nx.average_clustering(G)
# Average Path Length
if nx.is_connected(G):
avg_path = nx.average_shortest_path_length(G)
else:
# Use the largest connected component
largest_cc = max(nx.connected_components(G), key=len)
subgraph = G.subgraph(largest_cc)
avg_path = nx.average_shortest_path_length(subgraph)
# Centrality
betweenness = nx.betweenness_centrality(G)
pagerank = nx.pagerank(G)
print(f"Nodes: {G.number_of_nodes()}")
print(f"Edges: {G.number_of_edges()}")
print(f"Average degree: {avg_degree:.2f}")
print(f"Clustering coefficient: {clustering:.3f}")
print(f"Average path length: {avg_path:.2f}")
return {
"degrees": degrees,
"clustering": clustering,
"avg_path": avg_path,
"betweenness": betweenness,
"pagerank": pagerank
}
# Social network example (Karate Club)
G = nx.karate_club_graph()
stats = analyze_graph(G)
Real-World Graphs
| Domain | Nodes | Edges | Task |
|---|---|---|---|
| Social Network | Users | Friendships | Community detection |
| Molecular Structure | Atoms | Chemical bonds | Property prediction |
| Knowledge Graph | Entities | Relations | Link prediction |
| Citation Network | Papers | Citations | Node classification |
| Traffic Network | Intersections | Roads | Route prediction |
| Recommendation | Users/Items | Interactions | Recommendation |
2. Motivation for Graph Machine Learning
Why CNN/RNN Falls Short
Traditional CNNs assume a grid structure. Images work naturally because pixels are arranged in a regular 2D grid. RNNs assume sequential structure.
But graphs are:
- Irregular structure: Each node has a different number of neighbors
- Orderless: Permutation invariance of nodes
- Global dependencies: Distant nodes can still influence each other
# Illustrating graph data characteristics
# Images: fixed-size grids
image = torch.randn(3, 224, 224) # channels, height, width
# Sequences: ordered data
sequence = torch.randn(100, 512) # sequence length, feature dim
# Graphs: variable neighborhood structure
# Node features: (num_nodes, feature_dim)
node_features = torch.randn(34, 16) # 34 nodes, 16-dim features
# Edges: (2, num_edges) - sparse connections
edge_index = torch.randint(0, 34, (2, 78))
Message Passing Paradigm
The fundamental principle of all GNNs is message passing. Each node receives messages from its neighbors and updates its own representation.
The Message Passing Neural Network (MPNN) framework:
- Message computation: Compute message to send from node u to v along edge (u, v)
- Aggregation: Each node aggregates all incoming neighbor messages
- Update: Update the node representation using the aggregated message
m_v^(l) = AGGREGATE({h_u^(l-1) : u in N(v)})
h_v^(l) = UPDATE(h_v^(l-1), m_v^(l))
where N(v) is the set of neighbors of node v.
3. GNN Fundamental Equations
Aggregation and Update
import torch
import torch.nn as nn
from torch_scatter import scatter_mean, scatter_sum, scatter_max
def manual_message_passing(node_features, edge_index, aggregation="mean"):
"""
Manually implemented message passing
node_features: (N, F) - N nodes, F-dimensional features
edge_index: (2, E) - E edges
"""
src, dst = edge_index[0], edge_index[1]
num_nodes = node_features.size(0)
# Use source node features as messages
messages = node_features[src] # (E, F)
if aggregation == "mean":
aggregated = scatter_mean(messages, dst, dim=0, dim_size=num_nodes)
elif aggregation == "sum":
aggregated = scatter_sum(messages, dst, dim=0, dim_size=num_nodes)
elif aggregation == "max":
aggregated, _ = scatter_max(messages, dst, dim=0, dim_size=num_nodes)
# Update: original features + aggregated messages
updated = node_features + aggregated
return updated
# Example
N, F = 6, 8
node_features = torch.randn(N, F)
edge_index = torch.tensor([[0,1,2,3,4,0,1], [1,2,3,4,0,3,4]])
output = manual_message_passing(node_features, edge_index, "mean")
print(f"Input shape: {node_features.shape}")
print(f"Output shape: {output.shape}")
4. Key GNN Architectures
GCN (Graph Convolutional Network)
GCN, proposed by Kipf and Welling in 2017, derives an efficient layer-wise propagation rule starting from spectral graph theory.
Layer-wise propagation rule:
Using a normalized adjacency matrix: A_tilde = D^(-1/2) _ (A + I) _ D^(-1/2)
H^(l+1) = sigma(A_tilde _ H^(l) _ W^(l))
where D is the degree matrix, I is the identity matrix, and W is the learnable weight matrix.
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
# Load dataset
dataset = Planetoid(root='/tmp/Cora', name='Cora', transform=NormalizeFeatures())
data = dataset[0]
print(f"Nodes: {data.num_nodes}")
print(f"Edges: {data.num_edges}")
print(f"Node feature dim: {data.num_node_features}")
print(f"Num classes: {dataset.num_classes}")
print(f"Train nodes: {data.train_mask.sum().item()}")
print(f"Val nodes: {data.val_mask.sum().item()}")
print(f"Test nodes: {data.test_mask.sum().item()}")
class GCN(nn.Module):
"""Graph Convolutional Network"""
def __init__(self, in_channels, hidden_channels, out_channels, dropout=0.5):
super().__init__()
self.conv1 = GCNConv(in_channels, hidden_channels)
self.conv2 = GCNConv(hidden_channels, out_channels)
self.dropout = dropout
def forward(self, x, edge_index):
# First GCN layer
x = self.conv1(x, edge_index)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
# Second GCN layer
x = self.conv2(x, edge_index)
return x
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GCN(
in_channels=dataset.num_features,
hidden_channels=64,
out_channels=dataset.num_classes
).to(device)
data = data.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
def train_gcn():
model.train()
optimizer.zero_grad()
out = model(data.x, data.edge_index)
loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask])
loss.backward()
optimizer.step()
return loss.item()
def test_gcn():
model.eval()
with torch.no_grad():
out = model(data.x, data.edge_index)
pred = out.argmax(dim=1)
results = {}
for split, mask in [("train", data.train_mask),
("val", data.val_mask),
("test", data.test_mask)]:
correct = pred[mask].eq(data.y[mask]).sum().item()
results[split] = correct / mask.sum().item()
return results
# Training loop
best_val_acc = 0
for epoch in range(200):
loss = train_gcn()
accs = test_gcn()
if accs["val"] > best_val_acc:
best_val_acc = accs["val"]
if (epoch + 1) % 50 == 0:
print(f"Epoch {epoch+1:03d} | Loss: {loss:.4f} | "
f"Train: {accs['train']:.4f} | Val: {accs['val']:.4f} | "
f"Test: {accs['test']:.4f}")
Manual GCN Implementation
import torch
import torch.nn as nn
import torch.nn.functional as F
class ManualGCNLayer(nn.Module):
"""Manual GCN layer implementation - for understanding internals"""
def __init__(self, in_features, out_features):
super().__init__()
self.weight = nn.Parameter(torch.FloatTensor(in_features, out_features))
self.bias = nn.Parameter(torch.FloatTensor(out_features))
self.reset_parameters()
def reset_parameters(self):
nn.init.xavier_uniform_(self.weight)
nn.init.zeros_(self.bias)
def forward(self, x, adj):
"""
x: node features (N, F_in)
adj: normalized adjacency matrix (N, N)
"""
# Linear transform: X * W
support = x @ self.weight
# Graph convolution: A_hat * X * W
output = adj @ support + self.bias
return output
@staticmethod
def normalize_adjacency(adj):
"""D^(-1/2) * A * D^(-1/2) normalization"""
# Add self-loops
N = adj.size(0)
adj_hat = adj + torch.eye(N, device=adj.device)
# Compute degree matrix
deg = adj_hat.sum(dim=1)
d_inv_sqrt = torch.diag(deg.pow(-0.5))
# Normalize
adj_normalized = d_inv_sqrt @ adj_hat @ d_inv_sqrt
return adj_normalized
GraphSAGE (Inductive Learning)
GraphSAGE is designed for inductive learning. It uses neighbor sampling to enable mini-batch training instead of processing the entire graph.
from torch_geometric.nn import SAGEConv
import torch
import torch.nn as nn
import torch.nn.functional as F
class GraphSAGE(nn.Module):
"""GraphSAGE - Inductive Representation Learning"""
def __init__(self, in_channels, hidden_channels, out_channels,
num_layers=3, dropout=0.5, aggr="mean"):
super().__init__()
self.dropout = dropout
self.convs = nn.ModuleList()
self.convs.append(SAGEConv(in_channels, hidden_channels, aggr=aggr))
for _ in range(num_layers - 2):
self.convs.append(SAGEConv(hidden_channels, hidden_channels, aggr=aggr))
self.convs.append(SAGEConv(hidden_channels, out_channels, aggr=aggr))
self.bns = nn.ModuleList([
nn.BatchNorm1d(hidden_channels)
for _ in range(num_layers - 1)
])
def forward(self, x, edge_index):
for i, conv in enumerate(self.convs[:-1]):
x = conv(x, edge_index)
x = self.bns[i](x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.convs[-1](x, edge_index)
return x
# Mini-batch training with neighbor sampling
from torch_geometric.loader import NeighborLoader
# NeighborLoader: sample num_neighbors neighbors per layer
train_loader = NeighborLoader(
data,
num_neighbors=[25, 10], # 2-hop: 25 at 1st hop, 10 at 2nd hop
batch_size=256,
input_nodes=data.train_mask,
shuffle=True
)
model_sage = GraphSAGE(
in_channels=dataset.num_features,
hidden_channels=64,
out_channels=dataset.num_classes
).to(device)
optimizer_sage = torch.optim.Adam(model_sage.parameters(), lr=0.001)
def train_sage():
model_sage.train()
total_loss = 0
for batch in train_loader:
batch = batch.to(device)
optimizer_sage.zero_grad()
out = model_sage(batch.x, batch.edge_index)
# Only the first batch_size nodes are training nodes
loss = F.cross_entropy(out[:batch.batch_size], batch.y[:batch.batch_size])
loss.backward()
optimizer_sage.step()
total_loss += loss.item()
return total_loss / len(train_loader)
GAT (Graph Attention Network)
GAT uses attention mechanisms to assign different weights to each neighbor. It implements the intuition that "not all neighbors are equally important."
Attention coefficient computation:
Attention score: e_ij = LeakyReLU(a^T [Wh_i || Wh_j])
Softmax normalization: alpha_ij = exp(e_ij) / sum_k(exp(e_ik))
Update: h_i' = sigma(sum_j alpha_ij _ W _ h_j)
from torch_geometric.nn import GATConv, GATv2Conv
import torch
import torch.nn as nn
import torch.nn.functional as F
class GAT(nn.Module):
"""Graph Attention Network"""
def __init__(self, in_channels, hidden_channels, out_channels,
heads=8, dropout=0.6):
super().__init__()
self.dropout = dropout
# First layer: multi-head attention
self.conv1 = GATConv(
in_channels,
hidden_channels,
heads=heads,
dropout=dropout,
concat=True # Concatenate heads
)
# Second layer: average heads
self.conv2 = GATConv(
hidden_channels * heads,
out_channels,
heads=1,
dropout=dropout,
concat=False # Average heads
)
def forward(self, x, edge_index):
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.conv1(x, edge_index)
x = F.elu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.conv2(x, edge_index)
return x
class GATv2(nn.Module):
"""
GATv2 - Improved attention mechanism
GATv2 computes dynamic attention, giving higher expressiveness
"""
def __init__(self, in_channels, hidden_channels, out_channels,
heads=8, dropout=0.6):
super().__init__()
self.conv1 = GATv2Conv(
in_channels,
hidden_channels,
heads=heads,
dropout=dropout,
concat=True
)
self.conv2 = GATv2Conv(
hidden_channels * heads,
out_channels,
heads=1,
dropout=dropout,
concat=False
)
self.dropout = dropout
def forward(self, x, edge_index):
x = F.dropout(x, p=self.dropout, training=self.training)
x = F.elu(self.conv1(x, edge_index))
x = F.dropout(x, p=self.dropout, training=self.training)
return self.conv2(x, edge_index)
# GAT training
model_gat = GAT(
in_channels=dataset.num_features,
hidden_channels=8,
out_channels=dataset.num_classes,
heads=8
).to(device)
optimizer_gat = torch.optim.Adam(model_gat.parameters(), lr=0.005, weight_decay=5e-4)
Graph Transformer
Graph Transformer applies global Transformer attention to graphs.
from torch_geometric.nn import TransformerConv
import torch
import torch.nn as nn
import torch.nn.functional as F
class GraphTransformer(nn.Module):
"""Graph Transformer Layer"""
def __init__(self, in_channels, hidden_channels, out_channels,
heads=4, num_layers=3, dropout=0.3):
super().__init__()
self.dropout = dropout
self.convs = nn.ModuleList()
self.convs.append(
TransformerConv(in_channels, hidden_channels // heads, heads=heads,
dropout=dropout, beta=True)
)
for _ in range(num_layers - 2):
self.convs.append(
TransformerConv(hidden_channels, hidden_channels // heads,
heads=heads, dropout=dropout, beta=True)
)
self.convs.append(
TransformerConv(hidden_channels, out_channels // heads,
heads=heads, dropout=dropout, beta=True)
)
self.norms = nn.ModuleList([
nn.LayerNorm(hidden_channels) for _ in range(num_layers - 1)
])
def forward(self, x, edge_index):
for i, conv in enumerate(self.convs[:-1]):
x = conv(x, edge_index)
x = self.norms[i](x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
return self.convs[-1](x, edge_index)
5. Graph-Level Prediction
While node classification predicts individual nodes, graph classification predicts entire graphs. For example: predicting whether a molecule is toxic.
Global Pooling
from torch_geometric.nn import (
global_mean_pool,
global_max_pool,
global_add_pool
)
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
class GraphClassifier(nn.Module):
"""Graph classification model"""
def __init__(self, in_channels, hidden_channels, out_channels,
num_layers=3, dropout=0.5, pooling="mean"):
super().__init__()
self.dropout = dropout
self.pooling = pooling
self.convs = nn.ModuleList()
self.convs.append(GCNConv(in_channels, hidden_channels))
for _ in range(num_layers - 1):
self.convs.append(GCNConv(hidden_channels, hidden_channels))
self.bns = nn.ModuleList([
nn.BatchNorm1d(hidden_channels) for _ in range(num_layers)
])
# Graph-level classifier
self.classifier = nn.Sequential(
nn.Linear(hidden_channels, hidden_channels),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_channels, out_channels)
)
def forward(self, x, edge_index, batch):
"""
batch: index vector indicating which graph each node belongs to
"""
# Node embedding
for conv, bn in zip(self.convs, self.bns):
x = conv(x, edge_index)
x = bn(x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
# Graph-level pooling
if self.pooling == "mean":
x = global_mean_pool(x, batch)
elif self.pooling == "max":
x = global_max_pool(x, batch)
elif self.pooling == "sum":
x = global_add_pool(x, batch)
# Classification
return self.classifier(x)
DiffPool (Differentiable Pooling)
from torch_geometric.nn import dense_diff_pool
import torch
import torch.nn as nn
import torch.nn.functional as F
class DiffPoolLayer(nn.Module):
"""Hierarchical graph pooling"""
def __init__(self, in_channels, hidden_channels, num_clusters):
super().__init__()
# GNN for node embedding
self.gnn_embed = nn.Sequential(
nn.Linear(in_channels, hidden_channels),
nn.ReLU()
)
# GNN for cluster assignment
self.gnn_pool = nn.Sequential(
nn.Linear(in_channels, num_clusters),
)
def forward(self, x, adj, mask=None):
embed = self.gnn_embed(x)
# Cluster assignment matrix
s = torch.softmax(self.gnn_pool(x), dim=-1)
# DiffPool
out, out_adj, link_loss, entropy_loss = dense_diff_pool(embed, adj, s, mask)
return out, out_adj, link_loss, entropy_loss
6. Link Prediction
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.utils import negative_sampling
class LinkPredictor(nn.Module):
"""Link prediction model"""
def __init__(self, in_channels, hidden_channels, out_channels):
super().__init__()
# Node embedding encoder
self.encoder = nn.ModuleList([
GCNConv(in_channels, hidden_channels),
GCNConv(hidden_channels, out_channels)
])
# Edge decoder
self.decoder = nn.Sequential(
nn.Linear(out_channels * 2, out_channels),
nn.ReLU(),
nn.Linear(out_channels, 1)
)
def encode(self, x, edge_index):
for i, conv in enumerate(self.encoder):
x = conv(x, edge_index)
if i < len(self.encoder) - 1:
x = F.relu(x)
return x
def decode(self, z, edge_index):
# Concatenate source/target node embeddings
src, dst = edge_index
edge_feat = torch.cat([z[src], z[dst]], dim=1)
return self.decoder(edge_feat).squeeze()
def forward(self, x, edge_index, pos_edge_index, neg_edge_index):
z = self.encode(x, edge_index)
pos_pred = self.decode(z, pos_edge_index)
neg_pred = self.decode(z, neg_edge_index)
return pos_pred, neg_pred
def train_link_prediction(model, data, optimizer):
model.train()
optimizer.zero_grad()
# Node embedding
z = model.encode(data.x, data.edge_index)
# Positive edges
pos_edge = data.train_pos_edge_index
# Negative edge sampling
neg_edge = negative_sampling(
edge_index=pos_edge,
num_nodes=data.num_nodes,
num_neg_samples=pos_edge.size(1)
)
pos_pred = model.decode(z, pos_edge)
neg_pred = model.decode(z, neg_edge)
# Binary cross-entropy loss
pred = torch.cat([pos_pred, neg_pred])
labels = torch.cat([
torch.ones(pos_pred.size(0)),
torch.zeros(neg_pred.size(0))
]).to(pred.device)
loss = F.binary_cross_entropy_with_logits(pred, labels)
loss.backward()
optimizer.step()
return loss.item()
7. PyTorch Geometric (PyG) Complete Guide
Installation
pip install torch-geometric
pip install torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-2.1.0+cu121.html
Data Object
from torch_geometric.data import Data
import torch
# Create graph data
x = torch.randn(6, 3) # 6 nodes, 3-dimensional features
edge_index = torch.tensor([
[0, 1, 2, 3, 4, 0],
[1, 2, 3, 4, 0, 3]
], dtype=torch.long)
y = torch.tensor([0, 1, 0, 1, 0, 1]) # Node labels
edge_attr = torch.randn(6, 2) # Edge features
data = Data(
x=x,
edge_index=edge_index,
y=y,
edge_attr=edge_attr
)
print(data)
print(f"Nodes: {data.num_nodes}")
print(f"Edges: {data.num_edges}")
print(f"Node feature dim: {data.num_node_features}")
print(f"Edge feature dim: {data.num_edge_features}")
print(f"Has self-loops: {data.has_self_loops()}")
print(f"Is directed: {data.is_directed()}")
# Validation
print(f"Valid data: {data.validate()}")
DataLoader and Mini-batching
from torch_geometric.data import Data, DataLoader
import torch
# Create graph dataset
dataset = []
for _ in range(100):
n = torch.randint(5, 20, (1,)).item() # 5-20 nodes
e = torch.randint(10, 40, (1,)).item() # 10-40 edges
data = Data(
x=torch.randn(n, 8),
edge_index=torch.randint(0, n, (2, e)),
y=torch.randint(0, 3, (1,)) # Graph label
)
dataset.append(data)
# DataLoader: batch multiple graphs into one disconnected graph
loader = DataLoader(dataset, batch_size=32, shuffle=True)
for batch in loader:
print(f"Number of graphs in batch: {batch.num_graphs}")
print(f"Total nodes: {batch.num_nodes}")
print(f"Total edges: {batch.num_edges}")
print(f"Batch vector: {batch.batch.shape}") # Graph index per node
break
Built-in Datasets
from torch_geometric.datasets import (
Planetoid, # Cora, Citeseer, PubMed
TUDataset, # Molecular datasets (MUTAG, ENZYMES, etc.)
)
from torch_geometric.transforms import NormalizeFeatures
# Cora citation network
cora = Planetoid(root='/tmp/Cora', name='Cora', transform=NormalizeFeatures())
print(f"Cora - Nodes: {cora[0].num_nodes}, Edges: {cora[0].num_edges}")
# MUTAG molecular dataset
mutag = TUDataset(root='/tmp/TUDataset', name='MUTAG')
print(f"MUTAG - Graphs: {len(mutag)}, Classes: {mutag.num_classes}")
# Open Graph Benchmark (large-scale)
try:
from ogb.nodeproppred import PygNodePropPredDataset
dataset_ogb = PygNodePropPredDataset(name='ogbn-arxiv')
split_idx = dataset_ogb.get_idx_split()
data_ogb = dataset_ogb[0]
print(f"OGB-Arxiv - Nodes: {data_ogb.num_nodes}, Edges: {data_ogb.num_edges}")
except ImportError:
print("ogb not installed. pip install ogb")
Complete Node Classification Pipeline
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GATConv, SAGEConv
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
# Load data
dataset = Planetoid(root='/tmp/Cora', name='Cora', transform=NormalizeFeatures())
data = dataset[0]
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
data = data.to(device)
class MultiLayerGNN(nn.Module):
"""Model combining multiple GNN layers"""
def __init__(self, in_channels, hidden_channels, out_channels,
gnn_type="gcn", num_layers=3, dropout=0.5):
super().__init__()
self.dropout = dropout
self.gnn_type = gnn_type
self.convs = nn.ModuleList()
self.bns = nn.ModuleList()
# Input layer
self.convs.append(self._make_conv(in_channels, hidden_channels, gnn_type))
self.bns.append(nn.BatchNorm1d(hidden_channels))
# Hidden layers
for _ in range(num_layers - 2):
self.convs.append(self._make_conv(hidden_channels, hidden_channels, gnn_type))
self.bns.append(nn.BatchNorm1d(hidden_channels))
# Output layer
self.convs.append(self._make_conv(hidden_channels, out_channels, gnn_type))
def _make_conv(self, in_ch, out_ch, gnn_type):
if gnn_type == "gcn":
return GCNConv(in_ch, out_ch)
elif gnn_type == "sage":
return SAGEConv(in_ch, out_ch)
elif gnn_type == "gat":
return GATConv(in_ch, out_ch, heads=1)
else:
raise ValueError(f"Unknown GNN type: {gnn_type}")
def forward(self, x, edge_index):
for i, conv in enumerate(self.convs[:-1]):
x = conv(x, edge_index)
x = self.bns[i](x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
return self.convs[-1](x, edge_index)
def run_experiment(gnn_type, epochs=200):
model = MultiLayerGNN(
in_channels=dataset.num_features,
hidden_channels=64,
out_channels=dataset.num_classes,
gnn_type=gnn_type,
num_layers=3
).to(device)
optimizer = torch.optim.Adam(
model.parameters(), lr=0.01, weight_decay=5e-4
)
train_losses = []
val_accs = []
for epoch in range(epochs):
# Training
model.train()
optimizer.zero_grad()
out = model(data.x, data.edge_index)
loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask])
loss.backward()
optimizer.step()
train_losses.append(loss.item())
# Evaluation
model.eval()
with torch.no_grad():
out = model(data.x, data.edge_index)
pred = out.argmax(dim=1)
val_acc = pred[data.val_mask].eq(data.y[data.val_mask]).sum().item()
val_acc /= data.val_mask.sum().item()
val_accs.append(val_acc)
# Final test
model.eval()
with torch.no_grad():
out = model(data.x, data.edge_index)
pred = out.argmax(dim=1)
test_acc = pred[data.test_mask].eq(data.y[data.test_mask]).sum().item()
test_acc /= data.test_mask.sum().item()
return test_acc, train_losses, val_accs
# Compare different GNNs
results = {}
for gnn_type in ["gcn", "sage", "gat"]:
test_acc, losses, val_accs = run_experiment(gnn_type)
results[gnn_type] = test_acc
print(f"{gnn_type.upper():10s}: Test Accuracy = {test_acc:.4f}")
Complete Graph Classification Example
from torch_geometric.datasets import TUDataset
from torch_geometric.loader import DataLoader
from torch_geometric.nn import (
GINConv, global_mean_pool, global_add_pool
)
import torch
import torch.nn as nn
import torch.nn.functional as F
# Load MUTAG dataset
dataset = TUDataset(root='/tmp/TUDataset', name='MUTAG')
dataset = dataset.shuffle()
# Train/test split
n = len(dataset)
train_dataset = dataset[:int(0.8 * n)]
test_dataset = dataset[int(0.8 * n):]
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)
class GIN(nn.Module):
"""
Graph Isomorphism Network (GIN) - most expressive GNN
Has stronger discriminative power than GCN
"""
def __init__(self, in_channels, hidden_channels, out_channels,
num_layers=5, dropout=0.5):
super().__init__()
self.dropout = dropout
self.convs = nn.ModuleList()
self.bns = nn.ModuleList()
for i in range(num_layers):
in_ch = in_channels if i == 0 else hidden_channels
# MLP for GIN
mlp = nn.Sequential(
nn.Linear(in_ch, hidden_channels),
nn.BatchNorm1d(hidden_channels),
nn.ReLU(),
nn.Linear(hidden_channels, hidden_channels)
)
self.convs.append(GINConv(mlp, train_eps=True))
self.bns.append(nn.BatchNorm1d(hidden_channels))
# Graph classifier
self.classifier = nn.Sequential(
nn.Linear(hidden_channels * num_layers, hidden_channels),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_channels, out_channels)
)
def forward(self, x, edge_index, batch):
# Store outputs from each layer
xs = []
for conv, bn in zip(self.convs, self.bns):
x = conv(x, edge_index)
x = bn(x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
xs.append(global_add_pool(x, batch)) # Graph-level aggregation
# Concatenate graph representations from all layers
out = torch.cat(xs, dim=1)
return self.classifier(out)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_gin = GIN(
in_channels=dataset.num_features,
hidden_channels=64,
out_channels=dataset.num_classes
).to(device)
optimizer = torch.optim.Adam(model_gin.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)
def train_gin():
model_gin.train()
total_loss = 0
for batch in train_loader:
batch = batch.to(device)
optimizer.zero_grad()
out = model_gin(batch.x, batch.edge_index, batch.batch)
loss = F.cross_entropy(out, batch.y)
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(train_loader)
def test_gin(loader):
model_gin.eval()
correct = 0
for batch in loader:
batch = batch.to(device)
with torch.no_grad():
pred = model_gin(batch.x, batch.edge_index, batch.batch).argmax(dim=1)
correct += pred.eq(batch.y).sum().item()
return correct / len(loader.dataset)
for epoch in range(1, 201):
loss = train_gin()
train_acc = test_gin(train_loader)
test_acc = test_gin(test_loader)
scheduler.step()
if epoch % 20 == 0:
print(f"Epoch {epoch:03d} | Loss: {loss:.4f} | "
f"Train: {train_acc:.4f} | Test: {test_acc:.4f}")
8. DGL (Deep Graph Library) Comparison
# DGL example - comparison with PyG
# pip install dgl
try:
import dgl
import dgl.nn as dglnn
import torch
import torch.nn as nn
import torch.nn.functional as F
class DGLGCN(nn.Module):
"""GCN implemented with DGL"""
def __init__(self, in_feats, hidden_size, num_classes):
super().__init__()
self.conv1 = dglnn.GraphConv(in_feats, hidden_size)
self.conv2 = dglnn.GraphConv(hidden_size, num_classes)
def forward(self, g, features):
x = F.relu(self.conv1(g, features))
x = F.dropout(x, training=self.training)
return self.conv2(g, x)
# Create DGL graph
src = torch.tensor([0, 1, 2, 3, 4])
dst = torch.tensor([1, 2, 3, 4, 0])
g = dgl.graph((src, dst))
g.ndata['feat'] = torch.randn(5, 16)
model_dgl = DGLGCN(16, 32, 4)
out = model_dgl(g, g.ndata['feat'])
print(f"DGL GCN output: {out.shape}")
except ImportError:
print("DGL not installed. pip install dgl")
PyG vs DGL Comparison:
| Feature | PyTorch Geometric (PyG) | Deep Graph Library (DGL) |
|---|---|---|
| API style | PyTorch-native | Framework-agnostic |
| Data representation | edge_index (COO) | DGLGraph object |
| Speed | Very fast | Fast |
| Community | Large | Large |
| Available models | Very extensive | Extensive |
| Learning curve | Low | Medium |
9. Real-World Applications
Molecular Property Prediction (OGB)
try:
from ogb.graphproppred import PygGraphPropPredDataset, Evaluator
from torch_geometric.loader import DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GINEConv, global_mean_pool
# Load HIV molecule dataset
dataset_mol = PygGraphPropPredDataset(name='ogbg-molhiv')
split_idx = dataset_mol.get_idx_split()
train_loader_mol = DataLoader(
dataset_mol[split_idx["train"]],
batch_size=32,
shuffle=True
)
class MoleculeGNN(nn.Module):
"""Molecular property prediction model"""
def __init__(self, hidden_channels=300, num_layers=5):
super().__init__()
self.atom_encoder = nn.Embedding(100, hidden_channels)
self.bond_encoder = nn.Embedding(10, hidden_channels)
self.convs = nn.ModuleList()
for _ in range(num_layers):
mlp = nn.Sequential(
nn.Linear(hidden_channels, hidden_channels * 2),
nn.BatchNorm1d(hidden_channels * 2),
nn.ReLU(),
nn.Linear(hidden_channels * 2, hidden_channels)
)
self.convs.append(GINEConv(mlp))
self.pool = global_mean_pool
self.predictor = nn.Linear(hidden_channels, 1)
def forward(self, x, edge_index, edge_attr, batch):
x = self.atom_encoder(x.squeeze())
edge_attr = self.bond_encoder(edge_attr.squeeze())
for conv in self.convs:
x = conv(x, edge_index, edge_attr)
x = F.relu(x)
graph_embed = self.pool(x, batch)
return self.predictor(graph_embed)
print("OGB molecular dataset loaded successfully")
except ImportError:
print("ogb not installed. pip install ogb")
Recommendation System
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import LightGCN
class RecommendationSystem(nn.Module):
"""
LightGCN-based collaborative filtering
Learn embeddings on a user-item bipartite graph
"""
def __init__(self, num_users, num_items, embedding_dim=64, num_layers=3):
super().__init__()
self.num_users = num_users
self.num_items = num_items
self.embedding_dim = embedding_dim
self.num_layers = num_layers
# User/item embeddings
self.user_emb = nn.Embedding(num_users, embedding_dim)
self.item_emb = nn.Embedding(num_items, embedding_dim)
# LightGCN: simple aggregation without non-linear transforms
self.lightgcn = LightGCN(
num_nodes=num_users + num_items,
embedding_dim=embedding_dim,
num_layers=num_layers
)
self._init_weights()
def _init_weights(self):
nn.init.normal_(self.user_emb.weight, std=0.01)
nn.init.normal_(self.item_emb.weight, std=0.01)
def forward(self, edge_index):
# Full node embeddings
x = torch.cat([self.user_emb.weight, self.item_emb.weight], dim=0)
# LightGCN propagation
embeddings = self.lightgcn(x, edge_index)
return embeddings[:self.num_users], embeddings[self.num_users:]
def predict(self, user_ids, item_ids, edge_index):
user_embs, item_embs = self(edge_index)
u = user_embs[user_ids]
i = item_embs[item_ids]
return (u * i).sum(dim=1)
# BPR Loss
def bpr_loss(pos_scores, neg_scores):
"""Bayesian Personalized Ranking Loss"""
return -F.logsigmoid(pos_scores - neg_scores).mean()
10. Graph Generative Models
GraphVAE
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
class GraphVAE(nn.Module):
"""Graph Variational Autoencoder"""
def __init__(self, in_channels, hidden_channels, latent_dim):
super().__init__()
# Encoder (GNN)
self.conv1 = GCNConv(in_channels, hidden_channels)
self.conv_mu = GCNConv(hidden_channels, latent_dim)
self.conv_logvar = GCNConv(hidden_channels, latent_dim)
def encode(self, x, edge_index):
h = F.relu(self.conv1(x, edge_index))
mu = self.conv_mu(h, edge_index)
logvar = self.conv_logvar(h, edge_index)
return mu, logvar
def reparameterize(self, mu, logvar):
if self.training:
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
return mu + eps * std
return mu
def decode(self, z):
# Compute edge probabilities via inner product
adj_pred = torch.sigmoid(z @ z.t())
return adj_pred
def forward(self, x, edge_index):
mu, logvar = self.encode(x, edge_index)
z = self.reparameterize(mu, logvar)
adj_pred = self.decode(z)
return adj_pred, mu, logvar
def loss(self, adj_pred, adj_target, mu, logvar):
# Reconstruction loss
recon_loss = F.binary_cross_entropy(adj_pred, adj_target)
# KL divergence
kl_loss = -0.5 * torch.mean(
1 + logvar - mu.pow(2) - logvar.exp()
)
return recon_loss + kl_loss
Quiz
Q1. What is the main difference between GCN and GAT?
Answer: GCN aggregates all neighbors with fixed weights based on node degree, while GAT dynamically learns different attention weights for each neighbor.
Explanation: In GCN, the aggregation weights are fixed by the degree normalization of the adjacency matrix. In GAT, attention coefficients are computed dynamically based on the feature vectors of the two connected nodes. This allows GAT to assign higher weights to more important neighbors. Multi-head attention further improves stability, which is another advantage of GAT.
Q2. Why is GraphSAGE better suited for inductive learning than GCN?
Answer: GraphSAGE learns an aggregator function that can generate embeddings for new nodes by sampling and aggregating their neighbors.
Explanation: GCN requires the full adjacency matrix of the graph at training time, making it a transductive method that needs retraining when new nodes are added. GraphSAGE learns an aggregation function that samples and aggregates neighbor features, so it can generate embeddings for unseen nodes by applying the same function to their neighborhoods. This is why GraphSAGE is used in production systems like Pinterest and LinkedIn that deal with dynamically changing large-scale graphs.
Q3. What are the three stages of the Message Passing Neural Network (MPNN) framework?
Answer: Message (computing messages), Aggregate (aggregating messages), and Update (updating node embeddings).
Explanation: In the Message stage, a message is computed for each edge based on the source node features. In the Aggregate stage, each node collects all incoming messages from its neighbors using sum, mean, or max aggregation. In the Update stage, the aggregated message is combined with the current node embedding to produce a new embedding. Most GNN variants including GCN, GAT, GraphSAGE, and GIN can all be unified under this framework.
Q4. What is the over-smoothing problem in GNNs, and how can it be mitigated?
Answer: As GNN depth increases, all node embeddings converge to similar values. It can be mitigated with residual connections, JK-Net, or DropEdge.
Explanation: A K-layer GNN aggregates information from K-hop neighborhoods. As layers increase, increasingly larger neighborhoods are included, causing all node representations to converge toward the same global average. Residual connections preserve unique node information by directly passing previous layer outputs. Jumping Knowledge Networks (JK-Net) use embeddings from all layers in the final representation. DropEdge randomly removes edges during training to reduce neighbor overlap.
Q5. What does it mean that GNN expressiveness is equivalent to the WL test?
Answer: Standard GNNs embed two graphs identically if the Weisfeiler-Leman (WL) graph isomorphism test cannot distinguish them.
Explanation: The WL test is an algorithm for determining whether two graphs are isomorphic by iteratively aggregating and hashing neighbor labels. Xu et al. (2019) proved through the Graph Isomorphism Network (GIN) that standard GNNs are at most as powerful as the 1-WL test. This means GNNs cannot distinguish graph pairs that the WL test also fails on, such as regular graphs with the same degree sequence. To overcome this limitation, research is ongoing into higher-order k-WL equivalent GNNs, port numbering, and random feature augmentation.
Summary
This guide covered the entire GNN ecosystem:
- Graph Theory Fundamentals: Nodes, edges, adjacency matrices, graph properties
- Message Passing Paradigm: The core principle of GNNs
- Key Architectures: GCN, GraphSAGE, GAT, Graph Transformer, GIN
- Graph-Level Prediction: Global Pooling, DiffPool
- Link Prediction: Knowledge graphs, recommendation systems
- PyTorch Geometric: Complete node and graph classification examples
- Real-World Applications: Molecular design, recommendation systems, fraud detection
- Graph Generative Models: GraphVAE
GNNs are delivering revolutionary results in molecular design, drug discovery, social network analysis, traffic prediction, recommendation systems, and many other fields. Libraries like PyTorch Geometric and DGL make implementation increasingly accessible, and benchmarks like OGB enable fair comparisons across methods.