새소식

Study/시행착오

[해결] pyspark로 mongoDB에서 데이터 불러오기

  • -
728x90

출처 https://thecodebuzz.com/python-pyspark-connect-to-mongodb/

안녕하세요, 이번에 대용량 시계열 데이터를 분석하기 시작하면서 겪었던 문제점과 해결 방안에 대해 남기기 위한 로그입니다. 저와 같은 문제를 겪은 국내/외 개발자들은 많았으나 에러가 왜 생겼는지, 해결 방법은 무엇인지 한 군데에서 깔끔하게 볼 수 있는 페이지는 없어 정리합니다.


Environment

* 사내 보안망으로 분리된 원격 서버 위에서 데이터 로딩이 필요했고, mongoDB는 사내 타 서버에 구축되어있는 상황

  • Ubuntu 20.04
  • python 3.10으로 셋팅한 Docker container 내부에서 작업 (다운그레이드한 3.8 환경 추가로 구축)
  • pyspark 3.5.4
  • mongo-spark-connector 2.12-10.2.2
  • bson 4.8.2
  • mongo-driver 4.8.2

Problems&Result

  1. 기존에는 간단하게 pymongo 를 사용해 데이터의 날짜로 구간을 설정하여 불러오고, 불러온 데이터를 pandas DataFrame으로 변환하는 함수를 사용
  2. 데이터 수집 단위가 촘촘해 일주일치의 데이터를 불러오는데 약 5분이 소요되는 상황이 발생함. 
  3. 로딩 시간 개선을 위한 GPT 의 답변 중 Dask, connectorx는 사용 의도와 맞지 않음

최종적으로 pyspark 를 활용하여 mongoDB로부터 대용량의 데이터 로딩을 위해 사용한 코드는 아래와 같습니다. 주석은 지우고 사용해주세요. 

connection_uri = "mongodb://{id}:{pwd}@{ip}:{port}/{db}?{options}" # 사용자가 사용하려는 db의 아이디, 패스워드, db 이름 및 옵션 설정 필요

spark = SparkSession.builder \
    .appName("myApp") \
    .master("local[*]") \
    # 버전 확인 필수, spark-submit 설정 시 별도의 config 설정 필요 없음
    .config("spark.jars", "mongo-spark/downloads/connector_2.12-10.2.2.jar, \ 
                           mongo-spark/downloads/bson-4.8.2.jar, \
                           mongo-spark/downloads/bson-record-codec-4.8.2.jar, \
                           mongo-spark/downloads/driver-core-4.8.2.jar, \
                           mongo-spark/downloads/sync-4.8.2.jar") \
    .config("spark.mongodb.read.connection.uri", connection_uri) \
    .config("spark.mongodb.input.partitionerOptions.timeoutMS", "120000") \
    .getOrCreate()

df = spark.read.format("mongodb") \
    .option("collection", "collection name") \ # db 내 조회할 collection 이름
    .option("pipeline", "query") \ # 적용할 query 입력
    .load()

Errors&Solve (with English translation)

코드가 그리 복잡하지는 않지만 생각보다 수많은 에러를 만나게 되어 기록합니다. 

 An error occurred while calling o35.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND]

 DB 와의 연결이 제대로 안되었거나 spark.read.format 부분에서 설정이 잘못됨 

Improper connection to the database or incorrect settings in the spark.read.format section

An error occurred while calling o34.load.
: com.mongodb.spark.sql.connector.exceptions.ConfigException: Invalid connection string: 'mongodb://id:pwd@ip:options'
 connection_uri 에 오류 있음

An error in the connection_uri

An error occurred while calling o34.load.
: com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='infra', source='name', password=<hidden>, mechanismProperties={}}

authenticating, 즉 접근 권한으로 인해 생긴 문제. uri 맨 뒤에 "?authSource=admin&authMechanism=SCRAM-SHA-256" 추가

Issue of acess permissions. Add ?authSource=admin&authMechanism=SCRAM-SHA-256 to the end of URI

An error occurred while calling o122.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms

데이터 로딩 시 설정된 timeout limit에 걸린 것. uri 맨 뒤에 "connectTimeoutMS=120000&socketTimeoutMS=60000" 추가, uri 뒤에 여러 옵션 변경이 필요한 경우 & 로 이어줌.

Data loading exceeded the configured timeout limit.Add connectTimeoutMS=120000&socketTimeoutMS=60000 to the end of URI. When you need to add multiple options, join them with an &

Py4JJavaError: An error occurred while calling o34.load.
: java.lang.NoSuchMethodError: com.mongodb.client.MongoClient.getClusterDescription()Lcom/mongodb/connection/ClusterDescription;
	at com.mongodb.spark.connection.MongoClientCache.$anonfun$logClient$1(MongoClientCache.scala:161)

method 를 찾을 수 없음, scala 혹은 spark 버전 확인 필요

A method could not be found; you should check Scala or Spark version dependency.

Py4JJavaError: An error occurred while calling o49.load.
: com.mongodb.spark.sql.connector.exceptions.ConfigException: Missing configuration

config 중에 값이 빠짐, client, collection 값이 둘 다 들어갔는지 확인 필요

Some configuration values might be missing; verify that both the client and collection values are properly provided

Py4JJavaError: An error occurred while calling o41.showString.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(Lorg/apache/spark/sql/types/StructType;)Lorg/apache/spark/sql/catalyst/encoders/ExpressionEncoder;
	at com.mongodb.spark.sql.connector.schema.RowToInternalRowFunction.<init>(RowToInternalRowFunction.java:40)

데이터 로드 완료 후df.show()로 데이터 확인하려고 했는데 method를 찾을 수 없다고 함, python 3.8 로 다운그레이드 후 해결

After loading the data, an error occurred when calling df.show() stating that a method could not be found.Downgrading to Python 3.8.


 생각보다 챗gpt나 stackoverflow 로부터 얻을 수 있는 정보가 많이 없어서 기록용으로 남깁니다. 저와 비슷한 에러를 겪으시는 분들께 도움이 되었으면 합니다. 

728x90
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.