[해결] pyspark로 mongoDB에서 데이터 불러오기
- -
안녕하세요, 이번에 대용량 시계열 데이터를 분석하기 시작하면서 겪었던 문제점과 해결 방안에 대해 남기기 위한 로그입니다. 저와 같은 문제를 겪은 국내/외 개발자들은 많았으나 에러가 왜 생겼는지, 해결 방법은 무엇인지 한 군데에서 깔끔하게 볼 수 있는 페이지는 없어 정리합니다.
Environment
* 사내 보안망으로 분리된 원격 서버 위에서 데이터 로딩이 필요했고, mongoDB는 사내 타 서버에 구축되어있는 상황
- Ubuntu 20.04
- python 3.10으로 셋팅한 Docker container 내부에서 작업 (다운그레이드한 3.8 환경 추가로 구축)
- pyspark 3.5.4
- mongo-spark-connector 2.12-10.2.2
- bson 4.8.2
- mongo-driver 4.8.2
Problems&Result
- 기존에는 간단하게 pymongo 를 사용해 데이터의 날짜로 구간을 설정하여 불러오고, 불러온 데이터를 pandas DataFrame으로 변환하는 함수를 사용
- 데이터 수집 단위가 촘촘해 일주일치의 데이터를 불러오는데 약 5분이 소요되는 상황이 발생함.
- 로딩 시간 개선을 위한 GPT 의 답변 중 Dask, connectorx는 사용 의도와 맞지 않음
최종적으로 pyspark 를 활용하여 mongoDB로부터 대용량의 데이터 로딩을 위해 사용한 코드는 아래와 같습니다. 주석은 지우고 사용해주세요.
connection_uri = "mongodb://{id}:{pwd}@{ip}:{port}/{db}?{options}" # 사용자가 사용하려는 db의 아이디, 패스워드, db 이름 및 옵션 설정 필요
spark = SparkSession.builder \
.appName("myApp") \
.master("local[*]") \
# 버전 확인 필수, spark-submit 설정 시 별도의 config 설정 필요 없음
.config("spark.jars", "mongo-spark/downloads/connector_2.12-10.2.2.jar, \
mongo-spark/downloads/bson-4.8.2.jar, \
mongo-spark/downloads/bson-record-codec-4.8.2.jar, \
mongo-spark/downloads/driver-core-4.8.2.jar, \
mongo-spark/downloads/sync-4.8.2.jar") \
.config("spark.mongodb.read.connection.uri", connection_uri) \
.config("spark.mongodb.input.partitionerOptions.timeoutMS", "120000") \
.getOrCreate()
df = spark.read.format("mongodb") \
.option("collection", "collection name") \ # db 내 조회할 collection 이름
.option("pipeline", "query") \ # 적용할 query 입력
.load()
Errors&Solve (with English translation)
코드가 그리 복잡하지는 않지만 생각보다 수많은 에러를 만나게 되어 기록합니다.
An error occurred while calling o35.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND]
DB 와의 연결이 제대로 안되었거나 spark.read.format 부분에서 설정이 잘못됨
Improper connection to the database or incorrect settings in the spark.read.format section
An error occurred while calling o34.load.
: com.mongodb.spark.sql.connector.exceptions.ConfigException: Invalid connection string: 'mongodb://id:pwd@ip:options'
An error in the connection_uri
An error occurred while calling o34.load.
: com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='infra', source='name', password=<hidden>, mechanismProperties={}}
authenticating, 즉 접근 권한으로 인해 생긴 문제. uri 맨 뒤에 "?authSource=admin&authMechanism=SCRAM-SHA-256" 추가
Issue of acess permissions. Add ?authSource=admin&authMechanism=SCRAM-SHA-256 to the end of URI
An error occurred while calling o122.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms
데이터 로딩 시 설정된 timeout limit에 걸린 것. uri 맨 뒤에 "connectTimeoutMS=120000&socketTimeoutMS=60000" 추가, uri 뒤에 여러 옵션 변경이 필요한 경우 & 로 이어줌.
Data loading exceeded the configured timeout limit.Add connectTimeoutMS=120000&socketTimeoutMS=60000 to the end of URI. When you need to add multiple options, join them with an &
Py4JJavaError: An error occurred while calling o34.load.
: java.lang.NoSuchMethodError: com.mongodb.client.MongoClient.getClusterDescription()Lcom/mongodb/connection/ClusterDescription;
at com.mongodb.spark.connection.MongoClientCache.$anonfun$logClient$1(MongoClientCache.scala:161)
method 를 찾을 수 없음, scala 혹은 spark 버전 확인 필요
A method could not be found; you should check Scala or Spark version dependency.
Py4JJavaError: An error occurred while calling o49.load.
: com.mongodb.spark.sql.connector.exceptions.ConfigException: Missing configuration
config 중에 값이 빠짐, client, collection 값이 둘 다 들어갔는지 확인 필요
Some configuration values might be missing; verify that both the client and collection values are properly provided
Py4JJavaError: An error occurred while calling o41.showString.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(Lorg/apache/spark/sql/types/StructType;)Lorg/apache/spark/sql/catalyst/encoders/ExpressionEncoder;
at com.mongodb.spark.sql.connector.schema.RowToInternalRowFunction.<init>(RowToInternalRowFunction.java:40)
데이터 로드 완료 후df.show()로 데이터 확인하려고 했는데 method를 찾을 수 없다고 함, python 3.8 로 다운그레이드 후 해결
After loading the data, an error occurred when calling df.show() stating that a method could not be found.Downgrading to Python 3.8.
생각보다 챗gpt나 stackoverflow 로부터 얻을 수 있는 정보가 많이 없어서 기록용으로 남깁니다. 저와 비슷한 에러를 겪으시는 분들께 도움이 되었으면 합니다.
'Study > 시행착오' 카테고리의 다른 글
[해결] 기계식 키보드 맥 셋팅 (0) | 2024.06.10 |
---|---|
[해결] ImportError: cannot import name 'NDArray' from 'numpy.typing' (0) | 2024.04.24 |
[해결] Watchdog caught collective operation timeout, Multi-GPU (DDP) NCCL 타임아웃 시간 설정 (0) | 2023.06.23 |
[ongoing] 깃허브 키체인 항목 접근 권한 변경에 대한 암호 입력 (1) | 2023.05.08 |
[해결] CUDA capability sm_86 is not compatible with the current PyTorch installation. (0) | 2023.04.16 |
공감 감사합니다! 좋은하루 되세요 ✨