현재 test 코드 내용 (하단 첨부)
cuda 로 작성된 kernel 코드를 pybinidng 한 pyd 로부터 호출해서 사용
numpy 와 비교하여 실제 수치 검증 ( 일치 확인 )
그렇다면 실제 개별 커널 별 내용에 대해선 어떻게 확인할 수 있을까?
ncu --set full --export report_smoke.ncu-rep --target-processes all *.py
위 명령어를 통해 .ncu-rep 파일로 보고서를 저장하도록 한다.
이후 Nsight Compute GUI 를 실행하여 확인할 수 있다. ( 동시에 CLI 에서 확인하는 방법도 존재 )
ncu --set speedOfLight ^
--section "SpeedOfLight" ^
--section "MemoryWorkloadAnalysis" ^
--export report_smoke.ncu-rep ^
--target-processes all ^
*.py
실제 확인 내용

가장 먼저 Summary 를 통해 제시해주는 최적화 항목 확인 가능
Small Grid - 실질적 병렬성이 낮음
The grid for this launch is configured to execute only 2 blocks...
This can underutilize some multiprocessors...
- 38 개의 SM
- 현재 커널들은 grid = (1, 2, 1) 또는 (2, 1, 1) 같이 블록이 총 2개 밖에 안 됨
- 2개만 사용하는 상태
이를 통해 GPU 활용률이 낮은 이유를 확인 가능
- tiny gemm 의 문제
- 정식 튜닝은 m, n, k 가 훨씬 큰 사이즈에서 해야 의미 있음.
Achieved Occupancy 매우 낮음
The difference between theoretical (100%) and measured achieved occupancy ( 16.8% )
- 구조적 문제일 수도 있음
- tiny GEMM 에서의 현상
- SM 당 active warp 가 적어서 latency hiding 이 불가능
해결 방법
- occupancy 는 문제 크기 확장 & block / tile 설계에서 본격적으로 맞추는것
- tiny 에서는 ㄴㄴ
Long Scoreboard Stall
Warp 가 어떤 load/store 결과가 도착하기를 기다리면서 멈춰 있는 상태
- GPU warp 는 명령어를 실행하다가
- 이전 명령어의 결과가 아직 도착하지 않음, 후속 명령 실행 불가
- scoreboard(레지스터 의존성 관리자) 에서 대기 명령, warp 를 멈추는 현상
메모리 레이턴시 때문에 warp 가 멈춰 있다, 메모리 접근 패턴이 병목이다.
워크로드가 작아, warp 수가 부족,
global load 는 항상 고비용, gemm 의 대부분?
shared memory tiling 이 아주 작은 데이터에서는 비효율적
하지만 큰 사이즈의 GEMM 을 튜닝할 때도 Long Scoreboard stall 이 항상 가장 먼저 해결해야 할 병목
GEMM 의 대부분은 A tile, B tile 을 shared memory 로 불러오는 과정이 반복, 해당 load 가 shared prefech 도착하지 않으면, ALU, MMA 연산이 멈춘다.
현재 분석 내용은 커널 내 구현 문제일 수도 있고, 테스트 크기가 너무 작아 발생하는 특징일 수도 있다.
지금 상태에서 개별 커널을 분석하는 건 너무 이름
진짜 튜닝은 실ㅈ workload 에서 수행해야 함, 테스트 스케일을 키우자
지금의 smoke 테스트는 correctness 와 호출 경로 검증용
import os
import sys
# repo 루트: .../graph_executor_v2
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
# python/ 디렉토리를 sys.path에 추가
PYTHON_ROOT = os.path.join(ROOT, "python")
if PYTHON_ROOT not in sys.path:
sys.path.insert(0, PYTHON_ROOT)
import numpy as np
import cupy as cp
from graph_executor_v2.ops import _ops_gemm as gemm
# ============================================================
# Activation (fwd/bwd) 레퍼런스 구현 (NumPy)
# ============================================================
def act_forward_np(z: np.ndarray, act: str, leaky_slope: float = 0.01) -> np.ndarray:
act = act.lower()
if act == "none":
return z
if act == "relu":
return np.maximum(z, 0.0)
if act in ("leakyrelu", "leaky_relu", "lrelu"):
return np.where(z > 0.0, z, leaky_slope * z)
if act == "sigmoid":
return 1.0 / (1.0 + np.exp(-z))
if act == "tanh":
return np.tanh(z)
if act == "gelu":
# 표준 GELU (approx) 사용
# 0.5 * x * (1 + erf(x / sqrt(2)))
from math import sqrt
return 0.5 * z * (1.0 + np.erf(z / np.float32(sqrt(2.0))))
raise ValueError(f"unsupported act: {act}")
def act_backward_np(z: np.ndarray,
gy: np.ndarray,
act: str,
leaky_slope: float = 0.01) -> np.ndarray:
"""gZ = dL/dZ = dL/dY * dY/dZ"""
act = act.lower()
if act == "none":
return gy
if act == "relu":
mask = (z > 0.0).astype(z.dtype)
return gy * mask
if act in ("leakyrelu", "leaky_relu", "lrelu"):
slope = np.ones_like(z, dtype=z.dtype)
slope[z < 0.0] = leaky_slope
return gy * slope
if act == "sigmoid":
s = 1.0 / (1.0 + np.exp(-z))
return gy * s * (1.0 - s)
if act == "tanh":
t = np.tanh(z)
return gy * (1.0 - t * t)
if act == "gelu":
# GELU 도함수: 근사식 사용 (버전 따라 다를 수 있어 약간 오차 감안)
from math import sqrt, pi
x = z
k = np.sqrt(2.0 / np.pi)
c = 0.044715
x3 = x * x * x
tanh_arg = k * (x + c * x3)
t = np.tanh(tanh_arg)
dtanh = 1.0 - t * t
term1 = 0.5 * (1.0 + t)
term2 = 0.5 * x * dtanh * k * (1.0 + 3.0 * c * x * x)
dgelu = term1 + term2
return gy * dgelu
raise ValueError(f"unsupported act: {act}")
# ============================================================
# GEMM + bias + act 레퍼런스 (NumPy)
# ============================================================
def gemm_forward_ref(A_h: np.ndarray,
B_h: np.ndarray,
bias_h: np.ndarray | None,
act: str,
leaky_slope: float = 0.01):
"""
Z = A @ B + bias
Y = act(Z)
bias: None or shape (1, N)
"""
Z = A_h @ B_h # (M,K) @ (K,N) -> (M,N)
if bias_h is not None:
# bias_h: (1,N) broadcast over rows
Z = Z + bias_h.astype(A_h.dtype)
Y = act_forward_np(Z, act, leaky_slope)
return Y, Z
def gemm_backward_ref(A_h: np.ndarray,
B_h: np.ndarray,
bias_h: np.ndarray | None,
gY_h: np.ndarray,
Z_h: np.ndarray,
act: str,
leaky_slope: float = 0.01):
"""
gZ = gY * act'(Z)
gA = gZ @ B^T
gB = A^T @ gZ
gBias = sum over rows (PerN: (1,N))
"""
gZ = act_backward_np(Z_h, gY_h, act, leaky_slope) # (M,N)
gA = gZ @ B_h.T # (M,N) @ (N,K) -> (M,K)
gB = A_h.T @ gZ # (K,M) @ (M,N) -> (K,N)
if bias_h is not None:
gBias = gZ.sum(axis=0, keepdims=True) # (1,N)
else:
gBias = None
return gA, gB, gBias
# ============================================================
# Raw 포인터 기반 GEMM 테스트
# ============================================================
def run_case_raw(m=32, k=64, n=16,
with_bias=True,
act="relu",
leaky_slope=0.01,
atol=1e-5, rtol=1e-4):
print(f"[smoke-raw] m={m}, k={k}, n={n}, with_bias={with_bias}, act={act}")
rng = np.random.default_rng(2025)
# -------- host 데이터 (reference 계산용) --------
A_h = rng.standard_normal((m, k), dtype=np.float32)
B_h = rng.standard_normal((k, n), dtype=np.float32)
gY_h = rng.standard_normal((m, n), dtype=np.float32)
if with_bias:
bias_h = rng.standard_normal((1, n), dtype=np.float32)
else:
bias_h = None
# 레퍼런스 fwd/bwd
Y_ref, Z_ref = gemm_forward_ref(A_h, B_h, bias_h, act, leaky_slope)
gA_ref, gB_ref, gBias_ref = gemm_backward_ref(
A_h, B_h, bias_h, gY_h, Z_ref, act, leaky_slope
)
# -------- GPU(CuPy) 버퍼 --------
A_d = cp.asarray(A_h)
B_d = cp.asarray(B_h)
gY_d = cp.asarray(gY_h)
Y_d = cp.empty((m, n), dtype=cp.float32)
Z_d = cp.empty((m, n), dtype=cp.float32) # forward_raw 에서 save_z=True 로 채우게 할 것
gA_d = cp.empty((m, k), dtype=cp.float32)
gB_d = cp.empty((k, n), dtype=cp.float32)
if with_bias:
Bias_d = cp.asarray(bias_h)
gBias_d = cp.empty((1, n), dtype=cp.float32)
Bias_ptr = int(Bias_d.data.ptr)
gBias_ptr = int(gBias_d.data.ptr)
else:
Bias_d = None
gBias_d = None
Bias_ptr = 0
gBias_ptr = 0
# C / gC 는 사용하지 않으므로 0 포인터
C_ptr = 0
gC_ptr = 0
# -------- 포인터 추출 --------
A_ptr = int(A_d.data.ptr)
B_ptr = int(B_d.data.ptr)
Y_ptr = int(Y_d.data.ptr)
Z_ptr = int(Z_d.data.ptr)
gY_ptr = int(gY_d.data.ptr)
gA_ptr = int(gA_d.data.ptr)
gB_ptr = int(gB_d.data.ptr)
# -------------------------------------------------
# Forward_raw: save_z=True → Z_d 에 pre-act(Z) 저장
# -------------------------------------------------
print("[smoke-raw] running gemm.forward_raw() ...")
gemm.forward_raw(
A_ptr,
B_ptr,
Bias_ptr,
Y_ptr,
m, k, n,
False, # trans_a
False, # trans_b
act,
with_bias,
leaky_slope,
True, # save_z
Z_ptr,
None, # stream
)
cp.cuda.runtime.deviceSynchronize()
# -------------------------------------------------
# Backward_raw
# -------------------------------------------------
print("[smoke-raw] running gemm.backward_raw() ...")
gemm.backward_raw(
A_ptr,
B_ptr,
C_ptr,
gY_ptr,
Z_ptr,
gA_ptr,
gB_ptr,
gC_ptr,
gBias_ptr,
m, k, n,
False, # trans_a
False, # trans_b
act,
with_bias,
leaky_slope,
None, # stream
)
cp.cuda.runtime.deviceSynchronize()
# -------- 결과 가져오기 --------
Y_out = cp.asnumpy(Y_d)
Z_out = cp.asnumpy(Z_d)
gA_out = cp.asnumpy(gA_d)
gB_out = cp.asnumpy(gB_d)
if with_bias:
gBias_out = cp.asnumpy(gBias_d)
else:
gBias_out = None
# -------- NaN 체크 --------
print("[smoke-raw] Y_out shape:", Y_out.shape, "nan?", np.isnan(Y_out).any())
print("[smoke-raw] gA_out shape:", gA_out.shape, "nan?", np.isnan(gA_out).any())
print("[smoke-raw] gB_out shape:", gB_out.shape, "nan?", np.isnan(gB_out).any())
if with_bias:
print("[smoke-raw] gBias_out shape:", gBias_out.shape, "nan?", np.isnan(gBias_out).any())
# -------- 레퍼런스와 diff --------
max_err_Y = float(np.max(np.abs(Y_out - Y_ref)))
max_err_Z = float(np.max(np.abs(Z_out - Z_ref)))
max_err_gA = float(np.max(np.abs(gA_out - gA_ref)))
max_err_gB = float(np.max(np.abs(gB_out - gB_ref)))
if with_bias:
max_err_gBias = float(np.max(np.abs(gBias_out - gBias_ref)))
else:
max_err_gBias = 0.0
print(f"[check] max|Y_out - Y_ref| = {max_err_Y:.3e}")
print(f"[check] max|Z_out - Z_ref| = {max_err_Z:.3e}")
print(f"[check] max|gA_out - gA_ref| = {max_err_gA:.3e}")
print(f"[check] max|gB_out - gB_ref| = {max_err_gB:.3e}")
if with_bias:
print(f"[check] max|gBias_out - gBias_ref| = {max_err_gBias:.3e}")
# -------- 간단한 assert --------
# GELU 의 경우 커널 쪽 구현이랑 수식이 조금 다를 수 있어서 여유를 조금 더 줘도 됨.
if act.lower() == "gelu":
tol_atol = max(atol, 5e-4)
tol_rtol = max(rtol, 1e-3)
else:
tol_atol = atol
tol_rtol = rtol
def _assert_close(name, out, ref, max_err):
if ref is None and out is None:
return
if ref is None and out is not None:
raise AssertionError(f"{name}: ref is None but out is not None")
ok = np.allclose(out, ref, atol=tol_atol, rtol=tol_rtol)
status = "OK" if ok else "FAIL"
print(f"[assert] {name}: {status} (max_err={max_err:.3e}, atol={tol_atol}, rtol={tol_rtol})")
if not ok:
raise AssertionError(f"{name} mismatch")
_assert_close("Y", Y_out, Y_ref, max_err_Y)
_assert_close("Z", Z_out, Z_ref, max_err_Z)
_assert_close("gA", gA_out, gA_ref, max_err_gA)
_assert_close("gB", gB_out, gB_ref, max_err_gB)
if with_bias:
_assert_close("gBias", gBias_out, gBias_ref, max_err_gBias)
print("[smoke-raw] GEMM binding forward_raw/backward_raw correctness OK.\n")
if __name__ == "__main__":
# 기본 케이스 몇 개 돌려보기
run_case_raw(with_bias=True, act="relu")
run_case_raw(with_bias=False, act="none")
# 필요하면 다른 활성화도 추가로 검증 가능
# run_case_raw(with_bias=True, act="leakyrelu")
# run_case_raw(with_bias=True, act="sigmoid")
# run_case_raw(with_bias=True, act="tanh")
# run_case_raw(with_bias=True, act="gelu")
print("[smoke] all done.")
'dev_AI_framework' 카테고리의 다른 글
| GPU 관련 개념 꽉 잡기 (0) | 2025.11.16 |
|---|---|
| 1024, 1024 gemm 벤치 코드 테스트 (0) | 2025.11.16 |
| CUDA 성능 분석 도구 다른거 사용하자, (NVTX - 성능 분석용 태깅 도구) 실제 커널최적화의 경우 Ncu ( Nsight Compute) (0) | 2025.11.15 |
| pyd(raw) - glue(gemm.py) - layer(Dense) - capture_safe 경로까지 전부 Numpy 레퍼런스와 일치 확인 ( float32 rounding 수준이라는 개념 습득! ) (0) | 2025.11.15 |
| _ops_common 의 구현을 통한 공통 shim 타입 / 규약의 단일 진입점의 역할 수행 (0) | 2025.11.15 |