PySpark Snowflake Data Warehouse Operasi Baca Tulis – Part2 (Baca-Tulis) – Menuju AI – Teknologi, Sains, dan Teknik Terbaik

Penulis: Vivek Chaudhary

Pemrograman

PySpark Snowflake Data Warehouse Operasi Baca Tulis – Part2 (Baca-Tulis)

Tujuan dari cerita ini adalah untuk membangun pemahaman tentang operasi Baca dan Tulis pada tabel gudang Data Snowflake menggunakan Apache Spark API, Pyspark. Sebagai kelanjutan dari blog saya sebelumnya, tautan yang dibagikan di bawah ini, dari operasi Baca Kepingan Salju PySpark, ini adalah blog saya saat ini dan saya telah membahas kasus penggunaan untuk melakukan operasi tulis pada tabel Database Kepingan Salju.

Part1 bisa b

PySpark Snowflake Data Warehouse Operasi Baca Tulis – Part1 (Hanya Baca)

Di blog ini hanya untuk membuat segalanya lebih beragam atau real-time, di mana kami memiliki banyak sumber, saya telah menggunakan sumber data yang berbeda seperti file Apache Parquet yang ada di HDFS (diinstal pada sistem lokal), Oracle Database. Kami akan mengekstrak data melakukan transformasi sederhana pada dataset dan menulis yang sama ke Snowflake DB.

Spark Connectivity dan Import Dataset import pyspark
dari pyspark.sql impor SparkSession
print (‘modules import’) spark = SparkSession.builder.appName (‘Pyspark_snowflake’). getOrCreate ()
print (‘aplikasi dibuat’) #snowflake pengaturan properti spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession (spark._jvm.org.apache.spark.sql.SparkSession.builder (). getOrCreate ())

Impor file Parket dari HDFS lokal:

Parket di HDFS
emp_df = spark.read.parquet (r’hdfs: // localhost: 9000 / learning / emp ‘)
emp_df.show (15)
Data Emp

Impor catatan Tabel Database Oracle:

#
dept_df = spark.read.format (‘jdbc’). option (‘url’, ‘jdbc: oracle: thin: scott /[email protected]//localhost:1522/oracle’).option(‘dbtable ‘,’ dept ‘). option (‘ user ‘,’ scott ‘). option (‘ password ‘,’ scott ‘). option (‘ driver ‘,’ oracle.jdbc.driver.OracleDriver ‘). load () dept_df.show ()
Dept Data

2. Transformasi Data

Dalam Transformasi Data, kami akan melakukan transformasi sederhana seperti mengganti nama kolom dan menggabungkan kumpulan data, karena ruang lingkup cerita ini adalah untuk memahami konektivitas dengan Snowflake.

#rename Dataframe kolom emp_df = emp_df.withColumnRenamed (‘DEPTNO’, ‘DEPTNO_E’) #gabung dengan emp dan dataset dept join_df = dept_df.join (emp_df, emp_df.DEPTNO_E == dept_df.DEPTNO, how = ‘inner’) #c.DEPTNO, how = ‘inner’) #c.DEPTNO final dataframe final_df = join_df.select (‘EMPNO’, ‘ENAME’, ‘SAL’, ‘DEPTNO’, ‘DNAME’)
final_df.show ()
Set Data Akhir

3. Pengaturan kepingan salju

#set properti kepingan salju di bawah ini untuk konektivitas sfOptions = {
“SfURL”: “wa29709.ap-south-1.aws.snowflakecomputing.com”,
“SfAccount”: “xxxxxxx”,
“SfUser”: “xxxxxxxxx”,
“SfPassword”: “xxxxxxxx”,
“SfDatabase”: “learning_db”,
“SfSchema”: “publik”,
“SfWarehouse”: “compute_wh”,
“SfRole”: “sysadmin”,
}

SNOWFLAKE_SOURCE_NAME = “net.snowflake.spark.snowflake”

Buat tabel target Snowflake menggunakan skrip di bawah ini:

buat tabel emp_dept (empno integer,
ename string,
akan berintegrasi,
deptno integer,
string nama d);
Meja Kepingan Salju

4. Muat DataFrame Pyspark ke target Snowflake

#pyspark dataframe to snowflake final_df.write.format (“snowflake”). options (** sfOptions) .option (“dbtable”, “emp_dept”). mode (‘append’). options (header = True) .save ( )

Validasi data dalam kepingan salju menggunakan SnowSQL:

Validasi data

Kami berhasil membaca kumpulan data dari berbagai sumber dan memuat bingkai Spark Data ke tabel Snowflake DataBase.

Terima kasih untuk semua telah membaca blog saya. Bagikan pandangan dan umpan balik Anda.

Operasi tulis PySpark Snowflake Data Warehouse – Part2 (Read-Write) awalnya diterbitkan di Towards AI on Medium, di mana orang-orang melanjutkan percakapan dengan menyorot dan menanggapi cerita ini.

Diterbitkan melalui Towards AI