Skip to content
Published on

Spring Boot Batch 완전 가이드: Job, Step, Chunk 처리와 실전 패턴

Authors

1. Spring Batch 아키텍처

Spring Batch는 대용량 데이터를 처리하기 위한 경량 배치 프레임워크입니다. ETL(Extract, Transform, Load), 데이터 마이그레이션, 리포트 생성, 정산 처리 등 다양한 배치 작업에 활용됩니다.

핵심 아키텍처 구성요소

Job
└── Step 1
│   └── Chunk (ChunkSize: 100)
│       ├── ItemReader   (읽기)
│       ├── ItemProcessor (변환/필터)
│       └── ItemWriter   (쓰기)
└── Step 2
    └── Tasklet (단일 작업)

주요 컴포넌트:

  • Job: 배치 작업의 최상위 단위. 하나 이상의 Step으로 구성
  • Step: Job의 실행 단위. Chunk 기반 또는 Tasklet 기반
  • Chunk: 지정된 크기의 데이터를 읽고, 처리하고, 쓰는 단위
  • ItemReader: 데이터 소스에서 아이템을 하나씩 읽는 인터페이스
  • ItemProcessor: 읽은 아이템을 변환하거나 필터링
  • ItemWriter: 처리된 아이템을 저장소에 기록

JobRepository, JobLauncher, JobExplorer

JobLauncher ──→ Job (실행 요청)
JobRepository (실행 이력 저장/조회)
JobExplorer (읽기 전용 이력 조회)

Spring Batch 메타 테이블

-- 주요 메타데이터 테이블
BATCH_JOB_INSTANCE    -- Job 인스턴스 정보
BATCH_JOB_EXECUTION   -- Job 실행 정보 (상태, 시작/종료 시간)
BATCH_JOB_PARAMS      -- Job 파라미터
BATCH_STEP_EXECUTION  -- Step 실행 정보
BATCH_STEP_EXECUTION_CONTEXT -- Step 컨텍스트 데이터
BATCH_JOB_EXECUTION_CONTEXT  -- Job 컨텍스트 데이터

2. 의존성 설정

Maven 의존성

<!-- Spring Boot Batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<!-- 배치 메타 테이블용 DB (예: PostgreSQL) -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
</dependency>

<!-- 테스트 -->
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <scope>test</scope>
</dependency>

application.yml 설정

spring:
  batch:
    job:
      enabled: false # 애플리케이션 시작 시 자동 실행 방지
    jdbc:
      initialize-schema: always # 메타 테이블 자동 생성
  datasource:
    url: jdbc:postgresql://localhost:5432/batchdb
    username: batchuser
    password: batchpass

logging:
  level:
    org.springframework.batch: DEBUG

3. 기본 Job 구성

UserMigrationJobConfig - 전체 구성 예시

@Configuration
@EnableBatchProcessing
public class UserMigrationJobConfig {

    // Job 정의
    @Bean
    public Job userMigrationJob(JobRepository jobRepository,
                                 Step migrationStep) {
        return new JobBuilder("userMigrationJob", jobRepository)
                .start(migrationStep)
                .listener(jobExecutionListener())
                .build();
    }

    // Step 정의 (Chunk 기반)
    @Bean
    public Step migrationStep(JobRepository jobRepository,
                               PlatformTransactionManager txManager,
                               ItemReader<User> userItemReader,
                               ItemProcessor<User, UserDto> userItemProcessor,
                               ItemWriter<UserDto> userItemWriter) {
        return new StepBuilder("migrationStep", jobRepository)
                .<User, UserDto>chunk(100, txManager)
                .reader(userItemReader)
                .processor(userItemProcessor)
                .writer(userItemWriter)
                .faultTolerant()
                .skipLimit(10)
                .skip(DataIntegrityViolationException.class)
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .listener(stepExecutionListener())
                .build();
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                System.out.println("Job 시작: " + jobExecution.getJobInstance().getJobName());
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                System.out.printf("Job 완료: %s, 상태: %s%n",
                    jobExecution.getJobInstance().getJobName(),
                    jobExecution.getStatus());
            }
        };
    }

    @Bean
    public StepExecutionListener stepExecutionListener() {
        return new StepExecutionListener() {
            @Override
            public void beforeStep(StepExecution stepExecution) {
                System.out.println("Step 시작: " + stepExecution.getStepName());
            }

            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                System.out.printf("Step 완료: 읽기=%d, 처리=%d, 쓰기=%d%n",
                    stepExecution.getReadCount(),
                    stepExecution.getProcessSkipCount(),
                    stepExecution.getWriteCount());
                return stepExecution.getExitStatus();
            }
        };
    }
}

Tasklet 기반 Step

@Bean
public Step cleanupStep(JobRepository jobRepository,
                         PlatformTransactionManager txManager) {
    return new StepBuilder("cleanupStep", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                // 단순한 일회성 작업에 적합
                System.out.println("임시 파일 정리 중...");
                // 파일 삭제 로직
                return RepeatStatus.FINISHED;
            }, txManager)
            .build();
}

4. ItemReader 구현

JdbcCursorItemReader - 대용량 DB 읽기

@Bean
@StepScope
public JdbcCursorItemReader<User> userCursorReader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<User>()
            .name("userCursorReader")
            .dataSource(dataSource)
            .sql("SELECT id, username, email, status FROM users WHERE status = 'ACTIVE' ORDER BY id")
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .fetchSize(1000)  // 한 번에 가져올 레코드 수
            .build();
}

JdbcPagingItemReader - 페이지 단위 읽기

@Bean
@StepScope
public JdbcPagingItemReader<User> userPagingReader(DataSource dataSource) {
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);

    PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
    queryProvider.setSelectClause("SELECT id, username, email, status");
    queryProvider.setFromClause("FROM users");
    queryProvider.setWhereClause("WHERE status = 'ACTIVE'");
    queryProvider.setSortKeys(sortKeys);

    return new JdbcPagingItemReaderBuilder<User>()
            .name("userPagingReader")
            .dataSource(dataSource)
            .queryProvider(queryProvider)
            .pageSize(100)
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .build();
}

FlatFileItemReader - CSV 파일 읽기

@Bean
@StepScope
public FlatFileItemReader<UserCsvDto> csvUserReader(
        @Value("#{jobParameters['inputFile']}") String inputFile) {

    return new FlatFileItemReaderBuilder<UserCsvDto>()
            .name("csvUserReader")
            .resource(new FileSystemResource(inputFile))
            .linesToSkip(1)  // 헤더 행 건너뜀
            .delimited()
            .delimiter(",")
            .names("id", "username", "email", "createdAt")
            .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                setTargetType(UserCsvDto.class);
            }})
            .build();
}

커스텀 ItemReader 구현

@Component
@StepScope
public class ApiCallItemReader implements ItemReader<UserData> {

    private final UserApiClient apiClient;
    private int page = 0;
    private List<UserData> currentPageData = new ArrayList<>();
    private int currentIndex = 0;
    private boolean exhausted = false;

    public ApiCallItemReader(UserApiClient apiClient) {
        this.apiClient = apiClient;
    }

    @Override
    public UserData read() throws Exception {
        if (exhausted) return null;

        if (currentIndex >= currentPageData.size()) {
            // 다음 페이지 로드
            currentPageData = apiClient.fetchUsers(page++, 100);
            currentIndex = 0;

            if (currentPageData.isEmpty()) {
                exhausted = true;
                return null;
            }
        }

        return currentPageData.get(currentIndex++);
    }
}

5. ItemProcessor 구현

데이터 변환 및 필터링

@Component
@StepScope
public class UserMigrationProcessor implements ItemProcessor<User, UserDto> {

    private static final Logger log = LoggerFactory.getLogger(UserMigrationProcessor.class);

    @Override
    public UserDto process(User user) throws Exception {
        // null 반환 = 해당 아이템 건너뜀 (skip)
        if (!isEligibleForMigration(user)) {
            log.debug("Skipping user: {}", user.getUsername());
            return null;
        }

        UserDto dto = new UserDto();
        dto.setId(user.getId());
        dto.setUsername(user.getUsername().toLowerCase().trim());
        dto.setEmail(user.getEmail().toLowerCase());
        dto.setDisplayName(formatDisplayName(user.getFirstName(), user.getLastName()));
        dto.setMigratedAt(LocalDateTime.now());

        return dto;
    }

    private boolean isEligibleForMigration(User user) {
        return user.getStatus() != null
            && "ACTIVE".equals(user.getStatus())
            && user.getEmail() != null
            && user.getEmail().contains("@");
    }

    private String formatDisplayName(String firstName, String lastName) {
        return Stream.of(firstName, lastName)
                .filter(s -> s != null && !s.isBlank())
                .collect(Collectors.joining(" "));
    }
}

CompositeItemProcessor - 여러 Processor 체인

@Bean
public CompositeItemProcessor<User, UserDto> compositeProcessor() {
    List<ItemProcessor<?, ?>> processors = new ArrayList<>();
    processors.add(new ValidationProcessor());
    processors.add(new EnrichmentProcessor(externalService));
    processors.add(new TransformationProcessor());

    CompositeItemProcessor<User, UserDto> composite = new CompositeItemProcessor<>();
    composite.setDelegates(processors);
    return composite;
}

6. ItemWriter 구현

JdbcBatchItemWriter - bulk INSERT/UPDATE

@Bean
public JdbcBatchItemWriter<UserDto> userDtoWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<UserDto>()
            .dataSource(dataSource)
            .sql("""
                INSERT INTO users_new (id, username, email, display_name, migrated_at)
                VALUES (:id, :username, :email, :displayName, :migratedAt)
                ON CONFLICT (id) DO UPDATE
                SET username = EXCLUDED.username,
                    email = EXCLUDED.email,
                    migrated_at = EXCLUDED.migrated_at
                """)
            .beanMapped()
            .build();
}

JpaItemWriter

@Bean
public JpaItemWriter<UserDto> jpaUserWriter(EntityManagerFactory entityManagerFactory) {
    JpaItemWriter<UserDto> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    return writer;
}

FlatFileItemWriter - 결과 파일 출력

@Bean
@StepScope
public FlatFileItemWriter<UserDto> csvResultWriter(
        @Value("#{jobParameters['outputFile']}") String outputFile) {

    return new FlatFileItemWriterBuilder<UserDto>()
            .name("csvResultWriter")
            .resource(new FileSystemResource(outputFile))
            .headerCallback(writer -> writer.write("id,username,email,migrated_at"))
            .delimited()
            .delimiter(",")
            .names("id", "username", "email", "migratedAt")
            .build();
}

CompositeItemWriter - 여러 대상에 동시 쓰기

@Bean
public CompositeItemWriter<UserDto> compositeWriter(
        JdbcBatchItemWriter<UserDto> dbWriter,
        FlatFileItemWriter<UserDto> fileWriter) {

    CompositeItemWriter<UserDto> writer = new CompositeItemWriter<>();
    writer.setDelegates(Arrays.asList(dbWriter, fileWriter));
    return writer;
}

7. 고급 기능

Partitioning - 대용량 데이터 병렬 처리

@Configuration
public class PartitionedJobConfig {

    @Bean
    public Step masterStep(JobRepository jobRepository,
                            Partitioner partitioner,
                            Step workerStep) {
        return new StepBuilder("masterStep", jobRepository)
                .partitioner("workerStep", partitioner)
                .step(workerStep)
                .gridSize(4)  // 파티션 수 (스레드 수)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Partitioner columnRangePartitioner(DataSource dataSource) {
        return gridSize -> {
            Map<String, ExecutionContext> result = new HashMap<>();
            int totalCount = getTotalCount(dataSource);
            int rangeSize = totalCount / gridSize;

            for (int i = 0; i < gridSize; i++) {
                ExecutionContext context = new ExecutionContext();
                context.putLong("minId", (long) i * rangeSize + 1);
                context.putLong("maxId", i == gridSize - 1 ? totalCount : (long)(i + 1) * rangeSize);
                result.put("partition" + i, context);
            }
            return result;
        };
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<User> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {

        // 파티션 범위에 맞는 데이터만 읽기
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("minId", minId);
        parameterValues.put("maxId", maxId);

        return new JdbcPagingItemReaderBuilder<User>()
                .name("partitionedReader")
                .dataSource(dataSource)
                // ... 쿼리 설정
                .parameterValues(parameterValues)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}

Multi-threaded Step

@Bean
public Step multiThreadedStep(JobRepository jobRepository,
                               PlatformTransactionManager txManager,
                               ItemReader<User> reader,
                               ItemWriter<UserDto> writer) {
    return new StepBuilder("multiThreadedStep", jobRepository)
            .<User, UserDto>chunk(100, txManager)
            .reader(reader)     // 주의: thread-safe한 Reader 필요 (SynchronizedItemStreamReader 사용)
            .writer(writer)
            .taskExecutor(new SimpleAsyncTaskExecutor())
            .throttleLimit(4)   // 동시 실행 스레드 수
            .build();
}

// Thread-safe Reader 래핑
@Bean
public SynchronizedItemStreamReader<User> synchronizedReader(
        JdbcCursorItemReader<User> reader) {
    SynchronizedItemStreamReader<User> synchronizedReader = new SynchronizedItemStreamReader<>();
    synchronizedReader.setDelegate(reader);
    return synchronizedReader;
}

AsyncItemProcessor/AsyncItemWriter

@Bean
public AsyncItemProcessor<User, UserDto> asyncProcessor(
        UserMigrationProcessor delegateProcessor) {

    AsyncItemProcessor<User, UserDto> asyncProcessor = new AsyncItemProcessor<>();
    asyncProcessor.setDelegate(delegateProcessor);
    asyncProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
    return asyncProcessor;
}

@Bean
public AsyncItemWriter<UserDto> asyncWriter(
        JdbcBatchItemWriter<UserDto> delegateWriter) {

    AsyncItemWriter<UserDto> asyncWriter = new AsyncItemWriter<>();
    asyncWriter.setDelegate(delegateWriter);
    return asyncWriter;
}

JobParameters로 동적 설정

@Bean
@StepScope
public JdbcCursorItemReader<User> dynamicReader(
        DataSource dataSource,
        @Value("#{jobParameters['startDate']}") String startDate,
        @Value("#{jobParameters['endDate']}") String endDate) {

    return new JdbcCursorItemReaderBuilder<User>()
            .name("dynamicReader")
            .dataSource(dataSource)
            .sql("SELECT * FROM users WHERE created_at BETWEEN ? AND ?")
            .preparedStatementSetter(ps -> {
                ps.setString(1, startDate);
                ps.setString(2, endDate);
            })
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .build();
}

8. 재시작 및 재처리 전략

Skip/Retry 정책

@Bean
public Step robustStep(JobRepository jobRepository,
                        PlatformTransactionManager txManager) {
    return new StepBuilder("robustStep", jobRepository)
            .<User, UserDto>chunk(100, txManager)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            // Skip 설정: 최대 10건 건너뛰기 허용
            .skipLimit(10)
            .skip(ValidationException.class)
            .skip(DataIntegrityViolationException.class)
            // Retry 설정: 최대 3회 재시도
            .retryLimit(3)
            .retry(TransientDataAccessException.class)
            .retry(DeadlockLoserDataAccessException.class)
            // Skip 불가 예외
            .noSkip(FatalBatchException.class)
            .build();
}

SkipListener 구현

@Component
public class UserSkipListener implements SkipListener<User, UserDto> {

    private static final Logger log = LoggerFactory.getLogger(UserSkipListener.class);

    @Override
    public void onSkipInRead(Throwable t) {
        log.error("읽기 단계에서 건너뜀: {}", t.getMessage());
    }

    @Override
    public void onSkipInProcess(User user, Throwable t) {
        log.warn("처리 건너뜀 - 사용자 ID: {}, 오류: {}", user.getId(), t.getMessage());
    }

    @Override
    public void onSkipInWrite(UserDto dto, Throwable t) {
        log.error("쓰기 건너뜀 - 사용자 ID: {}, 오류: {}", dto.getId(), t.getMessage());
    }
}

Job 재시작 제어

@Bean
public Job restartableJob(JobRepository jobRepository, Step step1) {
    return new JobBuilder("restartableJob", jobRepository)
            .start(step1)
            .preventRestart()  // 실패 후 재시작 방지 (기본값: 허용)
            .build();
}

// 재시작 시 같은 JobParameters 사용 → 이전 실패 지점부터 재시작
// JobLauncher에서 동일 파라미터로 실행하면 자동으로 재시작

9. 스케줄러 연동

@Scheduled + JobLauncher

@Configuration
@EnableScheduling
public class BatchSchedulerConfig {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job userMigrationJob;

    @Scheduled(cron = "0 0 2 * * *")  // 매일 새벽 2시
    public void runDailyBatch() {
        try {
            JobParameters params = new JobParametersBuilder()
                    .addString("date", LocalDate.now().toString())
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();

            JobExecution execution = jobLauncher.run(userMigrationJob, params);
            System.out.println("배치 실행 상태: " + execution.getStatus());
        } catch (Exception e) {
            System.err.println("배치 실행 실패: " + e.getMessage());
        }
    }
}

Quartz Scheduler 연동

@Configuration
public class QuartzBatchConfig {

    @Bean
    public JobDetail batchJobDetail() {
        return JobBuilder.newJob(BatchQuartzJob.class)
                .withIdentity("batchJob")
                .storeDurably()
                .build();
    }

    @Bean
    public Trigger batchJobTrigger() {
        return TriggerBuilder.newTrigger()
                .forJob(batchJobDetail())
                .withIdentity("batchTrigger")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?"))
                .build();
    }
}

@Component
public class BatchQuartzJob implements org.quartz.Job {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job userMigrationJob;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            JobParameters params = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(userMigrationJob, params);
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }
}

REST API로 Job 수동 실행

@RestController
@RequestMapping("/api/batch")
public class BatchController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job userMigrationJob;

    @PostMapping("/run")
    public ResponseEntity<String> runJob(
            @RequestParam String startDate,
            @RequestParam String endDate) {
        try {
            JobParameters params = new JobParametersBuilder()
                    .addString("startDate", startDate)
                    .addString("endDate", endDate)
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();

            JobExecution execution = jobLauncher.run(userMigrationJob, params);
            return ResponseEntity.ok("Job 실행 ID: " + execution.getId()
                + ", 상태: " + execution.getStatus());
        } catch (Exception e) {
            return ResponseEntity.internalServerError()
                .body("Job 실행 실패: " + e.getMessage());
        }
    }

    @GetMapping("/status/{executionId}")
    public ResponseEntity<String> getJobStatus(@PathVariable Long executionId) {
        // JobExplorer로 실행 이력 조회
        return ResponseEntity.ok("구현 필요");
    }
}

10. 테스트

@SpringBatchTest 설정

@SpringBatchTest
@SpringBootTest(classes = {UserMigrationJobConfig.class, TestBatchConfig.class})
@ActiveProfiles("test")
class UserMigrationJobTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @BeforeEach
    void clearMetadata() {
        jobRepositoryTestUtils.removeJobExecutions();
    }

    @Test
    void testCompleteJob() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob(
            new JobParametersBuilder()
                .addString("date", "2026-03-17")
                .toJobParameters()
        );

        assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);
    }

    @Test
    void testStep() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchStep("migrationStep");

        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
        assertThat(stepExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(stepExecution.getWriteCount()).isGreaterThan(0);
    }
}

StepScopeTestExecutionListener 활용

@RunWith(SpringRunner.class)
@SpringBootTest
@TestExecutionListeners({
    DependencyInjectionTestExecutionListener.class,
    StepScopeTestExecutionListener.class
})
class ItemReaderTest {

    @Autowired
    private JdbcCursorItemReader<User> userReader;

    public StepExecution getStepExecution() {
        StepExecution execution = MetaDataInstanceFactory.createStepExecution();
        execution.getExecutionContext().putString("inputFile", "classpath:test-users.csv");
        return execution;
    }

    @Test
    void testReader() throws Exception {
        List<User> users = new ArrayList<>();
        User user;
        while ((user = userReader.read()) != null) {
            users.add(user);
        }
        assertThat(users).isNotEmpty();
    }
}

통합 테스트 예시

@SpringBatchTest
@SpringBootTest
@Testcontainers
class BatchIntegrationTest {

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15");

    @DynamicPropertySource
    static void setProps(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
    }

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private UserRepository userRepository;

    @Test
    void testFullMigrationPipeline() throws Exception {
        // Given: 테스트 데이터 삽입
        insertTestUsers(100);

        // When: 배치 실행
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then: 결과 검증
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(userRepository.countByMigratedTrue()).isEqualTo(100);
    }
}

11. 모니터링

Spring Batch Actuator 엔드포인트

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,batch
  endpoint:
    batch:
      enabled: true

Micrometer + Prometheus 메트릭

@Configuration
public class BatchMetricsConfig {

    @Bean
    public BatchMetrics batchMetrics(MeterRegistry meterRegistry,
                                      JobRepository jobRepository) {
        return new BatchMetrics(meterRegistry);
    }
}
# Grafana 대시보드용 주요 메트릭
# spring.batch.job.* - Job 실행 시간, 성공/실패 수
# spring.batch.step.* - Step별 읽기/처리/쓰기 건수

퀴즈: Spring Batch 지식 점검

Q1. Spring Batch의 Chunk 처리 모델에서 ItemProcessor가 null을 반환하면 어떻게 됩니까?

정답: 해당 아이템이 건너뛰어져(skip) ItemWriter로 전달되지 않습니다.

설명: Chunk 처리에서 ItemProcessor가 특정 아이템을 처리할 때 null을 반환하면, Spring Batch는 해당 아이템을 자동으로 필터링하여 Writer로 넘기지 않습니다. 이는 조건부 필터링을 구현하는 가장 깔끔한 방법으로, SkipLimit 카운트에도 포함되지 않습니다. 단, 예외를 발생시켜 skip 처리하는 것과는 다르게 처리됩니다.

Q2. JdbcCursorItemReader와 JdbcPagingItemReader의 주요 차이점은 무엇이며, 각각 어떤 상황에 적합한가요?

정답: JdbcCursorItemReader는 DB 커서를 통해 연속 스트리밍 방식으로 읽고, JdbcPagingItemReader는 LIMIT/OFFSET 쿼리로 페이지 단위로 읽습니다.

설명: JdbcCursorItemReader는 단일 연결에서 커서를 유지하며 데이터를 스트리밍으로 읽어 메모리 효율이 높지만, 단일 스레드에서만 안전합니다(Multi-threaded Step에 부적합). JdbcPagingItemReader는 페이지마다 새 쿼리를 실행하여 연결 풀과 잘 어울리고, 멀티스레드 환경에서도 안전합니다. 대용량 단일 처리에는 Cursor, 병렬 처리나 재시작이 중요한 경우에는 Paging이 권장됩니다.

Q3. Spring Batch에서 Job을 재실행(Restart)할 때 이전 실패 지점부터 이어서 처리하려면 어떻게 해야 하나요?

정답: 동일한 JobParameters로 Job을 다시 실행하면 Spring Batch가 자동으로 마지막 실패한 Step부터 재시작합니다.

설명: Spring Batch는 JobRepository에 실행 이력을 저장합니다. 동일한 JobParameters로 Job을 재실행하면, BATCH_JOB_EXECUTION 테이블에서 마지막으로 실패한 JobExecution을 찾아 해당 Step부터 처리를 재개합니다. 단, preventRestart()를 호출하면 재시작이 방지됩니다. 또한 Chunk 처리에서는 이미 성공적으로 커밋된 Chunk는 다시 처리하지 않고, 실패한 Chunk부터 재시작합니다.

Q4. Partitioning Step을 사용하는 이유와 gridSize 파라미터의 의미를 설명하세요.

정답: 대용량 데이터를 여러 파티션으로 나누어 병렬로 처리하기 위해 사용하며, gridSize는 생성할 파티션(병렬 실행 단위)의 수를 지정합니다.

설명: Partitioning Step은 대용량 데이터셋을 논리적으로 여러 청크로 분할하고 각 청크를 별도 스레드나 프로세스에서 병렬로 처리합니다. gridSize는 파티션 수를 결정하며, 일반적으로 사용 가능한 CPU 코어 수나 DB 연결 풀 크기에 맞게 설정합니다. Partitioner 인터페이스를 구현하여 ID 범위, 날짜 범위, 파일 목록 등 다양한 기준으로 파티션을 정의할 수 있습니다.

Q5. application.yml에서 spring.batch.job.enabled: false로 설정하는 이유는 무엇인가요?

정답: 애플리케이션 시작 시 등록된 모든 Job이 자동으로 실행되는 것을 방지하기 위해서입니다.

설명: Spring Batch는 기본적으로 애플리케이션 컨텍스트가 로드될 때 등록된 모든 Job을 자동으로 실행합니다. 웹 애플리케이션에 배치 처리를 포함시키거나, API 요청이나 스케줄러를 통해 명시적으로 Job을 실행하고 싶을 때 이 설정을 false로 지정합니다. 개발 환경에서도 서버를 시작할 때마다 배치가 실행되는 것을 방지할 수 있습니다. Job을 실행하려면 JobLauncher를 통해 명시적으로 실행해야 합니다.