[스파크 완벽 가이드] Apache Spark 아키텍처

salmonavocado's avatar
Sep 21, 2022
[스파크 완벽 가이드] Apache Spark 아키텍처
📢
본 글은 스파크 완벽 가이드Ch2. Apache Spark 간단히 살펴보기 요약본에 여러 정보를 취합한 글입니다.
 

스파크 아키텍처

스파크 애플리케이션

  • Spark Application
    • Spark Cluster에서 실행되는 독립적인 프로그램
    • 주로 대규모 데이터 (병렬)처리 작업 수행
      • 실시간 스트리밍 분석, 기계 학습 모델 훈련 등
    • 하나의 클러스터에서 여러 개의 스파크 애플리케이션 실행 가능
      • 단, 각 스파크 애플리케이션은 별도의 JVM 프로세스에서 동작하므로 서로 다른 스파크 애플리케이션 간의 직접적인 데이터 공유는 불가능
    • Driver Program과 Executor로 구성
 
  • Driver Program (on Master Node)
    • 클러스터 노드 중 1개의 노드에서 실행
    • 애플리케이션의 수명 주기 동안 관련 정보 모두 유지
    • 전반적인 Executor의 작업과 관련된 분석, 배포, 스케줄링 등의 역할 수행
    • 사용자 프로그램이나 입력에 대한 응답
      • 사용자가 구성한 사용자 프로그램(Job)을 Task 단위로 변환하여 Executor에 전달
 
  • Executor (on Worker Node)
    • 클러스터 노드 중 다수의 노드에서 실행
    • Driver가 할당한 작업(Task) 수행 = Driver가 할당한 코드 실행
    • 진행 상황을 다시 Driver 노드에 보고
    • 할당된 스파크 애플리케이션이 완전히 종료된 후에야 Executor는 할당에서 해방
    •  
       

클러스터 매니저

  • Cluster
    • 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용 가능
    •  
  • Cluster Manager
    • 스파크 애플리케이션의 리소스를 관리하고 작업을 스케줄링(효율적으로 분배)하는 역할
    • 드라이버 프로그램과 워커 노드 간의 중재자 역할
    • Spark 지원 클러스터 매니저
      • Standalone 클러스터 매니저
        • 스파크 자체에 내장된 클러스터 매니저로 단일 컴퓨터에서 스파크 전체를 동작시킴
        • Worker 노드 : Executor 개수 = 1 : 1
        • 리소스 할당, 작업 스케줄링, 모니터링 작업 수행
        • Driver와 Executor가 각각 Thread로 동작
      • Apache Mesos
        • 유연하고 확장 가능하여 여러 프레임워크가 동일한 클러스터에서 실행될 수 있도록 지원
        • Worker 노드 : Executor 개수 = 1 : N
        • 다중 프레임워크 지원, 동적 리소스 할당, 격리 및 보안 작업 수행
      • Hadoop YARN (Yet Another Resource Negotiatior)
        • 하둡에코시스템에서 사용되는 리소스 관리 프레임워크로 하둡 클러스터에서 스파크 애플리케이션 실행 가능
        • Worker 노드 : Executor 개수 = 1 : N
        • 리소스 관리, 작업 스케줄링, 보안 및 멀티 태넌시 작업 수행
        • Deploy Mode
          • Client (Default)
            • 드라이버 프로그램이 클러스터 외부에 위치하여 실행
          • Cluster
            • 드라이버 프로그램이 클러스터 내부 노드들 중 하나 위치하여 실행
      • Kubernetes
        • 컨테이너화된 애플리케이션을 자동으로 배포, 확장 및 관리하는 오픈 소스 시스템
        • Worker 노드 : Executor 개수 = 1 : N
        • 컨테이너 오케스트레이션, 자동화, 리소스 관리 작업 수행
       
       

스파크 애플리케이션 실행 과정

📢
사용자 → 스파크 애플리케이션 제출 (submit) → 클러스터 매니저 → 자원 할당 → 사용자( 할당 받은 자원으로 작업 처리)
  • 사용자가 스파크 애플리케이션 제출
    • spark-submit을 통해 spark application 제출
  • 드라이버 프로그램 시작
    • 사용자 코드가 실행되어 SparkContext or SparkSession 객체 생성
    • SparkContext or SparkSession으로 Cluster Manager와 연결
  • 작업 분할
    • 드라이버 프로그램 내 Spark Context 작업을 여러 개의 Task로 나누어 Cluster manager에게 요청 및 제출
  • 태스크 스케줄링
    • 클러스터 매니저가 각 태스크를 워커 노드의 Executor에 할당
  • 태스크 실행
    • 익스큐터가 태스크를 실행하고, 중간 결과를 저장하며, 최종 결과를 Driver Program에 반환
  • 결과 수집
    • 드라이버 프로그램이 최종 결과를 수집하고 필요에 따라 추가 처리 수행 후 사용자에게 반환
 
 

SparkContext vs SparkSession

  • Entry Point
    • 프로그램 실행이 시작되는 함수나 메서드
      • 예시. main()
    • 스파크 기능이 사용하기 위해 초기에 접근해야 하는 엔트리 포인트인 SparkContext, SparkSession
    •  
  • SparkContext
    • 스파크에서 RDD를 사용하기 위한 주요 엔트리 포인트
    • 스파크 클러스터와 연결 관리
    • RDDs, accumulators, broadcast variables과 같은 기본적인 작업 실행 가능
    •  
  • SparkSession
    • DataFrame, Dataset을 다루는데 더 적합한 엔트리 포인트
    • 다양한 데이터 소스 or 포맷에 접근할 수 있는 통합된 인터페이스 제공
    • SparkSession : Spark Application = 1:1
    • 내부적으로 SparkContext 객체를 생성하기 때문에 SparkSession을 사용해도 SparkContext의 모든 함수나 특징 사용 가능
    • SQL 쿼리를 사용하여 데이터를 조작할 수 있도록 지원
      • RDD를 직접 조작하는 것보다 더 명확하고 간결
      • 필요한 컬럼만 불러와 사용하는 컬럼 가지치기(Column Pruning), 필터 조건에 의해 스토리 레벨에서 불필요한 데이터를 미리 필터링하는 조건자 하향(Predicate Pushdown) 등의 최적화 기법 사용 가
 
 
 

스파크 데이터 구조

스파크의 다양한 언어 API

  • 스파크는 모든 언어에 맞는 핵심 개념을 제공 -> 클러스터 머신에서 실행되는 스파크 코드로 변환
  • SQL : ANSI SQL : 2003 표준 중 일부 지원
 

스파크 API

  • Spark는 데이터 처리 작업을 위해 고수준, 저수준의 API 제공
  • 데이터 구조와 작업의 복잡성에 따라 API 선택
 
  • 저수준의 비구조적 API
    • RDD (Resilient Distributed Dataset)
      • 변하지 않는 분산 데이터
        • 변하지 않는 (Resilient) : 메모리 내부에서 데이터가 손실시 유실된 파티션을 재연산해 복구 가능 == Read Only
          • 불변의 특성으로 인해 특정 동작을 위해서는 기존 RDD가 변형된 새로운 RDD로 생성
          • DAG의 형태를 가지기 때문에 특정 RDD 관련 정보가 메모리에서 유실되었을 경우 그래프 복기 및 재계산을 통해 자동으로 복구 가능
          • 따라서, 스파크는 Fault-Tolerant (결함 허용)를 보장함
        • 분산된 (Distributed) : 스파크 클러스터를 통해 메모리에 분산되어 저장
      • 비구조적 데이터를 처리할 때 매우 유연하게 사용 가능. 복잡한 데이터 변환 및 조작 가능
      • 저수준 제어로 데이터 분할, 캐싱, 파티셔닝 등 세부적인 제어 가능
      • 타입 안정성 제공으로 컴파일 단계에서 오류 발견 가능
      • map, filter, reduce와 같은 함수형 프로그래밍 스타일의 연산 제공
 
  • 고수준의 구조적 API
    • DataFrame
      • RDD의 성능적 이슈로 등장
        • 메모리/디스크 용량 이슈, 스키마 구조의 부재, 직렬화와 가비지컬렉션으로 인한 메모리 오버헤드, 최적화 엔진의 부재
      • 구조화된 데이터 구조
        • 스키마가 있는 분산 데이터 컬렉션으로 SQL 테이블과 유사한 형식
        • 스키마 :각 컬럼의 타입을 정의한 목록
        • 여러 컴퓨터에 데이터를 분산 저장하는 것이 스프레드시트와 비슷하지만 다른점
      • 데이터를 오프-힙 영역에 저장하여 GC 및 직렬화 오버헤드 감소
      • RDD와 마찬가지로 Immutable함
      • 파이썬/R 모두 지원하기 때문에 Pandas 라이브러리의 DataFrame/ R의 DataFrame으로 쉽게 변환 가능
        • Java, Scala, Python, R 지원
      • DataFrame에 필터를 지정하면 모든 작업이 메모리에서 일어남
      • 비타입 안정성을 제공하여 런타임에만 타입 검사 수행
      • SparkSQL 사용 가능
        • Catalyst Optimizer를 사용하여 쿼리 성능 최적화 가능
      • 사용 시기
        • 성능 최적화(Catalyst Optimizer)가 필요할 때
        • 구조적 데이터(csv, parqeut) 다룰 때
        • SQL 쿼리를 사용해 데이터를 리할 때
        • 다양한 내장 함수(집계, 조인, 윈도우 함수 등)를 사용할 때
         
    • DataSet
      • RDD의 장점과 DataFrame의 장점을 합친 것
      • 타입 안정성을 제공하여 컴파일 타임에 타입 검사를 수행하여 오류 방지
      • JVM 객체를 사용하여 데이터 조작이 가능한 객체 지향 프로그래밍
        • Java와 Scala에서만 사용 가능
      • 사용 시기
        • DataFrame 기능만으로는 연산을 수행할 수 없는 경우
          • 복잡한 비즈니스 SQL, DataFrame 대신 단일 함수로 인코딩해야하는 경우
        • 성능 저하를 감수하더라도 타입 안정성을 가진 데이터 타입을 사용하고 싶은 경우
 
 
 

스파크 더 알아보기

RDD 동작 원리

notion image
  • 지연 연산 (lazy evaluation)
    • RDD 동작 원리의 핵심
    • 연산 그래프를 처리하기 직전까지 기다리는 동작 방식을 의미함 = 즉시 실행하지 않음
      • 스파크는 특정 연산 명령이 내려진 즉시 데이터 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 실행 계획 생성
      • 코드를 실행하는 마지막 순간까지 대기 후, 원형 DataFrame에 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일
      • 하둡의 Map Reduce 동작과 반대
      • 간단한 연산에 대한 성능적 이슈를 고려하지 않아도 되는 장점
       
  • Transformation
    • 새로운 RDD를 생성하는 동작
      • 실제 연산작업을 수행하지 않고 논리적 실행 계획을 세움 (리지니 형태로 기록)
    • 대표적인 트랜스포메이션 연산자 종류
      • map(), filter(), distinct(), union(), intersection(), where(), read(), sort(), groupBy(), sum(), withCoulmnRenamed(), orderBy(), select(), join()
      •  
    • 좁은 의존성 (narrow dependency)
      • 좁은 트랜스포메이션은 하나의 입력 파티션이 하나의 출력 파티션에만 영향 = 1:1
      • 좁은 트랜스포메이션을 사용하면 스파크에서 파이프라이닝 자동 수행
        • 파이프라이닝 
          • 명령어의 데이터 경로를 세분화하고, 각기 다른 세부 단계를 동시에 수행하게 함으로써, 여러 명령어들을 중첩 수행 가능하게 만들어 성능을 향상하는 것을 의미
      • filter(), contains()
    • 넓은 의존성 (wide dependency)
      • 넓은 트랜스포메이션은 하나의 입력 파티션이 여러 출력 파티션에 영향 = 1:N
      • 새로운 RDD 생성시 셔플 발생
        • 셔플
          • 스파크가 클러스터에서 파티션 교환하는 작업
          • 스파크는 셔플의 결과를 디스크에 저장
      • groupBy(), orderBy()
 
  • Action
    • 실제 연산을 수행하는 연산자
    • 대표적인 액션 연산자 종류
      • count(), collect(), top(num), takeOrdered(num), reduce(func), show(), take(), save()
       
 

파티션

  • 데이터 구조를 구성하고 있는 최소 단위 객체
  • 서로 다른 노드에서 분산 처리
  • 1 Core = 1 Task = 1 Partition
    • Partition 수 = Core 수
    • Partition 크기 → 메모리 크기
    • 적은 수의 Partition = 크기가 큰 Partition, 많은 수의 Partition = 크기가 작은 Partition
  • 병렬성
    • 파티션 1 , 수천개의 익스큐터 = 병렬성 1
    • 파티션 수백개 , 익스큐터 1 = 병렬성 1
  • DataFrame 사용시 수동/개별적으로 파티션 처리할 필요 없음 (자동)
 
 

스파크 UI

  • 스파크 잡의 진행 상황을 모니터링 할 때 사용
  • 드라이버 노드의 4040 포트로 접속
    • 로컬 = http://localhost:4040
  • 스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보 확인
  • 스파크 잡 튜닝/디버깅 할 때 유용
    • 스파크 잡 : 개별 액션에 의해 트리거되는 다수의 트랜스포메이션으로 이루어져있으며 스파크 UI로 잡을 모니터링 할 수 있다
 
 

종합 예제

데이터 연산

flightData2015=spark.read.option("inferschema","true").option("header","true").csv("/data/~~.csv")
  • SparkSession의 DataFrameReader 클래스를 사용해 데이터 읽기
    • 특정 파일 포맷과 몇 가지 옵션 설정
  • inferSchema : 스키마 추론 ( 스파크 DF의 스키마 정보 알아내기 )
  • 스키마 정보를 얻기 위해 데이터 조금 읽음
  • 해당 로우의 데이터 타입을 스파크 데이터 타입에 맞게 분석
    • 운영 환경에서는 데이터를 읽을 때 스키마를 엄격하게 지정하는 옵션 사용
  • header : 파일의 첫 로우를 헤더로 지정하는 옵션 설정
  • 로우의 수를 알 수 없는 이유 : 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션
 
flightData2015.sort("count").explain()
  • explain() : DF의 계보나 스파크의 쿼리 실행 계획 알 수 있음
 
spark.conf.set("spark.sql.shuffle.partitions","5") flightData2015.sort("count").take(2)
  • 트랜스포메이션의 논리적 실행 계획은 DF의 계보를 정리함
  • 스파크는 "계보를 통해" 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있음
  • 스파크의 프로그래밍 모델인 함수형 프로그래밍의 핵심 !
    • 함수형 프로그래밍은 데이터의 변환 규칙이 일정한 경우 같은 입력에 대해 항상 같은 출력을 생성
  • 사용자는 물리적 데이터를 직접 다루지 않지만, 설정한 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성 제어
    • 값을 변경하면 잡의 실제 실행 특성 제어 가능 / 스파크 UI에 접속해 잡의 실행 상태와 스파크 잡의 물리적, 논리적 실행 특성 확인 가능
 
 

DataFrame과 SQL

  • 스파크 SQL을 사용하면 모든 DF를 테이블이나 뷰(임시 테이블) 로 등록한 후 SQL 쿼리 사용 가능
  • ' craeteOrReplaceTempView' : df를 테이블이나 뷰로 생성
  • spark.sql(""" """) 메서드로 sql 쿼리 실행
  • spark : SparkSession의 변수
 
# DataFrame 구문 from pyspark.sql.functions import desc flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)","destination_total")\ .sort(desc("destination_total").limit(5).show() # SQL 구문 maxSql=spark.sql(""" SELECT DEST_COUNTRY_NAME ,sum(count) AS destination_total FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5 """) maxSql.show()
  • 실행 계획은 트랜스포메이션의 지향성 비순환 그래프 ( DAG ) 이며 액션이 호출되면 결과를 만들어냄
  • 지향성 비순환 그래프의 각 단계는 불변성을 가진 신규 DF를 생성
  • read, groupBy ~ limt 까지 트랜스포메이션으로 프로세스를 만들고 마지막 단계에서 액션을 수행해 DF의 결과를 모으는 프로세스를 시작! 처리가 끝나면 코드를 작성한 프로그래밍 언어에 맞는 리스트나 배열을 반환
  • explain으로 실행 계획을 보면 , partial_sum 함수를 호출할 때 집계가 두 단계로 나눠지는데, 그 이유는 숫자 목록의 합을 구하는 연산이 가환성을 가져 연산 시 파티션별 처리가 가능하기 때문.
  • 가환성 : 연산시 순서를 바꿔도 그 결과가 변하지 않는 일
  • 데이터를 드라이버로 모으는 대신에 스파크가 지원하는 여러 데이터소스로 내보낼 수도 있음
 
 

정리

  • 스파크의 기초
  • 트랜스포메이션과 액션
  • 스파크가 DF의 실행 계획을 최적화하기 위해 트랜스포메이션의 지향성 비순환 그래프를 지연 실행하는 방법
  • 데이터가 파티션으로 구성되는 방법
  • 복잡한 트랜스포메이션 작업을 실행하는 단계
 
 

참고 사이트

 

written by salmonavocado🥑
 
Share article

salmonavocado