Search

Spark Streaming 실습

카테고리
Data Engineering & MLOps
Index
Data Engineering
Spark
Stream Data Processing
날짜
2023/04/13

Spark Streaming 실습

Spark Streaming의 Dstream 연산을 활용해 Twitter API의 실시간 해시태그 데이터 시각화 하기

Part 1. 실습 준비

실습 환경

1.
Docker Desktop
2.
Mac 사용자분들은 터미널, Window 사용자분들은 git bash 환경에서 진행해주시면 됩니다.

실습 파일

twitter_streaming.zip
19.8KB

Part 2. 실습 진행

1. 터미널에 jupyter/all-spark-notebook 도커 이미지 다운 및 컨테이너 생성

도커 데스크톱 실행
all-spark-notebook은 주피터에서 공식적으로 제공하는 스파크, 주피터랩이 포함된 도커 이미지
<twitter_streaming폴더위치> : git clone 한 twitter_streaming 폴더의 로컬 위치
→ pwd로 확인가능
# 실습파일 git clone git clone https://github.com/chriss060/twitter_streaming
Bash
복사
docker run -p 8888:8888 -e GRANT_SUDO=yes --user root -e JUPYTER_ENABLE_LAB=yes -v <twitter_streaming폴더위치>:/home/jovyan --name jupyter --restart always jupyter/all-spark-notebook
Docker
복사
→ 로그 하단에 나오는 주피터랩 링크 메모장에 저장
ex) http://127.0.0.1:8888/lab?token=토큰넘버

2. 브라우저로 주피터 랩 링크 접속, 런처에서 터미널 열기

만약 주피터랩 접속 시 토큰 credential 문제가 생긴다면

3. 터미널에서 Twitter API에서 제공한 Bearer Token 등록하기 및

export 'BEARER_TOKEN'='AAAAAAAAAAAAAAAAAAAAADFvmQEAAAAATJmGj%2BjxxR4u86%2FCdP40tp50RbA%3DV0XFEUwru2tQMD76iEVNd4ued1f54PvawXE1FOSqbQCMvqjVKv'
Bash
복사

4. First Twitter App.ipynb에서 SparkSteramingContext 실행 전까지 코드 실행

SparkStreamingContext, Socket 설정 및 SQLContext 쿼리 작업 지정

4. 터미널에서 나눔 폰트 설치

Matplotlib 라이브러리에서 한글 깨짐 방지를 위해 나눔 폰트 설치
sudo apt-get update # 나눔 폰트 설치 sudo apt-get install fonts-nanum # matplotlib에 남아있는 폰트 캐시 삭제 rm -rf ~/.cache/matplotlib/* # 폰트 캐시 생성 fc-cache -fv
Bash
복사

5. 터미널에서 TweetRead.py 실행

python3 TweetRead.py
Python
복사
→ 파일 실행 시 설정해준 5555포트에서 응답 대기

6. First Twitter App.ipynb로 돌아가서 SSC 시작

ssc.start()
Python
복사
→ sparkstreamingcontext 시작 후 다시 터미널로 돌아가면 Twitter API로 부터 트윗 데이터를 실시간으로 받아오는 것을 확인할 수 있음

7. Seaborn 라이브러리의 barplot으로 특정 주제의 상위 10개 해쉬태그 시각화

import time from IPython import display import matplotlib.pyplot as plt import seaborn as sns import pandas # Only works for Jupyter Notebooks! %matplotlib inline # 나눔고딕 폰트 적용 plt.rcParams["font.family"] = 'NanumGothic'
Python
복사
count = 0 while count < 10: time.sleep( 3 ) #tweets 테이블에서 tag, count 필드만 추출 top_10_tweets = sqlContext.sql( 'Select tag, count from tweets' ) #판다스 dataframe으로 변환 top_10_df = top_10_tweets.toPandas() display.clear_output(wait=True) plt.figure( figsize = ( 10, 8 ) ) #barplot 생성 sns.barplot( x="count", y="tag", data=top_10_df) plt.show() count = count + 1
Python
복사
→ count를 10으로 설정해서 10초 동안 SparkStreamingContext와 SQLContext로 집계한 해쉬태그 수를 시각화

8. SparkStreamingContext 소켓 닫기

ssc.stop()
Python
복사

References

Jupyter, Spark 도커 이미지 다운 - https://jupyter-docker-stacks.readthedocs.io/en/latest/
PySpark로 SQLContext 쿼리 작성 및 Seaborn 시각화 - https://www.youtube.com/watch?v=do8w5Txpq34