Spark 데이터 처리 10계명

salmonavocado's avatar
Aug 02, 2024
Spark 데이터 처리 10계명

1. 셔플링을 주의하자

  • 데이터 분산 처리시 불필요한 셔플링 발생 주의
  • 클러스터 환경에서 동작하기 때문에 데이터와 연산을 적절히 분산시키기 → 파티셔닝 중요
 
 
 

2. 메모리 관리를 잘하자

  • Disk Spill 주의
  • 데이터가 너무 크면 메모리에 들어가지 못해서 Disk Spill이 발생 → OutOfMemoryError 발생
  • OOM Error 예방
    • 대용량 데이터셋을 조인할 때 메모리 초과 오류 발생 가능
    • 방지하기 위해 데이터셋을 적절히 파티셔닝하고, 메모리 설정을 조정하여 데이터가 메모리에 잘 들어가게 함
    •  
📢 메모리 관리 잘 하기
  • 데이터 파티셔닝 및 병렬 처리
  • 캐싱과 체크포인트
  • 스키마 정의
  • 브로드캐스트 변수
  • 메모리 설정 조정
    • Spark 설정 파일(spark-defaults.conf) or SparkSession 설정을 통해 조정
    • 주요 설정
      • spark = SparkSession.builder \ .appName("MemoryManagementExample") \ .config("spark.executor.memory", "4g") \ # 각 익스큐터의 메모리 크기 .config("spark.driver.memory", "2g") \ # 드라이버의 메모리 크기 .config("spark.memory.fraction", "0.8") \ # 익스큐터 메모리의 사용 비율 .config("spark.memory.storageFraction", "0.3") \ # 저장 영역에 할당된 메모리의 비율 .getOrCreate()
  • 데이터 압축
    • 예. parqeut 파일 형식에서 snappy 압축으로 저장
      • df.write.option("compression", "snappy").parquet("hdfs://path/to/output")
  • 불필요한 객체 제거
    • unpersist() 로 캐시된 데이터 메모리를 제거
 
 
📢 메모리 관련 에러 및 성능 저하 사례
  • OOM (OutOfMemoryError)
    • 주로 큰 데이터 처리 및 복잡한 계산 수행시 익스큐터나 드라이버의 메모리 할당이 부족할 때 발생
    • 작업이 중간에 멈추거나 실패
      • 로그 메세지 : java.lang.OutOfMemoryError
    • 해결 방법
      • spark.executor.memory , spark.driver,memory 설정 늘리기
      • 데이터 파티셔닝을 통해 메모리 사용 분산시키기
      • 불필요한 캐싱 피하고 작업이 끝난 후에는 unpersist()로 메모리 해제하기
  • GC (Garbage collection) Overhead
    • JVM이 자주 GC를 실행하여 메모리를 해제하려고 할 때, 메모리 사용이 최적화되지 않아 불필요한 객체가 많이 생성될 때 발생
    • 작업 실행 속도가 느려지며
      • 로그 메세지 : GC overhead limit exceeded
    • 해결 방법
      • spark.executor.memory , spark.driver,memory 설정 조정하여 적절한 크기 설정하기
      • spark.memory.fraction , spark.memory,storageFraction 설정 조정하기
      • 데이터 처리 작업을 최적화하여 불필요한 객체 생성 줄이기
  • Disk Spill
    • 메모리가 부족하여 중간 데이터를 디스크에 저장할 때, spark.sql.shuffle.partitions 설정이 너무 작을 때 발생
    • 작업 실행 속도가 느려지며
      • 로그 메세지 : spill 관련
    • 해결 방법
      • spark.sql.shuffle.partitions 설정을 늘려 파티션 수 조정하기
      • spark.memory.fraction 설정을 조정하여 메모리 사용 최적화하기
  • Driver Memory Issues
    • 드라이버 프로그램이 너무 많은 데이터를 로드하거나 큰 데이터를 직접 처리할 때 발생
    • 드라이버가 중간에 멈추거나 실패
      • 로그 메세지 : OutOfMemoryError
    • 해결 방법
      • spark.driver.memory 설정 늘리기
      • 드라이버 프로그램에서 직접 데이터를 처리하는 대신 RDD, DataFrame을 사용하여 작업 분산시키기
  • Executor Lost
    • 익스큐터가 메모리를 초과하여 종료될 때, 네트워크 이슈 및 디스크 I/O 병목 현상 발생할 때
    • 작업이 중간에 멈추거나 실패
      • 로그 메세지 : ExecutorLostFailure
    • 해결 방법
      • spark.executor.memory 설정 늘리기
      • 클러스터의 네트워크와 디스크 I/O 성능 확인하고 최적화하기
 
 
 

3. 캐싱과 체크포인팅을 잘 활용하자

  • 자주 사용하는 데이터셋은 cache(), persist() 메서드를 통해 메모리에 캐싱하기
  • 체크포인팅을 사용해서 복잡한 연산의 중간 결과 저장하기
  • 반복적인 데이터 사용
    • 여러번 필터링, 조인을 반복적으로 수행해야하는 데이터셋이 있다면 캐싱을 통해 성능 크게 향상 가능
    • 복잡한 연산의 중간 결과 저장시 checkpoint()를 사용해 안정성 향상
    •  
 
 

4. 파티셔닝을 잘 활용하자

  • 파티션 수 설정 및 조정하여 클러스터 노드 간의 작업 분배를 최적화를 통해 성능 개선하기
  • 데이터 로드 성능 최적화
    • 데이터셋을 읽어올 때 파티션 수를 조정하여 데이터 로드 속도 개선 가능
    • repartition(), coalesce() 메서드 사용하여 파티션 수 조절
    •  
📢 파티셔닝 잘 활용하기
  • 자동 파티셔닝
    • Pyspark에서 DataFrame을 사용할 때 기본적으로 데이터를 자동으로 파티셔닝
  • 특정 컬럼을 기준으로 파티셔닝
    • 하지만 자동 파티셔닝이 아닌 특정 컬럼을 기준으로 파티셔닝 후 데이터를 더 높은 효율성으로 처리 가능
  • 파티션 크기 조정
    • 보통 파티션의 크기를 HDFS 블록 크기(125MB ~ 256MB)에 맞추는 것이 좋음
    • S3에 저장할 때에도 125MB ~ 256MB 정도의 크기에 맞추는 것이 좋음. 너무 작은 파일이 많아도 성능에 영향 !
  • 파티셔닝 후 캐싱
    • 파티셔닝한 데이터를 캐시하여 반복적인 작업에서 성능 향상
  • 작업 분산 확인
    • explain() 메서드를 사용하며 물리적 실행계획 확인을 통해 작업 분산 확인
 
 
 

5. 컬럼 연산을 수행할 때는 되도록 내장 함수를 사용하여 최적화 하자

  • 데이터프레임에서 컬럼 연산을 수행할 때 가능한 벡터화된 연산을 사용하여 성능 높일 수 있음
  • 백터화된 연산을 통한 성능 향상
    • withColumn() 메서드를 사용하여 새로운 컬럼 추가할 때, UDF 대신 내장 함수 (col, lit 등)를 사용하기
    • 이를 통해 JVM 레벨에서 최적화되어 뛰어난 성능 유지 가능
 
 
 

6. 조인 연산을 최적화 하자

  • 브로드캐스트 조인
    • 작은 데이터셋을 큰 데이터셋과 조인할 때 작은 데이터셋을 브로드캐스트하여 모든 노드에 복사하면 셔플링 비용 감소 가능
    • broadcast 함수를 사용하여 브로드캐스트 조인 수행 가능
      • broadcasted_df = broadcast(df)
  • 파티셔닝 활용
    • 데이터를 저장할 때 JOIN Key를 기반으로 파티셔닝 후 저장하면 동일한 키를 가진 레코드들이 같은 파티션에 배치되어 조인 성능 개선
  • Skewed Data 처리
    • JOIN KEY가 편향되어 불균형하게 분포되어 있는 경우, 특정 작업자 노드에 부하 집중 발생
    • SALTING 기법을 사용해 데이터 분포를 균일하게 만들기
 
 
 

7. UDF 사용을 되도록이면 최소화 하자

  • 사용자 정의 함수의 사용은 성능을 저하시킬 수 있으므로 기본 제공 함수, 벡터화된 연산 사용
    • 문자열 처리 작업에서 substring(), concat() 등과 같은 내장 함수 사용
    •  
📢 UDF는 왜 성능이 안좋을까?
  • 데이터 직렬화 및 역직렬화 오버헤드
    • JVM에서 실행되는 Spark와 Python 간의 데이터 직렬화 및 역직렬화 오버헤드를 유발하기 때문에 UDF는 일반적으로 Pyspark의 기본 제공 함수보다 느림
    • 작은 데이터셋에서는 문제가 되지 않지만, 대규모 데이터셋에서 큰 오버헤드 유발
  • Spark의 최적화 기능 제한
    • Pyspark의 Catalyst Optimizer는 내장 함수에 대해 다양한 최적화를 수행하지만, UDF에 대해서는 수행 불가
  • 병렬 처리 제한
    • Spark의 병렬 처리 능력을 충분히 활용하지 못할 수 있고, 특히 복잡한 계산을 수행하는 UDF일 경우 성능 저하
 
 
 

8. 로그 분석과 모니터링 없는 데이터 처리는 물 없는 수영장과 같다

  • 작업 진행 상태 모니터링
  • 문제 발생시 로그를 통해 원인 분석
  • Spark UI를 활용해 작업 상태와 성능 확인
    • 각 작업의 실행 계획과 수행 시간 확인 후 병목 지점을 찾아 최적화 진행
 
 
 

9. 적합한 데이터 포맷을 사용하자

  • 적절한 데이터 포맷(parquet, ORC 등)을 선택하여 효율적으로 데이터를 읽고 쓰기
    • 대용량 데이터를 저장할 때는 높은 압축률과 빠른 읽기 성능을 제공하는 Parquet 포맷 사용하여 처리 속도 개선
 
📢 적절한 데이터 포맷 선택하기
  • CSV (Comma-Seperated Values)
    • 장점 : 높은 호환성, 단순 구조, 높은 가독성
    • 단점 : 문자열로 저장되는 데이터 타입, 낮은 압축 효율성, 메타데이터 부족
    • 성능 : 느린 읽기/쓰기 속, 병렬 처리 어려움
    • → 작은 규모의 데이터셋, 호환성이 중요한 경우, 사람이 직접 데이터를 확인해야 하는 경우에 적합
  • Parquet
    • 장점 : 컬럼 기반 저장으로 매우 뛰어난 읽기 성능, 높은 압축 효율성, 메타데이터 존재, 높은 병렬 처리 성능
    • 단점 : CSV 대비 높은 복잡성, 낮은 호환성
    • 성능 : 높은 읽기 속도에 반해 CSV 보다 느릴 수 있는 쓰기 속도, 높은 병렬 처리 성능
    • → 대규모 데이터셋, 분산처리 환경, 특정 컬럼만 자주 조회하는 경우에 적합
  • JSON (JavaScript Object Notation)
    • 장점 : 높은 호환성, 다양한 데이터 타입 지원, 높은 가독성
    • 단점 : 중첩 구조와 태그로 인한 큰 파일크기, 느린 파싱 속도, 높은 메모리 사용량, 스키마의 부재
    • 성능 : 적당한 읽기/쓰기 속도, 파싱 및 직렬화로 인한 성능 이슈, 낮은 병렬 처리 성능
    • → 웹 애플리케이션과 통합할 때, 중첩된 데이터 구조를 다뤄야 할 때 적합
  • DAT (일반적인 바이너리 데이터 파일)
    • 장점 : 높은 효율성, 빠른 읽기/쓰기 속도, 데이터 타입 유지 가능
    • 단점 : 낮은 가독성, 낮은 호환성, 메타데이터 부재
    • 성능 : 높은 읽기/쓰기 속도, 높은 병렬 처리 성능
    • → 데이터 구조가 단순할 경우 병렬 처리에 적합
       
       
       

10. 클러스터의 리소스 관리를 게을리하지 말자

  • 클러스터 자원을 효율적으로 사용하기 위해 적절한 Executor, Core 개수 설정
  • 클러스터 관리자 (Yarn, K8s 등)를 통해 자원 사용을 모니터링하고 최적화
 
 
 

보너스

📢 DataFrame 스키마 잘 정의하기
  • 데이터 성능 저하
    • 스키마를 정의하지 않은 경우 데이터 타입을 추론하기 위해 추가적인 작업 수행 → 데이터 로딩 속도 지연 발생
    • 데이터를 불러올 때 inferSchema=True 옵션 사용시 Pyspark가 각 데이터 타입을 추론하는데, 이는 대규모 데이터셋에서 성능 저하로 이어짐
  • 잘못된 데이터 처리
    • 숫자형 데이터를 문자열로 설정하면 수치 연산 작업 불가
  • 메모리 사용 비효율
    • 정수형 데이터를 문자열로 저장하면 더 많은 메모리 사용
  • 데이터 무결성 문제
    • 날짜 데이터를 문자열로 저장시 날짜 형식이 일관되지 않을 수 있음
 
📢 분산 시스템에서 데이터 경합 방지하기
  • 데이터 경합 (Data Contention)
    • 여러 노드가 동일한 데이터를 동시에 업데이트,접근하려 할 때 발생
  • 락킹
    • 특정 데이터에 접근하는 동안은 다른 프로세스가 해당 데이터에 접근할 수 없게 막는 방법
    • 예. 분산 락킹
  • 트랜잭션
    • 데이터베이스에서 여러 연산을 하나의 작업 단위로 묶어 일관성 유지하는 방법
    • ACID (원자성, 일관성, 고립성, 지속성) 특성 보장
  • 버전 관리
    • 각 데이터 항목에 버전 번호를 부여하여 충돌을 감지하고 해결
    • 낙관적 동시성 제어(Optimistic Concurrency Control)에서 사용
  • 분할 (Sharding)
    • 데이터를 여러 노드에 분할 저장하여 경합 줄이기
    • 각 노드는 자신이 담당하는 데이터에 대해서만 업데이트 수행
 
 

written by salmonavocado🥑
 
Share article

salmonavocado