Spring Boot Batch Complete Guide: Job, Step, Chunk Processing and Production Patterns

1. Spring Batch Architecture

Spring Batch is a lightweight batch processing framework designed for handling large volumes of data. It is widely used for ETL (Extract, Transform, Load), data migration, report generation, settlement processing, and more.

Core Architecture Components

Job
├── Step 1
│   └── Chunk (ChunkSize: 100)
│       ├── ItemReader    (read)
│       ├── ItemProcessor (transform/filter)
│       └── ItemWriter    (write)
└── Step 2
    └── Tasklet (single operation)

Key components:

  • Job: Top-level unit of a batch process. Composed of one or more Steps
  • Step: Execution unit within a Job — either Chunk-based or Tasklet-based
  • Chunk: A fixed-size unit of read, process, and write operations
  • ItemReader: Interface that reads items one at a time from a data source
  • ItemProcessor: Transforms or filters read items
  • ItemWriter: Persists processed items to a target store
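
The chunk model is easiest to see as code. The following is a conceptual sketch of one chunk-sized transaction, not Spring Batch's actual internals; Chunk is the org.springframework.batch.item.Chunk type that ItemWriter receives in Spring Batch 5.

// Conceptual sketch of one chunk-oriented transaction (framework internals simplified)
static <I, O> void processOneChunk(ItemReader<I> reader,
                                   ItemProcessor<I, O> processor,
                                   ItemWriter<O> writer,
                                   int chunkSize) throws Exception {
    Chunk<O> outputs = new Chunk<>();
    for (int i = 0; i < chunkSize; i++) {
        I item = reader.read();                 // null signals end of input
        if (item == null) break;
        O processed = processor.process(item);  // null filters the item out
        if (processed != null) outputs.add(processed);
    }
    if (!outputs.isEmpty()) {
        writer.write(outputs);                  // one write call per chunk, then commit
    }
}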

JobRepository, JobLauncher, JobExplorer

JobLauncher ──→ Job (launch request)
JobRepository (store/retrieve execution history)
JobExplorer (read-only history access)
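
JobExplorer gives read-only access to this execution metadata. A minimal sketch of querying the most recent run of a job; the job name userMigrationJob matches the example used throughout this guide:

@Component
public class JobHistoryService {

    private final JobExplorer jobExplorer;

    public JobHistoryService(JobExplorer jobExplorer) {
        this.jobExplorer = jobExplorer;
    }

    // Returns the status of the most recent execution, or null if the job never ran
    public BatchStatus lastStatus() {
        List<JobInstance> instances =
                jobExplorer.getJobInstances("userMigrationJob", 0, 1);
        if (instances.isEmpty()) {
            return null;
        }
        List<JobExecution> executions = jobExplorer.getJobExecutions(instances.get(0));
        return executions.isEmpty() ? null : executions.get(0).getStatus();
    }
}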

Spring Batch Meta Tables

-- Key metadata tables
BATCH_JOB_INSTANCE    -- Job instance information
BATCH_JOB_EXECUTION   -- Job execution info (status, start/end time)
BATCH_JOB_EXECUTION_PARAMS   -- Job parameters (stored per execution)
BATCH_STEP_EXECUTION  -- Step execution info
BATCH_STEP_EXECUTION_CONTEXT -- Step context data
BATCH_JOB_EXECUTION_CONTEXT  -- Job context data

2. Dependencies and Configuration

Maven Dependencies

<!-- Spring Boot Batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<!-- Database for batch meta tables (e.g., PostgreSQL) -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
</dependency>

<!-- Testing -->
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <scope>test</scope>
</dependency>

application.yml Configuration

spring:
  batch:
    job:
      enabled: false # prevent auto-run on startup
    jdbc:
      initialize-schema: always # auto-create meta tables (prefer "never" in production)
  datasource:
    url: jdbc:postgresql://localhost:5432/batchdb
    username: batchuser
    password: batchpass

logging:
  level:
    org.springframework.batch: DEBUG

3. Basic Job Configuration

UserMigrationJobConfig - Full Configuration Example

@Configuration
@EnableBatchProcessing  // note: in Spring Boot 3+, this disables Boot's batch auto-configuration; omit it to keep the starter defaults
public class UserMigrationJobConfig {

    // Job definition
    @Bean
    public Job userMigrationJob(JobRepository jobRepository,
                                 Step migrationStep) {
        return new JobBuilder("userMigrationJob", jobRepository)
                .start(migrationStep)
                .listener(jobExecutionListener())
                .build();
    }

    // Step definition (Chunk-based)
    @Bean
    public Step migrationStep(JobRepository jobRepository,
                               PlatformTransactionManager txManager,
                               ItemReader<User> userItemReader,
                               ItemProcessor<User, UserDto> userItemProcessor,
                               ItemWriter<UserDto> userItemWriter) {
        return new StepBuilder("migrationStep", jobRepository)
                .<User, UserDto>chunk(100, txManager)
                .reader(userItemReader)
                .processor(userItemProcessor)
                .writer(userItemWriter)
                .faultTolerant()
                .skipLimit(10)
                .skip(DataIntegrityViolationException.class)
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .listener(stepExecutionListener())
                .build();
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                System.out.println("Job started: "
                    + jobExecution.getJobInstance().getJobName());
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                System.out.printf("Job finished: %s, status: %s%n",
                    jobExecution.getJobInstance().getJobName(),
                    jobExecution.getStatus());
            }
        };
    }

    @Bean
    public StepExecutionListener stepExecutionListener() {
        return new StepExecutionListener() {
            @Override
            public void beforeStep(StepExecution stepExecution) {
                System.out.println("Step started: " + stepExecution.getStepName());
            }

            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                System.out.printf("Step finished: read=%d, skipped=%d, written=%d%n",
                    stepExecution.getReadCount(),
                    stepExecution.getProcessSkipCount(),
                    stepExecution.getWriteCount());
                return stepExecution.getExitStatus();
            }
        };
    }
}

Tasklet-Based Step

@Bean
public Step cleanupStep(JobRepository jobRepository,
                         PlatformTransactionManager txManager) {
    return new StepBuilder("cleanupStep", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                // Suitable for simple one-off operations
                System.out.println("Cleaning up temporary files...");
                // file deletion logic here
                return RepeatStatus.FINISHED;
            }, txManager)
            .build();
}

4. ItemReader Implementations

JdbcCursorItemReader - High-Volume DB Reading

@Bean
@StepScope
public JdbcCursorItemReader<User> userCursorReader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<User>()
            .name("userCursorReader")
            .dataSource(dataSource)
            .sql("SELECT id, username, email, status FROM users WHERE status = 'ACTIVE' ORDER BY id")
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .fetchSize(1000)  // JDBC fetch hint: rows buffered per round trip
            .build();
}

JdbcPagingItemReader - Page-Based Reading

@Bean
@StepScope
public JdbcPagingItemReader<User> userPagingReader(DataSource dataSource) {
    // The sort key must be unique; paging resumes from the last seen key value
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);

    PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
    queryProvider.setSelectClause("SELECT id, username, email, status");
    queryProvider.setFromClause("FROM users");
    queryProvider.setWhereClause("WHERE status = 'ACTIVE'");
    queryProvider.setSortKeys(sortKeys);

    return new JdbcPagingItemReaderBuilder<User>()
            .name("userPagingReader")
            .dataSource(dataSource)
            .queryProvider(queryProvider)
            .pageSize(100)
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .build();
}

FlatFileItemReader - CSV File Reading

@Bean
@StepScope
public FlatFileItemReader<UserCsvDto> csvUserReader(
        @Value("#{jobParameters['inputFile']}") String inputFile) {

    return new FlatFileItemReaderBuilder<UserCsvDto>()
            .name("csvUserReader")
            .resource(new FileSystemResource(inputFile))
            .linesToSkip(1)  // skip header row
            .delimited()
            .delimiter(",")
            .names("id", "username", "email", "createdAt")
            .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                setTargetType(UserCsvDto.class);
            }})
            .build();
}

Custom ItemReader Implementation

A plain ItemReader keeps its position only in memory. If the reader's state must survive a restart, also implement ItemStream (open/update/close) so the position is saved to the ExecutionContext.

@Component
@StepScope
public class ApiCallItemReader implements ItemReader<UserData> {

    private final UserApiClient apiClient;
    private int page = 0;
    private List<UserData> currentPageData = new ArrayList<>();
    private int currentIndex = 0;
    private boolean exhausted = false;

    public ApiCallItemReader(UserApiClient apiClient) {
        this.apiClient = apiClient;
    }

    @Override
    public UserData read() throws Exception {
        if (exhausted) return null;

        if (currentIndex >= currentPageData.size()) {
            currentPageData = apiClient.fetchUsers(page++, 100);
            currentIndex = 0;

            if (currentPageData.isEmpty()) {
                exhausted = true;
                return null;
            }
        }

        return currentPageData.get(currentIndex++);
    }
}

5. ItemProcessor Implementations

Data Transformation and Filtering

@Component
@StepScope
public class UserMigrationProcessor implements ItemProcessor<User, UserDto> {

    private static final Logger log = LoggerFactory.getLogger(UserMigrationProcessor.class);

    @Override
    public UserDto process(User user) throws Exception {
        // Returning null skips this item — it will not be passed to the writer
        if (!isEligibleForMigration(user)) {
            log.debug("Skipping user: {}", user.getUsername());
            return null;
        }

        UserDto dto = new UserDto();
        dto.setId(user.getId());
        dto.setUsername(user.getUsername().toLowerCase().trim());
        dto.setEmail(user.getEmail().toLowerCase());
        dto.setDisplayName(formatDisplayName(user.getFirstName(), user.getLastName()));
        dto.setMigratedAt(LocalDateTime.now());

        return dto;
    }

    private boolean isEligibleForMigration(User user) {
        return user.getStatus() != null
            && "ACTIVE".equals(user.getStatus())
            && user.getEmail() != null
            && user.getEmail().contains("@");
    }

    private String formatDisplayName(String firstName, String lastName) {
        return Stream.of(firstName, lastName)
                .filter(s -> s != null && !s.isBlank())
                .collect(Collectors.joining(" "));
    }
}

CompositeItemProcessor - Chaining Multiple Processors

@Bean
public CompositeItemProcessor<User, UserDto> compositeProcessor() {
    // Each delegate's output type must match the next delegate's input type;
    // externalService is an injected collaborator (declaration not shown).
    List<ItemProcessor<?, ?>> processors = new ArrayList<>();
    processors.add(new ValidationProcessor());
    processors.add(new EnrichmentProcessor(externalService));
    processors.add(new TransformationProcessor());

    CompositeItemProcessor<User, UserDto> composite = new CompositeItemProcessor<>();
    composite.setDelegates(processors);
    return composite;
}

6. ItemWriter Implementations

JdbcBatchItemWriter - Bulk INSERT/UPDATE

@Bean
public JdbcBatchItemWriter<UserDto> userDtoWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<UserDto>()
            .dataSource(dataSource)
            .sql("""
                INSERT INTO users_new (id, username, email, display_name, migrated_at)
                VALUES (:id, :username, :email, :displayName, :migratedAt)
                ON CONFLICT (id) DO UPDATE
                SET username = EXCLUDED.username,
                    email = EXCLUDED.email,
                    migrated_at = EXCLUDED.migrated_at
                """)
            .beanMapped()  // binds the :name placeholders to UserDto bean properties
            .build();
}

JpaItemWriter

JpaItemWriter writes each item through the EntityManager (merge by default), so the step must run under a JPA-aware transaction manager bound to the same EntityManagerFactory.

@Bean
public JpaItemWriter<UserDto> jpaUserWriter(EntityManagerFactory entityManagerFactory) {
    JpaItemWriter<UserDto> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    return writer;
}

FlatFileItemWriter - Result File Output

@Bean
@StepScope
public FlatFileItemWriter<UserDto> csvResultWriter(
        @Value("#{jobParameters['outputFile']}") String outputFile) {

    return new FlatFileItemWriterBuilder<UserDto>()
            .name("csvResultWriter")
            .resource(new FileSystemResource(outputFile))
            .headerCallback(writer -> writer.write("id,username,email,migrated_at"))
            .delimited()
            .delimiter(",")
            .names("id", "username", "email", "migratedAt")
            .build();
}

CompositeItemWriter - Write to Multiple Targets

@Bean
public CompositeItemWriter<UserDto> compositeWriter(
        JdbcBatchItemWriter<UserDto> dbWriter,
        FlatFileItemWriter<UserDto> fileWriter) {

    CompositeItemWriter<UserDto> writer = new CompositeItemWriter<>();
    writer.setDelegates(Arrays.asList(dbWriter, fileWriter));
    return writer;
}

7. Advanced Features

Partitioning - Parallel Processing for Large Datasets

@Configuration
public class PartitionedJobConfig {

    @Bean
    public Step masterStep(JobRepository jobRepository,
                            Partitioner partitioner,
                            Step workerStep) {
        return new StepBuilder("masterStep", jobRepository)
                .partitioner("workerStep", partitioner)
                .step(workerStep)
                .gridSize(4)  // number of partitions (threads)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Partitioner columnRangePartitioner(DataSource dataSource) {
        return gridSize -> {
            Map<String, ExecutionContext> result = new HashMap<>();
            // getTotalCount: helper (not shown), e.g. SELECT MAX(id) via JdbcTemplate.
            // The range math below assumes contiguous ids starting at 1.
            int totalCount = getTotalCount(dataSource);
            int rangeSize = totalCount / gridSize;

            for (int i = 0; i < gridSize; i++) {
                ExecutionContext context = new ExecutionContext();
                context.putLong("minId", (long) i * rangeSize + 1);
                context.putLong("maxId",
                    i == gridSize - 1 ? totalCount : (long)(i + 1) * rangeSize);
                result.put("partition" + i, context);
            }
            return result;
        };
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<User> partitionedReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("minId", minId);
        parameterValues.put("maxId", maxId);

        PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
        queryProvider.setSelectClause("SELECT id, username, email, status");
        queryProvider.setFromClause("FROM users");
        queryProvider.setWhereClause("WHERE id BETWEEN :minId AND :maxId");
        queryProvider.setSortKeys(Map.of("id", Order.ASCENDING));

        return new JdbcPagingItemReaderBuilder<User>()
                .name("partitionedReader")
                .dataSource(dataSource)
                .queryProvider(queryProvider)
                .parameterValues(parameterValues)
                .pageSize(100)
                .rowMapper(new BeanPropertyRowMapper<>(User.class))
                .build();
    }
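
    // The master step above references a workerStep bean not defined elsewhere in
    // this guide; a minimal sketch, reusing the chunk components from section 3:
    @Bean
    public Step workerStep(JobRepository jobRepository,
                           PlatformTransactionManager txManager,
                           JdbcPagingItemReader<User> partitionedReader,
                           ItemProcessor<User, UserDto> userItemProcessor,
                           ItemWriter<UserDto> userItemWriter) {
        return new StepBuilder("workerStep", jobRepository)
                .<User, UserDto>chunk(100, txManager)
                .reader(partitionedReader)
                .processor(userItemProcessor)
                .writer(userItemWriter)
                .build();
    }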

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("batch-partition-");
        executor.initialize();
        return executor;
    }
}

Multi-Threaded Step

@Bean
public Step multiThreadedStep(JobRepository jobRepository,
                               PlatformTransactionManager txManager,
                               SynchronizedItemStreamReader<User> reader,
                               ItemWriter<UserDto> writer) {
    return new StepBuilder("multiThreadedStep", jobRepository)
            .<User, UserDto>chunk(100, txManager)
            .reader(reader)   // must be thread-safe
            .writer(writer)
            .taskExecutor(new SimpleAsyncTaskExecutor())
            .throttleLimit(4)  // max concurrent threads (deprecated in Spring Batch 5; prefer sizing the TaskExecutor)
            .build();
}

// Wrapping a reader for thread safety
@Bean
public SynchronizedItemStreamReader<User> synchronizedReader(
        JdbcCursorItemReader<User> reader) {
    SynchronizedItemStreamReader<User> synchronizedReader = new SynchronizedItemStreamReader<>();
    synchronizedReader.setDelegate(reader);
    return synchronizedReader;
}

AsyncItemProcessor / AsyncItemWriter

Both classes ship in the spring-batch-integration module: the processor submits each item to a TaskExecutor and emits a Future, and the paired writer unwraps those Futures at write time.

@Bean
public AsyncItemProcessor<User, UserDto> asyncProcessor(
        UserMigrationProcessor delegateProcessor) {

    AsyncItemProcessor<User, UserDto> asyncProcessor = new AsyncItemProcessor<>();
    asyncProcessor.setDelegate(delegateProcessor);
    asyncProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
    return asyncProcessor;
}

@Bean
public AsyncItemWriter<UserDto> asyncWriter(
        JdbcBatchItemWriter<UserDto> delegateWriter) {

    AsyncItemWriter<UserDto> asyncWriter = new AsyncItemWriter<>();
    asyncWriter.setDelegate(delegateWriter);
    return asyncWriter;
}
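
Wiring the pair into a step changes the step's output type to a Future. A minimal sketch (the step name is illustrative):

@Bean
public Step asyncStep(JobRepository jobRepository,
                      PlatformTransactionManager txManager,
                      ItemReader<User> reader,
                      AsyncItemProcessor<User, UserDto> asyncProcessor,
                      AsyncItemWriter<UserDto> asyncWriter) {
    return new StepBuilder("asyncStep", jobRepository)
            .<User, Future<UserDto>>chunk(100, txManager)  // java.util.concurrent.Future
            .reader(reader)
            .processor(asyncProcessor)  // ItemProcessor<User, Future<UserDto>>
            .writer(asyncWriter)        // ItemWriter<Future<UserDto>>
            .build();
}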

Dynamic Configuration with JobParameters

@Bean
@StepScope
public JdbcCursorItemReader<User> dynamicReader(
        DataSource dataSource,
        @Value("#{jobParameters['startDate']}") String startDate,
        @Value("#{jobParameters['endDate']}") String endDate) {

    return new JdbcCursorItemReaderBuilder<User>()
            .name("dynamicReader")
            .dataSource(dataSource)
            .sql("SELECT * FROM users WHERE created_at BETWEEN ? AND ?")
            .preparedStatementSetter(ps -> {
                ps.setString(1, startDate);
                ps.setString(2, endDate);
            })
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .build();
}

8. Restart and Retry Strategies

Skip/Retry Policy

@Bean
public Step robustStep(JobRepository jobRepository,
                        PlatformTransactionManager txManager) {
    return new StepBuilder("robustStep", jobRepository)
            .<User, UserDto>chunk(100, txManager)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            .skipLimit(10)
            .skip(ValidationException.class)
            .skip(DataIntegrityViolationException.class)
            .retryLimit(3)
            .retry(TransientDataAccessException.class)
            .retry(DeadlockLoserDataAccessException.class)
            .noSkip(FatalBatchException.class)
            .build();
}

SkipListener Implementation

@Component
public class UserSkipListener implements SkipListener<User, UserDto> {

    private static final Logger log = LoggerFactory.getLogger(UserSkipListener.class);

    @Override
    public void onSkipInRead(Throwable t) {
        log.error("Skipped during read: {}", t.getMessage());
    }

    @Override
    public void onSkipInProcess(User user, Throwable t) {
        log.warn("Skipped during process - userId: {}, error: {}",
            user.getId(), t.getMessage());
    }

    @Override
    public void onSkipInWrite(UserDto dto, Throwable t) {
        log.error("Skipped during write - userId: {}, error: {}",
            dto.getId(), t.getMessage());
    }
}
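
A SkipListener only fires if it is registered on the fault-tolerant step. A minimal sketch reusing the robustStep components above:

@Bean
public Step robustStepWithListener(JobRepository jobRepository,
                                   PlatformTransactionManager txManager,
                                   UserSkipListener userSkipListener) {
    return new StepBuilder("robustStep", jobRepository)
            .<User, UserDto>chunk(100, txManager)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            .skipLimit(10)
            .skip(ValidationException.class)
            .listener(userSkipListener)  // SkipListener callbacks fire per skipped item
            .build();
}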

Controlling Job Restartability

@Bean
public Job nonRestartableJob(JobRepository jobRepository, Step step1) {
    return new JobBuilder("nonRestartableJob", jobRepository)
            .start(step1)
            .preventRestart()  // disallow restart after failure
            .build();
}
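
// Re-running the job with the same (identifying) JobParameters resumes from the
// previous failure point; launching with identical parameters via JobLauncher
// triggers the restart automatically.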

9. Scheduler Integration

@Scheduled + JobLauncher

@Configuration
@EnableScheduling
public class BatchSchedulerConfig {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job userMigrationJob;

    @Scheduled(cron = "0 0 2 * * *")  // every day at 2:00 AM
    public void runDailyBatch() {
        try {
            JobParameters params = new JobParametersBuilder()
                    .addString("date", LocalDate.now().toString())
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();

            JobExecution execution = jobLauncher.run(userMigrationJob, params);
            System.out.println("Batch execution status: " + execution.getStatus());
        } catch (Exception e) {
            System.err.println("Batch execution failed: " + e.getMessage());
        }
    }
}
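
Note that the timestamp parameter makes every run a distinct JobInstance. Without it, a second launch with the same date parameter would be rejected, because a JobInstance that completed successfully cannot be started again with identical JobParameters.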

Quartz Scheduler Integration

@Configuration
public class QuartzBatchConfig {

    @Bean
    public JobDetail batchJobDetail() {
        return JobBuilder.newJob(BatchQuartzJob.class)
                .withIdentity("batchJob")
                .storeDurably()
                .build();
    }

    @Bean
    public Trigger batchJobTrigger() {
        return TriggerBuilder.newTrigger()
                .forJob(batchJobDetail())
                .withIdentity("batchTrigger")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?"))
                .build();
    }
}

@Component
public class BatchQuartzJob implements org.quartz.Job {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job userMigrationJob;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            JobParameters params = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(userMigrationJob, params);
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }
}

REST API for Manual Job Triggering

@RestController
@RequestMapping("/api/batch")
public class BatchController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job userMigrationJob;

    @PostMapping("/run")
    public ResponseEntity<String> runJob(
            @RequestParam String startDate,
            @RequestParam String endDate) {
        try {
            JobParameters params = new JobParametersBuilder()
                    .addString("startDate", startDate)
                    .addString("endDate", endDate)
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();

            JobExecution execution = jobLauncher.run(userMigrationJob, params);
            return ResponseEntity.ok("Job execution ID: " + execution.getId()
                + ", status: " + execution.getStatus());
        } catch (Exception e) {
            return ResponseEntity.internalServerError()
                .body("Job execution failed: " + e.getMessage());
        }
    }
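
    @Autowired
    private JobExplorer jobExplorer;

    // Read-only status lookup for a previously launched execution
    @GetMapping("/status/{executionId}")
    public ResponseEntity<String> getJobStatus(@PathVariable Long executionId) {
        JobExecution execution = jobExplorer.getJobExecution(executionId);
        if (execution == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok("Status: " + execution.getStatus()
            + ", exit code: " + execution.getExitStatus().getExitCode());
    }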
}

10. Testing

@SpringBatchTest Setup

@SpringBatchTest
@SpringBootTest(classes = {UserMigrationJobConfig.class, TestBatchConfig.class})
@ActiveProfiles("test")
class UserMigrationJobTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @BeforeEach
    void clearMetadata() {
        jobRepositoryTestUtils.removeJobExecutions();
    }

    @Test
    void testCompleteJob() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob(
            new JobParametersBuilder()
                .addString("date", "2026-03-17")
                .toJobParameters()
        );

        assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);
    }

    @Test
    void testSingleStep() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchStep("migrationStep");

        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
        assertThat(stepExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(stepExecution.getWriteCount()).isGreaterThan(0);
    }
}

StepScopeTestExecutionListener

@SpringBootTest
@TestExecutionListeners({
    DependencyInjectionTestExecutionListener.class,
    StepScopeTestExecutionListener.class
})  // @SpringBatchTest registers the step/job scope listeners automatically, as an alternative
class ItemReaderTest {

    @Autowired
    private FlatFileItemReader<UserCsvDto> csvUserReader;

    // Consumed by StepScopeTestExecutionListener to build the step-scope context;
    // the reader resolves #{jobParameters['inputFile']} from these parameters
    public StepExecution getStepExecution() {
        JobParameters params = new JobParametersBuilder()
                .addString("inputFile", "src/test/resources/test-users.csv")
                .toJobParameters();
        return MetaDataInstanceFactory.createStepExecution(params);
    }

    @Test
    void testReader() throws Exception {
        csvUserReader.open(new ExecutionContext());  // an ItemStream must be opened before read()
        List<UserCsvDto> users = new ArrayList<>();
        UserCsvDto user;
        while ((user = csvUserReader.read()) != null) {
            users.add(user);
        }
        csvUserReader.close();
        assertThat(users).isNotEmpty();
    }
}

Integration Test Example

@SpringBatchTest
@SpringBootTest
@Testcontainers
class BatchIntegrationTest {

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15");

    @DynamicPropertySource
    static void setProps(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
    }

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private UserRepository userRepository;

    @Test
    void testFullMigrationPipeline() throws Exception {
        // Given: insert test data
        insertTestUsers(100);  // fixture helper (implementation not shown)

        // When: run the batch job
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // Then: verify results
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(userRepository.countByMigratedTrue()).isEqualTo(100);
    }
}

11. Monitoring

Actuator Configuration

Spring Boot does not ship a dedicated batch actuator endpoint; batch metrics are exposed through the standard metrics endpoint (and prometheus, if the Prometheus registry is on the classpath) via Micrometer.

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus

Micrometer + Prometheus Metrics

Since Spring Batch 4.2, job, step, and item metrics are registered with Micrometer automatically, so no configuration bean is required. Custom metrics can be layered on with listeners; a minimal sketch that counts job outcomes, assuming a MeterRegistry bean is available:

@Component
public class JobOutcomeMetricsListener implements JobExecutionListener {

    private final MeterRegistry meterRegistry;

    public JobOutcomeMetricsListener(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // Tag the counter with the job name and its final status
        meterRegistry.counter("batch.job.outcome",
                "job", jobExecution.getJobInstance().getJobName(),
                "status", jobExecution.getStatus().name())
            .increment();
    }
}

Key metrics to monitor:

  • spring.batch.job.* — job execution time, success/failure count
  • spring.batch.step.* — read, process, and write counts per step
  • spring.batch.item.* — per-item processing latency

Quiz: Test Your Spring Batch Knowledge

Q1. What happens when an ItemProcessor returns null in a Chunk-based Step?

Answer: The item is filtered out and not passed to the ItemWriter.

Explanation: When an ItemProcessor returns null for a given item, Spring Batch automatically filters that item and does not forward it to the writer. This is the cleanest way to implement conditional filtering and does not count toward the SkipLimit counter. This behavior is distinct from throwing an exception to trigger a skip, which does affect skip counting.

Q2. What is the key difference between JdbcCursorItemReader and JdbcPagingItemReader, and when is each appropriate?

Answer: JdbcCursorItemReader streams rows through a DB cursor held open on a single connection; JdbcPagingItemReader fetches data in discrete pages, issuing a fresh query per page keyed on the sort column.

Explanation: JdbcCursorItemReader holds a cursor open on a single connection and streams data efficiently in memory, but is not thread-safe (unsuitable for multi-threaded Steps). JdbcPagingItemReader issues a new query per page, works well with connection pools, and is safe in multi-threaded environments. Use Cursor for single-threaded high-volume processing; use Paging when parallel processing or restartability is important.

Q3. How does Spring Batch resume processing from the point of failure when a Job is restarted?

Answer: Running the Job again with the same JobParameters causes Spring Batch to automatically resume from the last failed Step.

Explanation: Spring Batch stores execution history in the JobRepository. When the same JobParameters are used to re-run a Job, it looks up the most recent failed JobExecution in the BATCH_JOB_EXECUTION table and resumes processing from that Step. Calling preventRestart() disables this behavior. In Chunk processing, chunks that were already successfully committed are not reprocessed — only the failed chunk onward is retried.

Q4. Why is Partitioning used in Spring Batch, and what does the gridSize parameter control?

Answer: Partitioning divides a large dataset into sub-partitions for parallel execution, and gridSize specifies how many partitions (concurrent execution units) to create.

Explanation: A Partitioning Step logically divides a large dataset into segments and processes each in a separate thread or process concurrently. The gridSize determines the partition count and is typically set based on available CPU cores or DB connection pool size. By implementing the Partitioner interface, you can define partitions by ID range, date range, file list, or any other criterion.

Q5. Why is spring.batch.job.enabled: false recommended in application.yml?

Answer: To prevent all registered Jobs from automatically running when the application starts up.

Explanation: By default, Spring Batch runs all registered Jobs when the application context loads. Setting this to false is essential when embedding batch processing in a web application or when you want Jobs to be triggered explicitly via a REST API or scheduler rather than at every startup. It also prevents accidental batch runs during development when restarting the server. Jobs are then launched explicitly through JobLauncher.