1. 셔플링을 주의하자
- 데이터 분산 처리시 불필요한 셔플링 발생 주의
- 클러스터 환경에서 동작하기 때문에 데이터와 연산을 적절히 분산시키기 → 파티셔닝 중요
2. 메모리 관리를 잘하자
- Disk Spill 주의
- 데이터가 너무 크면 메모리에 들어가지 못해서 Disk Spill이 발생 → OutOfMemoryError 발생
- OOM Error 예방
- 대용량 데이터셋을 조인할 때 메모리 초과 오류 발생 가능
- 방지하기 위해 데이터셋을 적절히 파티셔닝하고, 메모리 설정을 조정하여 데이터가 메모리에 잘 들어가게 함
📢 메모리 관리 잘 하기
- 데이터 파티셔닝 및 병렬 처리
- 캐싱과 체크포인트
- 스키마 정의
- 브로드캐스트 변수
- 메모리 설정 조정
- Spark 설정 파일(spark-defaults.conf) or SparkSession 설정을 통해 조정
- 주요 설정
spark = SparkSession.builder \
.appName("MemoryManagementExample") \
.config("spark.executor.memory", "4g") \ # 각 익스큐터의 메모리 크기
.config("spark.driver.memory", "2g") \ # 드라이버의 메모리 크기
.config("spark.memory.fraction", "0.8") \ # 익스큐터 메모리의 사용 비율
.config("spark.memory.storageFraction", "0.3") \ # 저장 영역에 할당된 메모리의 비율
.getOrCreate()
- 데이터 압축
- 예. parqeut 파일 형식에서 snappy 압축으로 저장
df.write.option("compression", "snappy").parquet("hdfs://path/to/output")
- 불필요한 객체 제거
unpersist()
로 캐시된 데이터 메모리를 제거
📢 메모리 관련 에러 및 성능 저하 사례
- OOM (OutOfMemoryError)
- 주로 큰 데이터 처리 및 복잡한 계산 수행시 익스큐터나 드라이버의 메모리 할당이 부족할 때 발생
- 작업이 중간에 멈추거나 실패
- 로그 메세지 :
java.lang.OutOfMemoryError
- 해결 방법
spark.executor.memory
,spark.driver,memory
설정 늘리기데이터 파티셔닝
을 통해 메모리 사용 분산시키기불필요한 캐싱
피하고 작업이 끝난 후에는unpersist()
로 메모리 해제하기
- GC (Garbage collection) Overhead
- JVM이 자주 GC를 실행하여 메모리를 해제하려고 할 때, 메모리 사용이 최적화되지 않아 불필요한 객체가 많이 생성될 때 발생
- 작업 실행 속도가 느려지며
- 로그 메세지 :
GC overhead limit exceeded
- 해결 방법
spark.executor.memory
,spark.driver,memory
설정 조정하여 적절한 크기 설정하기spark.memory.fraction
,spark.memory,storageFraction
설정 조정하기- 데이터 처리 작업을 최적화하여 불필요한 객체 생성 줄이기
- Disk Spill
- 메모리가 부족하여 중간 데이터를 디스크에 저장할 때,
spark.sql.shuffle.partitions
설정이 너무 작을 때 발생 - 작업 실행 속도가 느려지며
- 로그 메세지 :
spill
관련 - 해결 방법
spark.sql.shuffle.partitions
설정을 늘려 파티션 수 조정하기spark.memory.fraction
설정을 조정하여 메모리 사용 최적화하기
- Driver Memory Issues
- 드라이버 프로그램이 너무 많은 데이터를 로드하거나 큰 데이터를 직접 처리할 때 발생
- 드라이버가 중간에 멈추거나 실패
- 로그 메세지 :
OutOfMemoryError
- 해결 방법
spark.driver.memory
설정 늘리기- 드라이버 프로그램에서 직접 데이터를 처리하는 대신 RDD, DataFrame을 사용하여 작업 분산시키기
- Executor Lost
- 익스큐터가 메모리를 초과하여 종료될 때, 네트워크 이슈 및 디스크 I/O 병목 현상 발생할 때
- 작업이 중간에 멈추거나 실패
- 로그 메세지 :
ExecutorLostFailure
- 해결 방법
spark.executor.memory
설정 늘리기- 클러스터의 네트워크와 디스크 I/O 성능 확인하고 최적화하기
3. 캐싱과 체크포인팅을 잘 활용하자
- 자주 사용하는 데이터셋은 cache(), persist() 메서드를 통해 메모리에 캐싱하기
- 체크포인팅을 사용해서 복잡한 연산의 중간 결과 저장하기
- 반복적인 데이터 사용
- 여러번 필터링, 조인을 반복적으로 수행해야하는 데이터셋이 있다면 캐싱을 통해 성능 크게 향상 가능
- 복잡한 연산의 중간 결과 저장시 checkpoint()를 사용해 안정성 향상
4. 파티셔닝을 잘 활용하자
- 파티션 수 설정 및 조정하여 클러스터 노드 간의 작업 분배를 최적화를 통해 성능 개선하기
- 데이터 로드 성능 최적화
- 데이터셋을 읽어올 때 파티션 수를 조정하여 데이터 로드 속도 개선 가능
- repartition(), coalesce() 메서드 사용하여 파티션 수 조절
📢 파티셔닝 잘 활용하기
- 자동 파티셔닝
- Pyspark에서 DataFrame을 사용할 때 기본적으로 데이터를 자동으로 파티셔닝
- 특정 컬럼을 기준으로 파티셔닝
- 하지만 자동 파티셔닝이 아닌 특정 컬럼을 기준으로 파티셔닝 후 데이터를 더 높은 효율성으로 처리 가능
- 파티션 크기 조정
- 보통 파티션의 크기를 HDFS 블록 크기(125MB ~ 256MB)에 맞추는 것이 좋음
- S3에 저장할 때에도 125MB ~ 256MB 정도의 크기에 맞추는 것이 좋음. 너무 작은 파일이 많아도 성능에 영향 !
- 파티셔닝 후 캐싱
- 파티셔닝한 데이터를 캐시하여 반복적인 작업에서 성능 향상
- 작업 분산 확인
- explain() 메서드를 사용하며 물리적 실행계획 확인을 통해 작업 분산 확인
5. 컬럼 연산을 수행할 때는 되도록 내장 함수를 사용하여 최적화 하자
- 데이터프레임에서 컬럼 연산을 수행할 때 가능한 벡터화된 연산을 사용하여 성능 높일 수 있음
- 백터화된 연산을 통한 성능 향상
- withColumn() 메서드를 사용하여 새로운 컬럼 추가할 때, UDF 대신 내장 함수 (col, lit 등)를 사용하기
- 이를 통해 JVM 레벨에서 최적화되어 뛰어난 성능 유지 가능
6. 조인 연산을 최적화 하자
- 브로드캐스트 조인
- 작은 데이터셋을 큰 데이터셋과 조인할 때 작은 데이터셋을 브로드캐스트하여 모든 노드에 복사하면 셔플링 비용 감소 가능
- broadcast 함수를 사용하여 브로드캐스트 조인 수행 가능
broadcasted_df = broadcast(df)
- 파티셔닝 활용
- 데이터를 저장할 때 JOIN Key를 기반으로 파티셔닝 후 저장하면 동일한 키를 가진 레코드들이 같은 파티션에 배치되어 조인 성능 개선
- Skewed Data 처리
- JOIN KEY가 편향되어 불균형하게 분포되어 있는 경우, 특정 작업자 노드에 부하 집중 발생
- SALTING 기법을 사용해 데이터 분포를 균일하게 만들기
7. UDF 사용을 되도록이면 최소화 하자
- 사용자 정의 함수의 사용은 성능을 저하시킬 수 있으므로 기본 제공 함수, 벡터화된 연산 사용
- 문자열 처리 작업에서 substring(), concat() 등과 같은 내장 함수 사용
📢 UDF는 왜 성능이 안좋을까?
- 데이터 직렬화 및 역직렬화 오버헤드
- JVM에서 실행되는 Spark와 Python 간의 데이터 직렬화 및 역직렬화 오버헤드를 유발하기 때문에 UDF는 일반적으로 Pyspark의 기본 제공 함수보다 느림
- 작은 데이터셋에서는 문제가 되지 않지만, 대규모 데이터셋에서 큰 오버헤드 유발
- Spark의 최적화 기능 제한
- Pyspark의 Catalyst Optimizer는 내장 함수에 대해 다양한 최적화를 수행하지만, UDF에 대해서는 수행 불가
- 병렬 처리 제한
- Spark의 병렬 처리 능력을 충분히 활용하지 못할 수 있고, 특히 복잡한 계산을 수행하는 UDF일 경우 성능 저하
8. 로그 분석과 모니터링 없는 데이터 처리는 물 없는 수영장과 같다
- 작업 진행 상태 모니터링
- 문제 발생시 로그를 통해 원인 분석
- Spark UI를 활용해 작업 상태와 성능 확인
- 각 작업의 실행 계획과 수행 시간 확인 후 병목 지점을 찾아 최적화 진행
9. 적합한 데이터 포맷을 사용하자
- 적절한 데이터 포맷(parquet, ORC 등)을 선택하여 효율적으로 데이터를 읽고 쓰기
- 대용량 데이터를 저장할 때는 높은 압축률과 빠른 읽기 성능을 제공하는 Parquet 포맷 사용하여 처리 속도 개선
📢 적절한 데이터 포맷 선택하기
- CSV (Comma-Seperated Values)
- 장점 : 높은 호환성, 단순 구조, 높은 가독성
- 단점 : 문자열로 저장되는 데이터 타입, 낮은 압축 효율성, 메타데이터 부족
- 성능 : 느린 읽기/쓰기 속, 병렬 처리 어려움
→ 작은 규모의 데이터셋, 호환성이 중요한 경우, 사람이 직접 데이터를 확인해야 하는 경우에 적합
- Parquet
- 장점 : 컬럼 기반 저장으로 매우 뛰어난 읽기 성능, 높은 압축 효율성, 메타데이터 존재, 높은 병렬 처리 성능
- 단점 : CSV 대비 높은 복잡성, 낮은 호환성
- 성능 : 높은 읽기 속도에 반해 CSV 보다 느릴 수 있는 쓰기 속도, 높은 병렬 처리 성능
→ 대규모 데이터셋, 분산처리 환경, 특정 컬럼만 자주 조회하는 경우에 적합
- JSON (JavaScript Object Notation)
- 장점 : 높은 호환성, 다양한 데이터 타입 지원, 높은 가독성
- 단점 : 중첩 구조와 태그로 인한 큰 파일크기, 느린 파싱 속도, 높은 메모리 사용량, 스키마의 부재
- 성능 : 적당한 읽기/쓰기 속도, 파싱 및 직렬화로 인한 성능 이슈, 낮은 병렬 처리 성능
→ 웹 애플리케이션과 통합할 때, 중첩된 데이터 구조를 다뤄야 할 때 적합
- DAT (일반적인 바이너리 데이터 파일)
- 장점 : 높은 효율성, 빠른 읽기/쓰기 속도, 데이터 타입 유지 가능
- 단점 : 낮은 가독성, 낮은 호환성, 메타데이터 부재
- 성능 : 높은 읽기/쓰기 속도, 높은 병렬 처리 성능
→ 데이터 구조가 단순할 경우 병렬 처리에 적합
10. 클러스터의 리소스 관리를 게을리하지 말자
- 클러스터 자원을 효율적으로 사용하기 위해 적절한 Executor, Core 개수 설정
- 클러스터 관리자 (Yarn, K8s 등)를 통해 자원 사용을 모니터링하고 최적화
보너스
📢 DataFrame 스키마 잘 정의하기
- 데이터 성능 저하
- 스키마를 정의하지 않은 경우 데이터 타입을 추론하기 위해 추가적인 작업 수행 → 데이터 로딩 속도 지연 발생
- 데이터를 불러올 때
inferSchema=True
옵션 사용시 Pyspark가 각 데이터 타입을 추론하는데, 이는 대규모 데이터셋에서 성능 저하로 이어짐
- 잘못된 데이터 처리
- 숫자형 데이터를 문자열로 설정하면 수치 연산 작업 불가
- 메모리 사용 비효율
- 정수형 데이터를 문자열로 저장하면 더 많은 메모리 사용
- 데이터 무결성 문제
- 날짜 데이터를 문자열로 저장시 날짜 형식이 일관되지 않을 수 있음
📢 분산 시스템에서 데이터 경합 방지하기
- 데이터 경합 (Data Contention)
- 여러 노드가 동일한 데이터를 동시에 업데이트,접근하려 할 때 발생
- 락킹
- 특정 데이터에 접근하는 동안은 다른 프로세스가 해당 데이터에 접근할 수 없게 막는 방법
- 예. 분산 락킹
- 트랜잭션
- 데이터베이스에서 여러 연산을 하나의 작업 단위로 묶어 일관성 유지하는 방법
- ACID (원자성, 일관성, 고립성, 지속성) 특성 보장
- 버전 관리
- 각 데이터 항목에 버전 번호를 부여하여 충돌을 감지하고 해결
- 낙관적 동시성 제어(Optimistic Concurrency Control)에서 사용
- 분할 (Sharding)
- 데이터를 여러 노드에 분할 저장하여 경합 줄이기
- 각 노드는 자신이 담당하는 데이터에 대해서만 업데이트 수행
written by salmonavocado🥑
Share article