본 글은 스파크 완벽 가이드의 Ch2. Apache Spark 간단히 살펴보기 요약본에 여러 정보를 취합한 글입니다.
스파크 아키텍처
스파크 애플리케이션
- Spark Application
- Spark Cluster에서 실행되는 독립적인 프로그램
- 주로 대규모 데이터 (병렬)처리 작업 수행
- 실시간 스트리밍 분석, 기계 학습 모델 훈련 등
- 하나의 클러스터에서 여러 개의 스파크 애플리케이션 실행 가능
- 단, 각 스파크 애플리케이션은 별도의 JVM 프로세스에서 동작하므로 서로 다른 스파크 애플리케이션 간의 직접적인 데이터 공유는 불가능
- Driver Program과 Executor로 구성
- Driver Program (on Master Node)
- 클러스터 노드 중 1개의 노드에서 실행
- 애플리케이션의 수명 주기 동안 관련 정보 모두 유지
- 전반적인 Executor의 작업과 관련된 분석, 배포, 스케줄링 등의 역할 수행
- 사용자 프로그램이나 입력에 대한 응답
- 사용자가 구성한 사용자 프로그램(Job)을 Task 단위로 변환하여 Executor에 전달
- Executor (on Worker Node)
- 클러스터 노드 중 다수의 노드에서 실행
- Driver가 할당한 작업(Task) 수행 = Driver가 할당한 코드 실행
- 진행 상황을 다시 Driver 노드에 보고
- 할당된 스파크 애플리케이션이 완전히 종료된 후에야 Executor는 할당에서 해방
클러스터 매니저
- Cluster
- 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용 가능
- Cluster Manager
- 스파크 애플리케이션의 리소스를 관리하고 작업을 스케줄링(효율적으로 분배)하는 역할
- 드라이버 프로그램과 워커 노드 간의 중재자 역할
- Spark 지원 클러스터 매니저
- Standalone 클러스터 매니저
- 스파크 자체에 내장된 클러스터 매니저로 단일 컴퓨터에서 스파크 전체를 동작시킴
- Worker 노드 : Executor 개수 = 1 : 1
- 리소스 할당, 작업 스케줄링, 모니터링 작업 수행
- Driver와 Executor가 각각 Thread로 동작
- Apache Mesos
- 유연하고 확장 가능하여 여러 프레임워크가 동일한 클러스터에서 실행될 수 있도록 지원
- Worker 노드 : Executor 개수 = 1 : N
- 다중 프레임워크 지원, 동적 리소스 할당, 격리 및 보안 작업 수행
- Hadoop YARN (Yet Another Resource Negotiatior)
- 하둡에코시스템에서 사용되는 리소스 관리 프레임워크로 하둡 클러스터에서 스파크 애플리케이션 실행 가능
- Worker 노드 : Executor 개수 = 1 : N
- 리소스 관리, 작업 스케줄링, 보안 및 멀티 태넌시 작업 수행
- Deploy Mode
- Client (Default)
- 드라이버 프로그램이 클러스터 외부에 위치하여 실행
- Cluster
- 드라이버 프로그램이 클러스터 내부 노드들 중 하나 위치하여 실행
- Kubernetes
- 컨테이너화된 애플리케이션을 자동으로 배포, 확장 및 관리하는 오픈 소스 시스템
- Worker 노드 : Executor 개수 = 1 : N
- 컨테이너 오케스트레이션, 자동화, 리소스 관리 작업 수행
스파크 애플리케이션 실행 과정
사용자 → 스파크 애플리케이션 제출 (submit) → 클러스터 매니저 → 자원 할당 → 사용자( 할당 받은 자원으로 작업 처리)
- 사용자가 스파크 애플리케이션 제출
- spark-submit을 통해 spark application 제출
- 드라이버 프로그램 시작
- 사용자 코드가 실행되어 SparkContext or SparkSession 객체 생성
- SparkContext or SparkSession으로 Cluster Manager와 연결
- 작업 분할
- 드라이버 프로그램 내 Spark Context 작업을 여러 개의 Task로 나누어 Cluster manager에게 요청 및 제출
- 태스크 스케줄링
- 클러스터 매니저가 각 태스크를 워커 노드의 Executor에 할당
- 태스크 실행
- 익스큐터가 태스크를 실행하고, 중간 결과를 저장하며, 최종 결과를 Driver Program에 반환
- 결과 수집
- 드라이버 프로그램이 최종 결과를 수집하고 필요에 따라 추가 처리 수행 후 사용자에게 반환
SparkContext vs SparkSession
- Entry Point
- 프로그램 실행이 시작되는 함수나 메서드
- 예시. main()
- 스파크 기능이 사용하기 위해 초기에 접근해야 하는 엔트리 포인트인 SparkContext, SparkSession
- SparkContext
- 스파크에서 RDD를 사용하기 위한 주요 엔트리 포인트
- 스파크 클러스터와 연결 관리
- RDDs, accumulators, broadcast variables과 같은 기본적인 작업 실행 가능
- SparkSession
- DataFrame, Dataset을 다루는데 더 적합한 엔트리 포인트
- 다양한 데이터 소스 or 포맷에 접근할 수 있는 통합된 인터페이스 제공
- SparkSession : Spark Application = 1:1
- 내부적으로 SparkContext 객체를 생성하기 때문에 SparkSession을 사용해도 SparkContext의 모든 함수나 특징 사용 가능
- SQL 쿼리를 사용하여 데이터를 조작할 수 있도록 지원
- RDD를 직접 조작하는 것보다 더 명확하고 간결
- 필요한 컬럼만 불러와 사용하는 컬럼 가지치기(Column Pruning), 필터 조건에 의해 스토리 레벨에서 불필요한 데이터를 미리 필터링하는 조건자 하향(Predicate Pushdown) 등의 최적화 기법 사용 가
스파크 데이터 구조
스파크의 다양한 언어 API
- 스파크는 모든 언어에 맞는 핵심 개념을 제공 -> 클러스터 머신에서 실행되는 스파크 코드로 변환
- SQL : ANSI SQL : 2003 표준 중 일부 지원
스파크 API
- Spark는 데이터 처리 작업을 위해 고수준, 저수준의 API 제공
- 데이터 구조와 작업의 복잡성에 따라 API 선택
- 저수준의 비구조적 API
- RDD (Resilient Distributed Dataset)
- 변하지 않는 분산 데이터
- 변하지 않는 (Resilient) : 메모리 내부에서 데이터가 손실시 유실된 파티션을 재연산해 복구 가능 == Read Only
- 불변의 특성으로 인해 특정 동작을 위해서는 기존 RDD가 변형된 새로운 RDD로 생성
- DAG의 형태를 가지기 때문에 특정 RDD 관련 정보가 메모리에서 유실되었을 경우 그래프 복기 및 재계산을 통해 자동으로 복구 가능
- 따라서, 스파크는 Fault-Tolerant (결함 허용)를 보장함
- 분산된 (Distributed) : 스파크 클러스터를 통해 메모리에 분산되어 저장
- 비구조적 데이터를 처리할 때 매우 유연하게 사용 가능. 복잡한 데이터 변환 및 조작 가능
- 저수준 제어로 데이터 분할, 캐싱, 파티셔닝 등 세부적인 제어 가능
- 타입 안정성 제공으로 컴파일 단계에서 오류 발견 가능
- map, filter, reduce와 같은 함수형 프로그래밍 스타일의 연산 제공
- 고수준의 구조적 API
- DataFrame
- RDD의 성능적 이슈로 등장
- 메모리/디스크 용량 이슈, 스키마 구조의 부재, 직렬화와 가비지컬렉션으로 인한 메모리 오버헤드, 최적화 엔진의 부재
- 구조화된 데이터 구조
- 스키마가 있는 분산 데이터 컬렉션으로 SQL 테이블과 유사한 형식
- 스키마 :각 컬럼의 타입을 정의한 목록
- 여러 컴퓨터에 데이터를 분산 저장하는 것이 스프레드시트와 비슷하지만 다른점
- 데이터를 오프-힙 영역에 저장하여 GC 및 직렬화 오버헤드 감소
- RDD와 마찬가지로 Immutable함
- 파이썬/R 모두 지원하기 때문에 Pandas 라이브러리의 DataFrame/ R의 DataFrame으로 쉽게 변환 가능
- Java, Scala, Python, R 지원
- DataFrame에 필터를 지정하면 모든 작업이 메모리에서 일어남
- 비타입 안정성을 제공하여 런타임에만 타입 검사 수행
- SparkSQL 사용 가능
- Catalyst Optimizer를 사용하여 쿼리 성능 최적화 가능
- 사용 시기
- 성능 최적화(Catalyst Optimizer)가 필요할 때
- 구조적 데이터(csv, parqeut) 다룰 때
- SQL 쿼리를 사용해 데이터를 리할 때
- 다양한 내장 함수(집계, 조인, 윈도우 함수 등)를 사용할 때
- DataSet
- RDD의 장점과 DataFrame의 장점을 합친 것
- 타입 안정성을 제공하여 컴파일 타임에 타입 검사를 수행하여 오류 방지
- JVM 객체를 사용하여 데이터 조작이 가능한 객체 지향 프로그래밍
- Java와 Scala에서만 사용 가능
- 사용 시기
- DataFrame 기능만으로는 연산을 수행할 수 없는 경우
- 복잡한 비즈니스 SQL, DataFrame 대신 단일 함수로 인코딩해야하는 경우
- 성능 저하를 감수하더라도 타입 안정성을 가진 데이터 타입을 사용하고 싶은 경우
스파크 더 알아보기
RDD 동작 원리

- 지연 연산 (lazy evaluation)
- RDD 동작 원리의 핵심
- 연산 그래프를 처리하기 직전까지 기다리는 동작 방식을 의미함 = 즉시 실행하지 않음
- 스파크는 특정 연산 명령이 내려진 즉시 데이터 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 실행 계획 생성
- 코드를 실행하는 마지막 순간까지 대기 후, 원형 DataFrame에 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일
- 하둡의 Map Reduce 동작과 반대
- 간단한 연산에 대한 성능적 이슈를 고려하지 않아도 되는 장점
- Transformation
- 새로운 RDD를 생성하는 동작
- 실제 연산작업을 수행하지 않고 논리적 실행 계획을 세움 (리지니 형태로 기록)
- 대표적인 트랜스포메이션 연산자 종류
- map(), filter(), distinct(), union(), intersection(), where(), read(), sort(), groupBy(), sum(), withCoulmnRenamed(), orderBy(), select(), join()
- 좁은 의존성 (narrow dependency)
- 좁은 트랜스포메이션은 하나의 입력 파티션이 하나의 출력 파티션에만 영향 = 1:1
- 좁은 트랜스포메이션을 사용하면 스파크에서 파이프라이닝 자동 수행
- 파이프라이닝
- 명령어의 데이터 경로를 세분화하고, 각기 다른 세부 단계를 동시에 수행하게 함으로써, 여러 명령어들을 중첩 수행 가능하게 만들어 성능을 향상하는 것을 의미
- filter(), contains()
- 넓은 의존성 (wide dependency)
- 넓은 트랜스포메이션은 하나의 입력 파티션이 여러 출력 파티션에 영향 = 1:N
- 새로운 RDD 생성시 셔플 발생
- 셔플
- 스파크가 클러스터에서 파티션 교환하는 작업
- 스파크는 셔플의 결과를 디스크에 저장
- groupBy(), orderBy()
- Action
- 실제 연산을 수행하는 연산자
- 대표적인 액션 연산자 종류
- count(), collect(), top(num), takeOrdered(num), reduce(func), show(), take(), save()
파티션
- 데이터 구조를 구성하고 있는 최소 단위 객체
- 서로 다른 노드에서 분산 처리
- 1 Core = 1 Task = 1 Partition
- Partition 수 = Core 수
- Partition 크기 → 메모리 크기
- 적은 수의 Partition = 크기가 큰 Partition, 많은 수의 Partition = 크기가 작은 Partition
- 병렬성
- 파티션 1 , 수천개의 익스큐터 = 병렬성 1
- 파티션 수백개 , 익스큐터 1 = 병렬성 1
- DataFrame 사용시 수동/개별적으로 파티션 처리할 필요 없음 (자동)
스파크 UI
- 스파크 잡의 진행 상황을 모니터링 할 때 사용
- 드라이버 노드의 4040 포트로 접속
- 로컬 = http://localhost:4040
- 스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보 확인
- 스파크 잡 튜닝/디버깅 할 때 유용
- 스파크 잡 : 개별 액션에 의해 트리거되는 다수의 트랜스포메이션으로 이루어져있으며 스파크 UI로 잡을 모니터링 할 수 있다
종합 예제
데이터 연산
flightData2015=spark.read.option("inferschema","true").option("header","true").csv("/data/~~.csv")
- SparkSession의 DataFrameReader 클래스를 사용해 데이터 읽기
- 특정 파일 포맷과 몇 가지 옵션 설정
- inferSchema : 스키마 추론 ( 스파크 DF의 스키마 정보 알아내기 )
- 스키마 정보를 얻기 위해 데이터 조금 읽음
- 해당 로우의 데이터 타입을 스파크 데이터 타입에 맞게 분석
- 운영 환경에서는 데이터를 읽을 때 스키마를 엄격하게 지정하는 옵션 사용
- header : 파일의 첫 로우를 헤더로 지정하는 옵션 설정
- 로우의 수를 알 수 없는 이유 : 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션
flightData2015.sort("count").explain()
- explain() : DF의 계보나 스파크의 쿼리 실행 계획 알 수 있음
spark.conf.set("spark.sql.shuffle.partitions","5") flightData2015.sort("count").take(2)
- 트랜스포메이션의 논리적 실행 계획은 DF의 계보를 정리함
- 스파크는 "계보를 통해" 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있음
- 스파크의 프로그래밍 모델인 함수형 프로그래밍의 핵심 !
- 함수형 프로그래밍은 데이터의 변환 규칙이 일정한 경우 같은 입력에 대해 항상 같은 출력을 생성
- 사용자는 물리적 데이터를 직접 다루지 않지만, 설정한 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성 제어
- 값을 변경하면 잡의 실제 실행 특성 제어 가능 / 스파크 UI에 접속해 잡의 실행 상태와 스파크 잡의 물리적, 논리적 실행 특성 확인 가능
DataFrame과 SQL
- 스파크 SQL을 사용하면 모든 DF를 테이블이나 뷰(임시 테이블) 로 등록한 후 SQL 쿼리 사용 가능
- ' craeteOrReplaceTempView' : df를 테이블이나 뷰로 생성
- spark.sql(""" """) 메서드로 sql 쿼리 실행
- spark : SparkSession의 변수
# DataFrame 구문
from pyspark.sql.functions import desc
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)","destination_total")\
.sort(desc("destination_total").limit(5).show()
# SQL 구문
maxSql=spark.sql("""
SELECT DEST_COUNTRY_NAME ,sum(count) AS destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()
- 실행 계획은 트랜스포메이션의 지향성 비순환 그래프 ( DAG ) 이며 액션이 호출되면 결과를 만들어냄
- 지향성 비순환 그래프의 각 단계는 불변성을 가진 신규 DF를 생성
- read, groupBy ~ limt 까지 트랜스포메이션으로 프로세스를 만들고 마지막 단계에서 액션을 수행해 DF의 결과를 모으는 프로세스를 시작! 처리가 끝나면 코드를 작성한 프로그래밍 언어에 맞는 리스트나 배열을 반환
- explain으로 실행 계획을 보면 , partial_sum 함수를 호출할 때 집계가 두 단계로 나눠지는데, 그 이유는 숫자 목록의 합을 구하는 연산이 가환성을 가져 연산 시 파티션별 처리가 가능하기 때문.
- 가환성 : 연산시 순서를 바꿔도 그 결과가 변하지 않는 일
- 데이터를 드라이버로 모으는 대신에 스파크가 지원하는 여러 데이터소스로 내보낼 수도 있음
정리
- 스파크의 기초
- 트랜스포메이션과 액션
- 스파크가 DF의 실행 계획을 최적화하기 위해 트랜스포메이션의 지향성 비순환 그래프를 지연 실행하는 방법
- 데이터가 파티션으로 구성되는 방법
- 복잡한 트랜스포메이션 작업을 실행하는 단계
참고 사이트
written by salmonavocado🥑
Share article