ETL Pipeline using Bigquery Apache Spark Stored Procedure and Cloud Function | Data Engineering

In this video, I will show how we can create an ETL pipeline that processes a CSV file uploaded to a GCS bucket, using a BigQuery Apache Spark stored procedure and a Cloud Function. This is a simple data engineering project use case.

Cloud Function code:

import functions_framework
from google.cloud import bigquery

PROJECT_ID = "{PROJECT_ID}"  # replace with your project ID

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def hello_gcs(cloud_event):
    data = cloud_event.data
    bucket = data["bucket"]
    name = data["name"]
    print(f"Bucket: {bucket}")
    print(f"File: {name}")
    # Call the Spark stored procedure, passing the bucket and file name
    client = bigquery.Client()
    query_string = 'CALL `{}.spark_proc_dataset.spark_proc`("{}", "{}")'.format(PROJECT_ID, bucket, name)
    query_job = client.query(query_string)
    query_job.result()  # wait for the procedure to finish so errors surface in the logs

-------------------------------------

APACHE SPARK STORED PROCEDURE

CREATE OR REPLACE PROCEDURE spark_proc_dataset.spark_proc(bucket STRING, file STRING)
WITH CONNECTION `{CONNECTION_ID}`
OPTIONS (engine="SPARK", runtime_version="2.1")
LANGUAGE PYTHON AS R'''
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Procedure arguments are exposed as JSON-encoded environment variables
bucket_name = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.bucket"]))
file = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.file"]))
file_uri = 'gs://{}/{}'.format(bucket_name, file)

# Read the uploaded CSV, drop Tuvalu rows, and count customers per country
customers = spark.read.csv(file_uri, inferSchema=True, header=True)
customers_filtered = customers.filter(~col("Country").isin(["Tuvalu"]))
customers_agg = customers_filtered.groupBy("Country").agg(count("Customer_Id").alias("Customer_count"))

# Append the aggregate to BigQuery, staging through a temporary GCS bucket
customers_agg.write.mode("append").format("bigquery") \
    .option("temporaryGcsBucket", "spark_bq_temp_321") \
    .save("output_dataset.customer_agg")
'''
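The handler can be exercised before the GCS trigger is wired up (deployment itself is typically done with gcloud functions deploy, with functions-framework and google-cloud-bigquery listed in requirements.txt). Below is a minimal local smoke-test sketch, assuming the code above lives in main.py; the bucket and file names are placeholders I chose, not values from the video:

from cloudevents.http import CloudEvent

import main  # the module above containing hello_gcs

# Synthetic "object finalized" event, shaped like what the GCS trigger delivers
attributes = {
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/_/buckets/upload-bucket",
}
data = {"bucket": "upload-bucket", "name": "customers.csv"}  # placeholders

main.hello_gcs(CloudEvent(attributes, data))

This only checks the handler's plumbing; it still issues a real CALL, so it needs credentials and an already-created stored procedure.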

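Once a run completes, the simplest sanity check is to read back the table the procedure writes to (output_dataset.customer_agg, with the Country and Customer_count columns produced by the aggregation). A small sketch, assuming default credentials and that the dataset lives in the client's default project:

from google.cloud import bigquery

client = bigquery.Client()
query = """
SELECT Country, Customer_count
FROM `output_dataset.customer_agg`
ORDER BY Customer_count DESC
LIMIT 10
"""
for row in client.query(query).result():
    print(row.Country, row.Customer_count)

Note that because the procedure writes with mode("append"), re-processing the same file appends another set of aggregate rows, which is fine for this simple use case but worth knowing when inspecting the output.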