Skip to main content

실시간 로그적재 배치 개발

· 8 min read
Eundo Park
Maintainer

최근 프로젝트에서 1분에 한 번씩 실행되는 배치 작업을 개발하여, 실시간으로 쌓이는 대용량 로그 파일을 읽어와 데이터베이스에 적재하는 기능을 구현했습니다. 이 배치는 대용량 데이터를 빠르게 처리해야 했기 때문에 멀티쓰레드 병렬처리를 적용하여 성능을 최적화했습니다.

개발 주요작업

  • Spring Batch를 이용하여 배치 작업을 구성하고, 파티셔닝과 TaskExecutor를 활용하여 멀티쓰레드 환경에서 로그 파일을 병렬로 처리했습니다. 이를 통해 대량의 데이터를 효율적으로 분산 처리하여, 성능을 최적화했습니다.
@Bean("logProcessingJob")
public Job logProcessingJob(JobRepository jobRepository, @Qualifier("partitionedLogProcessingStep") Step partitionedLogProcessingStep, @Qualifier("moveFilesStep") Step moveFilesStep) {
return new JobBuilder(BatchJobNames.LOG_PROCESSING_BATCH.name(), jobRepository)
.listener(jobExecutionListener())
.start(partitionedLogProcessingStep)
.next(moveFilesStep)
.build();
}
  • JSON 형태로 저장된 로그 데이터를 파싱하고, 이를 데이터베이스에 적재하기 위해 컨버팅 작업을 수행했습니다. 이를 통해 실시간으로 수집된 로그 데이터를 효율적으로 처리하고 저장할 수 있었습니다.
@Override
public LogEntry process(JsonNode jsonNode) {
LogEntry logEntry = new LogEntry();
logEntry.setTimestamp(jsonNode.get("timestamp").asText());
logEntry.setLevel(jsonNode.get("level").asText());
logEntry.setMessage(jsonNode.get("message").asText());
// 기타 필요한 필드 세팅
return logEntry;
}
  • 배치 작업 중 마지막으로 읽은 파일과 위치를 기억하는 로직을 구현하여, 배치가 재실행될 때 이전에 처리한 위치부터 이어서 작업할 수 있도록 했습니다. 이를 통해 데이터의 중복 처리를 방지하고, 배치 작업의 안정성을 높였습니다.
@PostConstruct
public void initialize() throws IOException {
log.debug("Initializing reader for file: {}, isLastFile: {}", filePath, isLastFile);
if (filePath != null) {
File file = new File(filePath);
randomAccessFile = new RandomAccessFile(file, "r");
long startPosition = batchState.getFilePosition(filePath);
randomAccessFile.seek(startPosition);
log.debug("File opened: {}, starting from position: {}", filePath, startPosition);
}
}

@Override
public JsonNode read() throws Exception {
if (randomAccessFile == null || fileFullyRead) {
return null;
}

JsonNode jsonNode = readNextJsonNode();
if (jsonNode != null) {
long currentPosition = randomAccessFile.getFilePointer();
batchState.updateFilePosition(filePath, currentPosition);
log.debug("Read JSON node from file: {}, new position: {}", filePath, currentPosition);
return jsonNode;
} else {
fileFullyRead = true;
batchState.addProcessedFile(filePath);
log.debug("Reached end of file: {}", filePath);
return null;
}
}
  • 배치 작업이 종료된 후 로그 파일을 이동시키는 후처리 작업을 구현하여, 중복 처리를 방지하고 로그 관리의 일관성을 유지했습니다.
@Bean
public Step moveFilesStep() {
return stepBuilderFactory.get("moveFilesStep")
.tasklet(logFileMover)
.build();
}

문제 해결

  • 파티셔닝과 TaskExecutor 사용 중 발생한 문제: 처음에는 파티셔닝을 사용해 로그 파일을 Chunk 단위로 병렬 처리하는 과정에서, Reader가 파일에서 한 건만 읽고 나머지 데이터는 읽지 못하는 문제가 발생했습니다. 이 문제는 Reader가 제대로 설정되지 않아 발생한 것이었고, Reader의 설정을 수정하여 문제를 해결했습니다.
@Override
public JsonNode read() throws Exception {
if (randomAccessFile == null || fileFullyRead) {
return null;
}
JsonNode jsonNode = readNextJsonNode();
if (jsonNode != null) {
long currentPosition = randomAccessFile.getFilePointer();
batchState.updateFilePosition(filePath, currentPosition);
log.debug("Read JSON node from file: {}, new position: {}", filePath, currentPosition);
return jsonNode;
} else {
fileFullyRead = true;
batchState.addProcessedFile(filePath);
log.debug("Reached end of file: {}", filePath);
return null;
}
}
  • 트랜잭션 문제: 배치를 Quartz Scheduler와 연동하여 실행하는 과정에서, PlatformTransactionManager를 사용했을 때 트랜잭션이 제대로 동작하지 않아 커밋이 되지 않는 문제가 있었습니다. 이를 해결하기 위해 JPATransactionManager로 변경했으며, 이후 트랜잭션이 정상적으로 작동하여 문제를 해결할 수 있었습니다.
@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory); // JPATransactionManager로 변경하여 문제 해결
}

파티셔닝과 병렬처리

  • 데이터 일관성: 파티셔닝과 병렬처리를 적용하면서 가장 주의해야 했던 부분은 데이터의 일관성 유지였습니다. 여러 스레드가 동시에 데이터를 처리하기 때문에, 각 스레드가 중복된 데이터를 처리하거나 동일한 리소스에 동시 접근하여 충돌이 발생하지 않도록 신경을 써야 했습니다.
  • Reader와 Writer의 동기화: 병렬 처리를 적용할 때 Reader가 모든 데이터를 정상적으로 읽고, Writer가 중복 없이 데이터를 처리할 수 있도록 Reader와 Writer 간의 동기화 문제를 해결해야 했습니다. 이 과정에서 Reader의 상태를 정확히 관리하여 파일의 모든 데이터를 빠짐없이 처리할 수 있도록 했습니다.
  • Chunk 크기 조정: 병렬 처리의 성능을 최적화하기 위해 Chunk의 크기를 적절히 조정하는 것도 중요했습니다. 너무 큰 Chunk는 병렬 처리의 이점을 살리지 못하게 하고, 너무 작은 Chunk는 오버헤드를 증가시킬 수 있기 때문에, 적절한 크기를 설정하여 성능을 극대화했습니다.

주요 성과

  • 성능 최적화: 파티셔닝과 TaskExecutor를 활용한 멀티쓰레드 병렬 처리로 대용량 로그 데이터를 신속하게 처리하고, 데이터베이스 적재 속도를 극대화했습니다.
  • 데이터 일관성 유지: 복잡한 JSON 로그 데이터를 정확하게 파싱하여, 데이터 무결성을 유지하며 안정적으로 데이터베이스에 저장했습니다.
  • 안정성 강화: 오류 발생 시에도 시스템이 안정적으로 동작하도록 예외 처리 로직을 강화하여, 데이터 손실 및 중단을 방지했습니다.
  • 연속성 보장: 마지막으로 읽은 파일과 위치를 기억하는 로직을 통해 배치 작업의 연속성을 보장하고, 재실행 시 중복 처리를 방지했습니다.
  • 트랜잭션 문제 해결: JPATransactionManager를 도입해 Quartz Scheduler와의 연동 시 트랜잭션 처리의 안정성을 확보했습니다.

이번 프로젝트를 통해 실시간 데이터 처리의 중요성을 깊이 이해하게 되었으며, 대용량 데이터를 다루는 시스템에서의 성능 최적화와 안정성 확보에 대한 경험을 쌓을 수 있었습니다.

API 비동기호출 데이터처리 개발

· 5 min read
Eundo Park
Maintainer

이번 프로젝트에서는 사용자 로그인 시 대용량 데이터를 비동기로 호출하여 특정 화면에 표시하는 시스템을 구현했습니다. 데이터가 없으면 해당 내용을 알리는 메시지를 표시하는 기능이 필요했습니다.

비동기 호출 프로세스

4개의 API를 통해 데이터를 비동기로 호출했습니다:

  • 데이터 A: 첫 번째 API 호출
  • 데이터 B: 두 번째 API 호출
  • 데이터 C: 세 번째 API 호출
  • 데이터 D: 네 번째 API 호출

데이터 A, B, C의 비동기 호출이 모두 완료되면 이를 합쳐 하나의 총괄 데이터를 생성하고, 이를 세션 객체에 저장하여 애플리케이션 전반에서 사용했습니다.

문제 상황 및 해결 방안

데이터 부재 문제

비동기 호출이 완료되기 전에 화면에 접근할 경우 데이터가 없다는 메시지가 표시되었습니다. 이후 다시 접속하면 데이터가 정상적으로 표시되는 상황이 발생했습니다.

빈 데이터 처리 문제

비동기 호출 중 일부가 실패하면 빈 데이터로 처리되어, 최종 결과에 영향을 미쳤습니다.

해결 방안

모든 비동기 호출이 완료된 후 데이터를 종합하여 한 번에 화면에 표시하는 방식을 채택했습니다. 또한, 데이터 상태를 확인할 수 있는 API를 추가하여 호출 상태를 프론트엔드에서 확인할 수 있도록 했습니다. 오류 발생 시 최대 3회까지 재시도하는 로직도 구현했습니다.

비동기 처리 중 객체 공유 문제

비동기 호출 중 여러 스레드가 동일한 객체에 접근할 때 발생할 수 있는 Race ConditionData Corruption 문제를 방지하기 위해, 실시간 업데이트보다는 안정적인 데이터를 제공하는 방식을 선택했습니다.

추가 고려 사항

프로젝트 진행 중 비동기 호출의 이점을 더욱 극대화하기 위해 다음 사항들을 고려했습니다:

  • 백그라운드 데이터 업데이트: 데이터를 비동기로 가져온 후, 백그라운드에서 정기적으로 업데이트하는 작업 추가.
  • 병렬 처리 최적화: 비동기 호출 간의 의존성이 없는 경우 병렬 처리 최적화.
  • Fallback 전략: 비동기 호출 실패 시 기본값이나 대체 데이터를 제공하는 전략.
  • 모니터링 및 로깅: 비동기 호출의 성능과 안정성을 높이기 위해 모니터링 및 로깅 강화.

결론

이번 프로젝트에서는 안정성과 일관성을 우선시하여 모든 비동기 호출이 완료된 후 데이터를 종합하여 제공하는 방식을 채택했습니다. 추가적인 최적화 방안을 고려하면 비동기 호출의 이점을 더욱 효과적으로 활용할 수 있을 것입니다. 이 회고를 통해 유사한 상황에서 최적의 솔루션을 설계하는 데 중요한 교훈을 얻었습니다.