Apache Flink + RocksDB 튜닝으로 광고 Frequency Capping 실시간 집계를 일주일까지 확장하기

2 hours ago 2

안녕하세요, 토스 Data Service Platform Team 이승민, 최원용입니다.

저희 팀에서는 광고 노출 횟수의 슬라이딩 집계를 제공하고 있습니다. 짧은 구간(1분~1시간)은 Flink로, 장기 구간은 Airflow 배치로 운영하는 구조였는데요. 이 글은 장기 구간까지 Flink로 확장하면서 겪은 과정을 기록한 것입니다. 사용자가 광고를 얼마나 봤는지 1분부터 7일 단위까지 실시간으로 집계하고, 서빙 시점에 단일 조회로 제공하는 시스템을 만든 이야기예요.

집계가 부정확하면 광고주 예산이 낭비되거나 노출 기회가 사라집니다. 집계 오차가 곧 비즈니스 오차인 셈이죠. 집계는 특성에 따라 세 개의 Flink 앱(Minutes/Hours/Days)으로 분리했어요. 이후 각 앱의 운영 지표로 병목을 하나씩 확인하고, 대응 과정에서 마주친 이슈들도 함께 풀어나갔습니다. 이 글은 그 여정을 순서대로 따라갑니다.

글은 두 갈래로 읽으실 수 있어요. 광고 시스템 운영자나 도메인이 궁금하신 분이라면 'Frequency Capping'과 '기존 시스템과 개선의 동기'에서 배경을, '결과' 섹션에서 성과를 확인하시면 전체 그림이 잡힙니다. Flink와 RocksDB를 실무로 다루는 엔지니어라면 '아키텍처 결정'부터 'Flink Changelog'까지 순서대로 읽으시길 권해요.

*본문의 코드와 설정은 Apache Flink 1.20.1, flink-connector-kafka 3.3.0 기준입니다.

1. Frequency Capping

광고주가 ‘하루 3회까지만 노출'이라고 설정했는데, 실제로는 7회 노출됐다면 어떨까요?. 예산이 낭비됩니다. 반대로 2회만 노출됐는데 한도에 도달한 것으로 판단되면, 남은 1회의 노출 기회가 사라지죠.

Frequency Capping은 사용자별 광고 노출 횟수를 세고 제어하는 메커니즘입니다. 이 집계가 부정확하면 광고주 예산과 노출 기회 모두에 직접적인 손실이 발생해요. 1분 전 노출도, 7일 전 노출도 빠짐없이 정확해야 합니다.

집계 구간도 단일하지 않습니다. 광고 상품마다 노출 제어 단위가 달라요. 배너 광고는 ‘오늘 하루 기준 3회 이내’, 브랜딩 캠페인은 ‘지난 7일 기준 1회’처럼 구간이 다릅니다. 1분~7일 구간 전체를 하나의 서빙 API로 제공해야 합니다. 이를 위해 기존에는 어떤 구조로 운영했고, 어떤 한계가 있었는지 살펴볼게요.

2. 기존 시스템과 개선의 동기

처음부터 실시간 집계를 쓴 건 아니에요. 검증된 배치 구조가 먼저였습니다. 기존 시스템은 세 계층으로 구성된 Airflow DAG였습니다.

서빙 시점에 API는 이 세 계층을 순서대로 조회해 합산합니다. 예를 들어 16:30에 "최근 7일" 집계를 요청하면 Head(당일) + Head(전일) + Mid(D-2~D-7) + Tail(경계 보정) 순으로 Redis를 최대 4회 조회해 응답을 만들어요. Mid와 Tail 관련 Airflow DAG는 하루에 총 75회 실행됐습니다.

배치 구조는 시간 단위로 집계를 절삭하다 보니, 이벤트 단위의 정밀한 슬라이딩 집계를 제공하기 어려웠어요. 이벤트별로 만료를 추적하는 방식이 필요했습니다.

물론 이 구조에도 분명한 강점이 있어요. 30일 구간의 슬라이딩 집계나 일 단위 고정 집계는 여전히 배치가 적합해서 지금도 병행 운영 중입니다. Kafka 리텐션을 초과하는 구간은 장애 시 이벤트를 다시 읽어 재처리할 수 없고, 고정 구간 집계는 실시간 처리의 이점이 크지 않기 때문이죠.

다만 두 가지 한계가 있었습니다.

짧은 구간의 슬라이딩 집계는 이미 단일 Flink 앱으로 제공하고 있었어요. 신규 구간 요청과 함께 운영 경험이 충분히 쌓이면서, 이 두 한계를 해결하기 위해 장기간 집계까지 Flink로 확장하기로 결정했습니다. 다음은 이 결정에서 출발한 설계예요.


3. 아키텍처 결정

설계 목표는 명확했어요. 1분~7일 슬라이딩 구간을 단일 Redis 조회로 제공하면서, 재처리에도 정확한 집계를 보장하는 것. 이를 위해 State를 집계값의 단일 진실 공급원(SSOT)으로 두고, 장애나 재시작 시에도 State에서 Redis를 재구성할 수 있도록 원칙을 세웠습니다.

가장 먼저 결정한 건 앱 분리였어요. 기존에는 단일 Flink 앱에서 짧은 구간을 처리하고 있었는데, 장기 구간까지 하나의 앱에 몰아넣는 대신 3개 앱(minutes/hours/days)으로 나누기로 했습니다. 구간마다 병목 패턴이 완전히 달랐기 때문이에요.

minutes 앱은 1분~30분의 짧은 윈도우를 다루다 보니 이벤트당 만료 처리가 가장 빈번합니다. Write Buffer Manager 압박이 곧장 Write Stall로 이어지는 Write 집약적 구조라, RocksDB Write 경로 튜닝이 핵심이에요.

hours 앱은 TTL이 최대 12시간까지 길어져 State에 누적되는 광고 ID 수가 많아요. Filter Block Cache Miss로 인한 CPU 포화가 1차 병목이었고, 집계 결과를 Redis에 쓰는 과정의 O(N) 스캔(N = 윈도우 내 광고 ID 수)이 CPU 경쟁을 키웁니다. Filter Block 튜닝과 managed memory 증설이 중심이죠.

days 앱은 State 규모 자체가 차원이 다릅니다. 7일 집계 윈도우 하나의 SST 파일들 크기(live-sst-files-size)만으로 68GB, Savepoint는 220~230GB에 달해요. 이 규모에서는 Checkpoint I/O 자체가 파이프라인 병목이 됩니다. 이를 해결한 Changelog 설계는 'Flink Changelog' 섹션에서 다뤄요.

운영 중인 Grafana 지표에서도 세 윈도우의 규모 차이가 드러납니다.

세 앱은 같은 코드베이스를 공유하지만, RocksDB 설정은 완전히 달라야 했어요. 단일 앱으로 묶으면 한쪽의 최적화가 다른 쪽에 부작용을 낳거든요. 이 규모와 병목 패턴에서는 분리가 자연스러운 결론이었습니다. 코드베이스는 공유하니 배포와 알림 구성은 공통으로 관리할 수 있었고, 설정 분기 부담보다 병목을 독립적으로 풀 수 있다는 이점이 더 컸어요.

전체 데이터 흐름은 다음과 같습니다.

이 구조에서 가장 먼저 해결해야 할 문제는 초기 적재였습니다. 7일치 과거 데이터를 정확하게 채우면서 실시간 서빙으로 전환하는 과정을 다음 섹션에서 다룰게요.

4. 초기 적재와 전환 정합성

기술적으로 가장 어려운 문제는 실시간 전환 이후의 집계가 아니었어요. 전환 순간을 정확하게 통과하는 것이었습니다.

7일치 과거 데이터를 초기화하는 백필(Backfill)은 카운트 증가만 수행하고 만료 타이머를 등록하지 않아요. 새 이벤트를 처리하는 캐치업(Catch-up)은 과거 시점부터 이벤트를 재처리해 집계 State와 만료 타이머를 함께 재구축해요. 두 역할을 하나의 앱에 두면, 과거 데이터를 처리하는 도중에도 만료 타이머가 즉시 발화해 아직 다 쌓이지 않은 집계에서 감소가 먼저 일어나 값이 틀려집니다. 그래서 단일 파이프라인으로 합칠 수 없었어요.. 이 제약에서 출발해 두 파이프라인을 분리하는 2단계 구조를 설계했고, State를 집계값의 SSOT로 두는 원칙을 정했습니다. 두 파이프라인이 만나는 경계에서는 Redis 쓰기 조건, withIdleness, timerState TTL 세 가지가 맞물려야 정합성이 유지돼요.

4.1 단일 파이프라인이 안 되는 이유

두 단계의 로직이 근본적으로 다르기 때문입니다. 과거 데이터를 누적하는 백필 단계에서는 카운트를 올리는 일만 해요. 반면 실시간으로 새 이벤트를 처리하는 캐치업 단계에서는 새 이벤트에 +1을 하면서, 동시에 슬라이딩 윈도우를 벗어난 과거 이벤트에는 -1을 해야 합니다.

두 단계를 하나의 파이프라인에 합치면 문제가 생겨요. 과거 데이터를 처리하는 도중에도 만료 타이머가 발화합니다. 아직 다 쌓이지 않은 집계에서 감소가 먼저 일어나 값이 틀어지게 되죠.

Flink에는 스트리밍 잡을 유한 데이터셋에 대해 배치처럼 실행하는 batchMode가 있어요. 과거 데이터를 배치로 처리해 State를 재구축할 수 있어서 이 방식도 검토했습니다. 하지만 Batch Job이 FINISHED되면 State도 함께 폐기돼요. 재구축된 State를 Redis에 초기값으로 써줄 경로가 없었습니다. batchMode는 이 방식으로는 쓰지 않기로 했어요.

Spark로 과거 집계를 별도 처리하는 방법도 설계 단계에서 검토했습니다. 다른 시스템이 적재한 Hive 테이블을 Spark가 읽어 State를 구성하는 방식이에요. 하지만 정합성이 핵심인 이 앱에 Hive와 Spark 두 시스템이 추가되는 건 부담이 컸습니다. 적재 시점의 차이, 스키마 불일치, 장애 시 어느 시스템의 데이터를 기준으로 복구할지 — 시스템이 늘수록 State SSOT(Single Source of Truth) 보장이 복잡해지거든요. Kafka와 Flink 단일 시스템 안에서 해결하는 것을 원칙으로 삼았습니다. 결국 두 단계를 별도 파이프라인으로 분리하는 게 이 상황에서 가장 확실한 선택이었어요.

4.2 백필 → 캐치업 2단계 구조

days 앱은 두 개의 파이프라인으로 구성돼요.

백필 파이프라인은 세 개의 Consumer Group에서 각각 1일/3일/7일치 이벤트를 읽어 윈도우별 State를 초기화합니다. 카운트를 올리는 일만 수행하고, 완료 시점에 Redis에 1회 동기화한 뒤 FINISHED됩니다.

캐치업 파이프라인은 State를 처음부터 다시 구축해요. 백필의 State를 Savepoint로 살려 이어갈 수는 없습니다. 백필은 카운트만 올렸을 뿐 만료 타이머를 등록하지 않았기 때문이에요. 타이머가 없으면 만료 감소(-1)가 영원히 발화하지 않거든요. 캐치업은 같은 이벤트를 다시 읽으면서 카운트와 만료 타이머를 함께 등록해 완전한 State를 구축합니다. 과거 이벤트를 모두 소비한 시점(SCAN_END)부터 Redis에 반영하며, 이때부터 슬라이딩 윈도우 타이머와 감소 로직이 정상 동작해요.

캐치업 파이프라인이 State를 다시 구축하면서 처리 lag가 줄어듭니다. 7일치 lookback이 확보되면 7일 집계가 활성화되고, 3일이 되면 3일치, 1일이 되면 1일치 집계가 순차적으로 켜져요. 각 윈도우가 충분한 lookback 데이터를 확보한 시점부터 신뢰할 수 있는 값을 제공합니다.

Consumer Group은 백필 파이프라인이 별도로 사용해요. 같은 group을 쓰면 백필 이후 Committed Offset이 앞서 나가서 캐치업 구간이 통째로 누락됩니다.

이 설계에서 State는 아키텍처 결정 섹션에서 정한 SSOT 원칙의 구현이에요. State가 항상 정확한 집계값을 보유하고, Redis는 State의 projection에 불과합니다. State만 올바르게 관리하면 Redis는 언제든 정확한 값으로 갱신돼요. 장애나 재시작 시에도 State에서 Redis를 재구성할 수 있다는 점 — 이것이 운영 신뢰성의 근간입니다.

4.3 전환 정합성 설계

백필과 캐치업의 경계를 정확하게 통과하려면 세 가지 설계가 맞물려야 했어요.

Redis 쓰기 조건: eventTime이 백필 종료 시점 이후

Flink의 watermark는 "여기까지의 이벤트는 모두 도착했다"는 시간 기준선이에요. 여러 파티션 중 가장 느린 watermark가 전체 watermark가 됩니다. 백필 파이프라인이 End Offset까지 읽으면 flink-connector-kafka가 MAX_WATERMARK를 emit하는데, 이게 백필 종료 신호예요.

다만 쓰기 조건을 watermark 기준으로 두면 구조적 문제가 생깁니다. Idle하거나 느린 파티션 하나가 전체 watermark 진행을 잡아 쓰기를 영구 차단하거든요. 각 이벤트의 eventTime을 기준으로 독립 평가하면, 느린 파티션이 다른 파티션의 쓰기를 막지 않아이 문제를 해소할 수 있어요.

withIdleness 설정: 60초

withIdleness는 파티션에 이벤트가 없을 때 watermark가 멈추지 않도록 허용하는 설정이에요. Idle로 감지된 파티션은 watermark 진행에서 제외됩니다.

문제는 Bounded Source가 모든 데이터를 읽고 MAX_WATERMARK를 emit하기 직전, 특정 파티션이 Idle로 감지되면 StatusWatermarkValve가 그 파티션의 MAX_WATERMARK를 downstream으로 전달하지 않는다는 점이에요. 타임아웃이 1초처럼 너무 짧으면 종료 직전에 Idle로 잘못 감지돼 MAX_WATERMARK가 누락됩니다. 60초는 Bounded Source가 FINISHED로 전환되기까지의 시간보다 충분히 길어, 현실적으로 Idle 오감지가 먼저 일어날 수 없어요.

timerState TTL: 슬라이딩 윈도우 만료보다 충분히 길게

timerState는 슬라이딩 윈도우 만료 시 카운트를 감소시키는 타이머 정보를 보관해요. 타이머가 발화하면 이 State를 읽어 어떤 adId를 얼마나 줄일지 결정합니다.

TTL이 짧으면 처리 지연(재시작·장애 복구 등) 시 문제가 생겨요. 타이머는 발화하지만 timerState는 이미 만료된 상태라, onTimer()에서 timerState.get()이 null을 반환하고 감소가 스킵됩니다. 카운트가 줄지 않아 집계가 실제보다 높게 유지되죠. TTL을 슬라이딩 윈도우 만료보다 길게 두면, 지연 상황에서도 타이머 발화 시점에 State가 살아있도록 보장할 수 있어요. 만료 시점은 finally 블록에서 수동으로 정리합니다.


5. RocksDB와 Flink 런타임 튜닝

전환 정합성을 확보하고 세 앱이 실시간 서빙을 시작한 뒤, 운영 지표에서 병목이 하나씩 드러났어요. 같은 코드베이스를 공유하더라도 RocksDB 설정은 완전히 달라야 했습니다. 각 앱의 병목 원인이 달랐기 때문이에요.

이 섹션에서는 minutes의 Write Stall, hours의 Filter Block Cache Miss, days의 레벨 최적화를 시간 구간이 짧은 앱부터 순서대로 다루고, 이후 세 앱에 공통으로 적용한 설정을 소개합니다.

1차 병목

Write Buffer Manager → Write Stall

CPU 포화 → Filter Block Miss

대규모 State I/O (TM당 10GB 규모)

State 규모

~ 1.5 GB

~11.8 GB

~ 110 GB

핵심 튜닝

managed 1200 MB / WBR=0.5 / thread.num=3

managed 3 GB / partitioned-index-filters / 레벨 최적화

managed 12 GB / 레벨 최적화 / 불필요 영역 제거

minutes: Write Stall

처음 마주한 문제는 Write Stall이었어요. RocksDB가 쓰기를 자체적으로 멈추는 현상입니다. 원인은 Write Buffer Manager(WBM) 압박이었어요. 관련 개념부터 짚어볼게요.

RocksDB는 쓰기 데이터를 먼저 메모리의 MemTable에 쌓고, 일정 크기를 넘으면 디스크의 SST(Sorted String Table) 파일로 내려보냅니다. 이 SST 파일들은 L0부터 L6까지 레벨 구조로 관리돼요. Flink는 RocksDB State를 Column Family(CF) 단위로 관리하는데, CF는 같은 RocksDB 인스턴스 안의 독립된 key space로, Flink의 각 State(MapState, ValueState 등)가 별도 CF에 매핑됩니다.

WBM은 여러 CF가 공유하는 메모리 예산을 관리해요. Flink의 managed 설정이 TM(TaskManager)이 RocksDB에 할당하는 총 메모리 예산이고, WBR(Write Buffer Ratio)은 그중 Write Buffer에 쓰는 비율입니다.

WBM max가 CF당 write_buffer_size(기본 64MB)보다 작으면 강제 Flush가 발생합니다. Flush가 쌓이면 L0 파일이 누적되고, L0 파일이 많아지면 RocksDB가 쓰기 속도를 제한(Slowdown)하거나 완전히 중단(Stop)하는 Write Stall이 일어나요. Write Stall이 처리를 지연시키면 그 사이에 또 Flush가 쌓이고요. 자기강화 사이클이죠.

초기 설정에서 CF당 WBM budget은 16MB였어요. write_buffer_size(64MB)보다 훨씬 작아서 강제 Flush가 반복됐습니다.

해결책은 WBM 절대량을 늘리는 것이었어요. managed 500MB → 1200MB, WBR 0.25 → 0.5로 단계적으로 조정했습니다. CF당 WBM budget이 16MB → 75MB로 늘어나면서 write_buffer_size를 초과했고, Write Stall도 사라졌어요. 덕분에 백그라운드 Compaction이 정상화되고 L0 파일이 줄어 read 경로의 탐색 범위가 감소했습니다. Cache Hit Rate도 62~64%에서 99~100%로 개선됐고요.

writebuffer.count=3(기본값 2)과 writebuffer.number-to-merge=2(기본값 1)도 함께 조정했어요. Write Buffer 수를 3개로 늘리면 1개가 Flush되는 동안 나머지 2개가 쓰기를 이어받아 Stall을 완충합니다. number-to-merge=2는 Flush 전 2개의 MemTable을 병합해 L0 파일 누적 속도를 늦춰요. WBM 절대량 증설이 근본 해결이고, 이 두 설정은 그 과정에서 Stall 구간을 줄이는 보조 역할입니다.

write_buffer_size 자체를 올리는 방법도 고려했어요. 하지만 per-CF 크기를 올려도 WBM 압박이 남아 근본 해결이 되지 않습니다. managed/WBR 증설이 WBM 절대량을 늘리는 유일한 방법이에요.

hours: Filter Block Cache Miss

hours 앱의 문제는 진단이 더 어려웠습니다. CPU가 포화됐고, 그 사이 Compaction이 밀렸어요. Flink의 async-profiler로 1,766 samples를 수집해 CPU 프로파일을 분석했습니다.

async-profiler CPU flame graph 1,766 samples hours TaskManager all (1,766 samples, 100%) └─ Flink Task Thread (1,748 samples, 98.9%) ├─ processElement (1,135 samples) └─ ParameterizedProcessor.updateState registerTimer └─ MapState.contains RocksDB.get (1,108) └─ FullFilterKeyMayMatch ReadFilterBlock (1,106) ├─ ReadBlockContents (960) ◀◀ Filter Block 디스크 Read └─ pread syscall (146) ├─ onTimer (605 samples) └─ ParameterizedProcessor.onTimer MapState.get └─ RocksDB.get (597) └─ FullFilterKeyMayMatch ReadFilterBlock (593) ├─ ReadBlockContents (522) ◀◀ Filter Block 디스크 Read └─ pread syscall (71) └─ 기타 (8 samples, 생략) Filter Block Cache Miss 합계: 1,699 / 1,766 = 96.2% processElement 경로: 1,106 samples (960 + 146) onTimer 경로: 593 samples (522 + 71)

processElement와 onTimer 양쪽 경로 모두에서 ReadBlockContents()가 CPU를 지배하고 있었어요. RocksDB가 point lookup을 할 때마다 Filter Block이 Block Cache에 없어, 약 15MB 규모의 FilterBlock을 매번 디스크에서 읽는 구조였습니다.

Filter Block은 특정 key가 SST 파일에 존재하는지를 디스크 read 없이 판단하는 Bloom Filter 구조입니다. 읽기 요청이 들어오면 SST 파일을 열기 전에 Filter Block을 먼저 조회해 "이 파일엔 없음"을 빠르게 걸러내요. Filter Block이 Block Cache에 없으면 이걸 먼저 디스크에서 읽어야 하는데, 이게 바로 Cache Miss입니다.

Filter Block Miss가 이처럼 치명적이었던 건 Direct I/O 환경 때문이었어요.

K8s에서 RocksDB를 운영할 때는 OS Page Cache와 Block Cache의 이중 버퍼링을 막기 위해 Direct I/O를 활성화합니다. Direct I/O는 OS Page Cache를 완전히 우회하기 때문에, Cache Miss 시 Page Cache 도움 없이 반드시 디스크에서 읽어야 합니다. target-file-size-base=256MB, bits-per-key=14 환경에서 SST 파일당 약 800만 개의 key가 들어가고, key당 14bits의 Bloom Filter가 생성되니 monolithic Filter Block이 약 15MB에 달해요. Cache Miss 한 번에 15MB read가 발생하는 셈이고, 이게 ReadBlockContents() CPU 오버헤드의 직접 원인이었습니다.

해결책은 partitioned-index-filters=true 적용이었어요. 이 설정을 켜면 Flink가 bits-per-key를 10으로 강제 덮어써서 Filter Block은 약 11MB가 되고, 이게 ~4KB partition으로 분할됩니다. miss당 read가 약 2,750배 줄어들어요.

한 가지 예상치 못한 함정이 있었어요. bloom-filter.bits-per-key=14를 함께 설정했지만, Flink가 내부 메서드 overwriteFilterIfExist()(RocksDBResourceContainer.java:287-298)로 bits-per-key를 10으로 강제 덮어씁니다. partitioned-index-filters=true 환경에서의 Flink 1.20 제약이에요. 우회 방법이 없었습니다. Filter Block 분할 효과는 얻었지만 bits-per-key 튜닝 효과는 없었어요. 설정 파일에 값이 있다고 반드시 적용되는 게 아니라는 걸 소스 코드에서 확인했습니다.

레벨 최적화도 함께 진행했어요.

target-file-size-base는 64MB → 256MB로 올렸습니다. SST 파일 수를 줄이기 위해서예요. 기본값을 유지하면 ~11.8GB state에서 약 180개 SST 파일이 생기고, Compaction 1회당 처리 부하가 선형으로 증가합니다.

max-size-level-base는 256MB → 1GB로 올리고, use-dynamic-size=true를 함께 설정했어요. 비활성 임계값이 25.6MB → 100MB로 상승하면서 hours 4개 윈도우 모두 L5 이하 목표값이 임계값 미만이 되어 L5가 비활성화됩니다. 활성 레벨이 3→2단계로 줄어드는 거죠. Leveled Compaction의 구조적 Write Amplification은 multiplier(10) × (활성 레벨 수 - 1)로 근사할 수 있는데, L5 비활성화로 WA가 20→10(50%↓)으로 줄었습니다.

Grafana RocksDB 메트릭에서 Block Cache Hit Rate가 여전히 낮은 것을 확인하며 managed를 1GB → 2GB → 3GB로 3단계 증설했어요. 각 단계마다 hit rate 개선 추이를 보고 다음 증설 여부를 판단했습니다.

Redis 쓰기의 O(N) CPU 병목은 남아 있어요. 현재 STRING Key 구조(userId × timeWindow)는 State→Redis 1:1 매핑을 보장해 장애 시 State 기반 복구가 가능한 구조입니다. 이 SSOT 원칙을 유지하기 위해 O(N) 쓰기 비용을 감수하고 있고, 코드 레벨 최적화는 중기 과제로 남겨뒀어요.

days: 레벨 최적화

days 앱의 과제는 규모였어요. 7d 집계만으로도 SST 파일 크기가 68GB이고, TM당 ~10GB에 달합니다. 규모가 클수록 Compaction 비용도 같이 커지기 때문에, use-dynamic-sizetarget-file-size-base 튜닝은 hours 앱과 동일하게 가져갔어요.

target-file-size-base를 256MB로 설정해 SST 파일 수를 160→40개로 줄였습니다(75%↓). 파일이 많을수록 Compaction 1회 처리 부하가 선형으로 증가하기 때문이에요.

max-size-level-base=1GBuse-dynamic-size=true를 함께 적용했어요. L4 비활성 임계값이 25.6MB → 100MB로 올라가면서, 7d/3d 윈도우 state가 임계값을 초과하던 L4가 비활성화됩니다. 활성 레벨 4→3단계, WA(Write Amplification) 30→20(33%↓) 효과를 얻었어요. use-dynamic-size=true는 최하층(L6)의 실제 데이터 크기를 기준으로 상위 레벨의 목표 크기를 역산하는데, 작은 데이터가 불필요하게 모든 레벨을 활성화하는 낭비를 제거해줍니다.

managed memory는 12GB로 설정했어요. LRU(Least Recently Used) Cache 약 9.5GB / WBM 약 2GB 구성으로, TM당 SST 파일 크기 ~10GB의 대부분을 Block Cache에 수용할 수 있는 규모입니다.

공통 설정

Direct I/O를 선택한 이유부터 짚어볼게요.

K8s에서 여러 pod가 노드를 공유하면 OS Page Cache는 전체 노드 메모리를 공유 자원으로 사용해요. Flink가 제어할 수 없는 영역입니다. Page Cache가 예상치 못하게 커지면 K8s가 container memory limit 초과로 판단해 OOM(Out Of Memory) kill할 수 있어요.

Direct I/O(DirectIoRocksDBOptionsFactory, useOsPageCache=false)를 켜면 OS Page Cache를 완전히 우회하고, RocksDB의 자체 Block Cache(LRU Cache)만 사용합니다. K8s limit 내에서 메모리 사용이 예측 가능해지고, Flink가 직접 제어할 수 있어요.

RocksDB에서 Direct I/O는 두 옵션으로 활성화합니다. setUseDirectReads는 read path, setUseDirectIoForFlushAndCompaction은 flush와 compaction write path에 Direct I/O를 켭니다. 두 옵션을 함께 설정해야 모든 경로에서 OS Page Cache를 우회합니다. 저희는 내부적으로 아래처럼 래핑해 사용하고 있어요.

rocksDBBackend.rocksDBOptions = CustomRocksDBOptionsFactory(useOsPageCache = false) if (!useOsPageCache) { dbOptions.setUseDirectReads(true) dbOptions.setUseDirectIoForFlushAndCompaction(true) }

단점도 있습니다. Cache Miss 시 OS Page Cache 도움 없이 반드시 디스크에서 읽어야 해요. 앞서 hours 섹션에서 다룬 Filter Block Cache Miss가 바로 이 문제입니다. Direct I/O 환경에서 대형 SST 파일을 사용한다면 partitioned-index-filters=true는 선택이 아닌 필수예요.

RocksDB 튜닝과 함께 Flink TM 메모리 구조도 정밀하게 조정했어요. 각 영역의 증설·감소는 Grafana의 Flink 메모리 지표(JVM heap 사용률, managed memory 점유율, network buffer 실사용량 등)를 직접 모니터링한 결과를 기반으로 했습니다. 실사용량이 설정값에 비해 여유가 있으면 줄이고, 병목이 확인되면 늘리는 방식으로 반복 조정했어요.

Bloom Filter는 Point Lookup 성능을 개선하는 유효한 기법이에요. 다만 이 앱의 주요 접근 패턴은 entries() 기반 Prefix Scan입니다. RocksDB 지표 기준으로 rocksdb.iter.bytes.read(Range Scan)가 전체 byte read의 약 87%를 차지하고, rocksdb.number.db.seek/next 경로가 지배적이에요. Bloom Filter는 나머지 Point Lookup(~13%)에만 효과가 있어 실효 커버리지가 제한적입니다. Grafana에서도 두 지표의 Y축 단위 자체가 달라요. Iterator Bytes Read Rate는 MB/s 단위인 반면, Point Lookup Bytes Read Rate는 KB/s 단위입니다.

Kryo→POJO 전환은 예상 밖의 성과였어요. 처음에는 직렬화 방식을 별도로 고민하지 않고 구현했는데, 운영 중 CPU 프로파일링(async-profiler itimer)을 해보니 Kryo 관련 연산이 전체 CPU의 약 20%를 차지하고 있었습니다.

State 값 객체와 직렬화 전체를 POJO로 전환하자 약 12%로 줄었어요. 나머지는 집계 결과를 Redis에 쓰기 위해 Map<String, Any>로 변환하는 구간입니다. 여기에 Block Cache 수용량 2.2배 향상, Changelog I/O 60%↓, Checkpoint 크기 3%↓(실측, Snappy 압축 수렴)의 효과도 함께 얻었습니다.

전환 전 — Kryo Matched 20.12%
전환 후 — Kryo Matched 11.56%

State 키 구조 단순화도 초기 대비 달라진 부분이에요. 초기 구현에서 State를 담는 객체인 adFrequencyStatsMapState<String, Map<String, Map<String, Any>>>였습니다. 외부 키가 사용자 ID, 내부가 광고 ID별 통계를 담는 이중 중첩 구조였어요.

keyBy(사용자 ID)로 파티셔닝하면 Flink가 RocksDB state key prefix에 사용자 ID를 자동으로 포함하기 때문에, MapState의 외부 키로 별도 저장할 필요가 없어요. 현재 구현은 MapState<String, AdsFrequencyStats>로 단순화되어 광고 ID를 키로 바로 Point Lookup합니다. 중첩 구조 제거로 직렬화 부하도 함께 줄었습니다.

Redis에는 STRING Key(사용자 ID × timeWindow) 기준으로 State와 1:1 매핑해 동기화해요. 장애나 재시작 시에도 State에서 Redis를 재구성할 수 있습니다.

여기까지 각 앱의 읽기·쓰기 병목을 해소했어요. 남은 과제는 days의 Checkpoint I/O였습니다. State를 원격 스토리지에 업로드하는 이 구간은 코드나 RocksDB 설정만으로는 줄일 수 없는 영역이에요.


6. Flink Changelog

Days 앱의 State 규모부터 짚어볼게요. live-sst-files-size 기준으로 1일 집계는 9GB, 3일은 32GB, 7일은 68GB이며 TM당 10GB 수준입니다. Savepoint는 220~230GB에 달해요. Incremental이 아닌 전체 SST 파일을 포함하고, RocksDB SST에 적용된 블록 압축이 Flink canonical format에는 그대로 유지되지 않기 때문에 원본 대비 약 2~3배 커집니다.

State 크기 측정에 live-sst-files-size를 쓰는 이유는 Incremental Checkpoint의 업로드 대상이 SST 파일이기 때문이에요. live-data-size 지표는 최하위 레벨의 유효 데이터만 측정해 중간 레벨의 중복분을 포함하지 않아서, 실제 Checkpoint 부하를 과소평가합니다. 이 규모에서 매 Checkpoint마다 전체 State를 Snapshot하면 Checkpoint 자체가 병목이 됩니다.

Incremental Checkpoint를 써도 한계는 남아요. RocksDB는 주기적으로 Compaction을 수행하는데, 이때 기존 SST 파일이 병합되어 새 SST 파일이 생성됩니다. 이전 Checkpoint에서 참조되지 않은 신규 파일로 간주되어 전부 업로드되죠. 데이터 변화가 없어도 Compaction이 겹치는 Checkpoint에서 업로드량이 급증하는 Long-tail Latency 문제가 생깁니다.

Flink Changelog(DSTL, Distributed State Transaction Log)는 이 문제를 근본적으로 해결해요. 데이터베이스의 WAL(Write-Ahead Log)과 같은 개념입니다. State 변경마다 변경분을 즉시 로깅해두면, Checkpoint 시점에는 마지막 Checkpoint 이후의 변경분만 업로드하면 돼요. Compaction이 발생해도 업로드량에 영향이 없습니다. SST 파일 단위가 아닌 키 단위 증분이기 때문이에요.

Changelog를 활성화하면 Checkpoint 수행은 두 단계로 나뉘어요. DSTL upload는 동기로, 마지막 Checkpoint 이후 축적된 변경 로그를 업로드합니다. 변경분만 대상이라 밀리초~초 단위로 완료되고, 이 시간만 Checkpoint duration에 산입돼요. Materialization은 비동기로, 별도 주기(minutes=5분 / hours·days=10분)마다 전체 State 스냅샷을 HDFS에 저장하고 이전 Changelog를 truncate합니다. Checkpoint duration과 무관하므로 수 GB State도 Checkpoint를 블로킹하지 않아요.

Materialization 주기가 길수록 다음 DSTL upload 대상(change range)이 커지기 때문에, State 규모에 맞게 조정해야 합니다. 짧으면 DSTL 분량이 작아 Checkpoint가 빠른 대신 Materialization I/O가 빈번해지고, 길면 그 반대예요. 일 단위 구간 집계 Flink 앱은 Checkpoint 주기 2분, min-pause 30초, Materialization 주기 10분으로 설정했습니다. DSTL 업로드 임계값(preemptive-persist-threshold 5MB, batch-persist-size 10MB)은 운영 지표상 기본값으로 충분한 성능을 보여 별도 튜닝 없이 유지하고 있어요. 이 구성으로 Incremental Checkpoint 크기 기준 약 70MB 수준을 유지합니다.

Native Savepoint 교착

초기에 NATIVE Savepoint를 선택했어요. RocksDB SST 파일을 직접 참조해 복구 속도가 빠른 방식이었습니다.

그런데 Changelog와 NATIVE Savepoint를 함께 사용하면 Materialization이 영구적으로 중단되는 교착이 발생합니다. 원인은 내부 ID 불일치입니다. Flink 소스 코드를 추적해 확인했어요. NATIVE Savepoint가 트리거되면 ChangelogKeyedStateBackend의 nativeSavepoint() 메서드가 호출됩니다.

이 시점에 materializedId가 N에서 N+1로 증가해요. 이 ID는 ‘다음 Materialization이 확인해야 할 번호’입니다.

문제는 이 ID를 되돌릴 경로가 없다는 것이에요. CheckpointCoordinator는 비동기 Savepoint에 대해 완료 통지를 보내지 않습니다.

if (!props.isSavepoint() || props.isSynchronous()) { sendAcknowledgeMessages(...); }

이 설계는 의도된 것입니다. Savepoint 완료 시 Kafka Offset Commit 같은 외부 Side Effect가 발생하는 것을 방지하기 위해서죠.

notifyCheckpointComplete가 호출되지 않으면 lastConfirmedMaterializationId가 N-1에 영구 고착돼요. 이후 Materialization 시도마다 아래 조건을 충족해서 skip됩니다.

if (lastConfirmedMaterializationId < materializedId - 1 && lastFailedMaterializationId < materializedId - 1) { return Optional.empty(); }

이전 Materialization이 아직 확인되지 않았다고 판단하기 때문이에요. Materialization이 중단되면 DSTL이 무한 누적됩니다.

일반 Checkpoint가 이 교착을 해소할 수 있을 것 같지만 그렇지 않아요. 일반 Checkpoint의 snapshot() 호출 시 기록되는 materializationId는 여전히 N-1입니다. notifyCheckpointComplete를 수신해도 lastConfirmedMaterializationId의 갱신 조건(> lastConfirmed)을 충족하지 못해요.

유일한 해소 경로는 Job을 재시작하는 것입니다. 재시작 시 completeRestore()가 두 ID를 동기화해요.

lastConfirmedMaterializationId = materializationId; materializedId = materializationId + 1;

CANONICAL Savepoint는 이 문제가 발생하지 않아요. CANONICAL 분기는 savepoint() 메서드를 호출하는데, 이 메서드는 하위 RocksDB Backend에 단순 위임해요.

public RunnableFuture<SnapshotResult<KeyedStateHandle>> savepoint() { return keyedStateBackend.savepoint(); }

materializedId를 건드리지 않으므로 ID 불일치 자체가 발생하지 않습니다. CANONICAL로 전환하여 해소했어요.

이 교착은 어느 쪽의 버그도 아닙니다. nativeSavepoint()가 materializedId를 증가시키는 것은 Savepoint를 Materialization 기준점으로 삼기 위한 설계이고, CheckpointCoordinator가 비동기 Savepoint에서 notifyCheckpointComplete를 호출하지 않는 것도 외부 Side Effect를 방지하기 위한 설계예요. 각각은 올바르지만, 두 설계가 결합되면 ID 갱신 경로가 구조적으로 차단됩니다. CANONICAL Savepoint로 전환해 이 문제를 해소했어요. Changelog 적용과 Savepoint 교착 해소를 마지막으로, 세 앱의 튜닝 여정이 마무리됐습니다.


7. 마무리하며

Head·Mid·Tail 세 계층을 합산하던 서빙 구조가 단일 Redis 조회로 단순해졌어요. Airflow DAG 실행 횟수는 75회/일에서 25회/일로, 서빙 시점의 Redis 조회는 최대 4회에서 1회로 줄었습니다. 복잡도를 줄이는 것이 원래 목표였고, 수치로도 확인됐습니다.

세 앱의 병목은 서로 달랐고, 분리했기 때문에 각각 독립적으로 해결할 수 있었어요. 재처리 시 집계가 틀리던 문제도 Event Time 전환으로 구조적으로 제거했습니다.

이 과정에서 설정의 실제 동작을 소스 코드 수준에서 확인하는 일이 반복됐어요. bloom-filter.bits-per-key 덮어쓰기, withIdleness와 Bounded Source의 상호작용, Native Savepoint와 Changelog의 교착 — 공식 문서만으로는 알 수 없었던 동작들이었습니다.

처음부터 이 모든 걸 알고 시작한 건 아니에요. 지표에서 이상을 확인하고, 소스 코드를 추적하고, 설정을 조정하는 과정을 반복하면서 Flink와 RocksDB의 동작을 깊이 이해하게 됐습니다. 긴 글 읽어주셔서 감사하고, 이 여정을 함께 고민하고 검증해준 팀원들에게도 감사를 전합니다.

참고

*이 글에 사용된 이미지는 생성형 AI를 통해 제작되었습니다.

Read Entire Article