One of the toughest challenges data professionals face today is streaming data for real-time analytics.
A major barrier to real-time insights remains the complexity of the data itself: many companies lack the tools and infrastructure to ingest and process both structured and unstructured data.
Organizations today need a data warehouse that is able to handle all data types and scale quickly to address growth.
Here is a complete pipeline for the velocity component of Big Data, where I explain how to build a streaming pipeline from scratch.
Steps:
-----------
Step 1: Create the Lambda role (a boto3 sketch of Steps 1, 4, and 6 follows this list)
Step 2: Create the Lambda function that reads data from API Gateway and puts it into the Kinesis Data Stream
Step 3: Create the API Gateway and integrate it with the AWS Lambda created in Step 2 (sketched after the Step 2 Lambda)
Step 4: Create the Kinesis Data Stream to receive data from the AWS Lambda created in Step 2
Step 5: Create the Lambda that processes the data before the S3 dump
Step 6: Create the Firehose destination S3 bucket
Step 7: Create the Kinesis Firehose delivery stream (sketched after the Step 5 Lambda)
Step 8: Create the Snowflake role (trust-policy sketch after the Snowflake code)
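Provisioning for Steps 1, 4 & 6 (boto3 sketch):
-------------------------------
The video provisions these resources through the AWS console; the following is a minimal boto3 sketch of the same setup, not the exact steps from the video. The stream name "hellotesting" comes from the Lambda below; the role and bucket names are placeholders.
import json
import boto3
iam = boto3.client('iam')
kinesis = boto3.client('kinesis')
s3 = boto3.client('s3')
# Step 1: role that Lambda assumes; the demo attaches broad managed policies
# (see the note at the end of this description about least privilege).
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}
iam.create_role(RoleName='lambda-kinesis-demo-role',  # placeholder name
                AssumeRolePolicyDocument=json.dumps(trust_policy))
iam.attach_role_policy(RoleName='lambda-kinesis-demo-role',
                       PolicyArn='arn:aws:iam::aws:policy/AmazonKinesisFullAccess')
iam.attach_role_policy(RoleName='lambda-kinesis-demo-role',
                       PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')
# Step 4: the stream the Step 2 Lambda writes to.
kinesis.create_stream(StreamName='hellotesting', ShardCount=1)
# Step 6: Firehose destination bucket (names are globally unique; outside
# us-east-1 you also need a CreateBucketConfiguration with your region).
s3.create_bucket(Bucket='my-firehose-destination-bucket')  # placeholder name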
Lambda for Step 2:
-------------------------------
import json
import boto3
client = boto3.client('kinesis')
def lambda_handler(event, context):
    # API Gateway passes the request payload in event['body'];
    # serialize it and push it onto the stream.
    data = json.dumps(event['body'])
    client.put_record(StreamName="hellotesting", Data=data, PartitionKey="1")
    print("Data Inserted")
    return {'statusCode': 200, 'body': 'Data Inserted'}
Lambda for Firehose Transformation (Step 5):
-------------------------------------------------------------------------
import base64
def lambda_handler(event, context):
    # Collect transformed records inside the handler so state does not
    # leak across warm Lambda invocations.
    output = []
    for record in event['records']:
        # Firehose delivers each record payload base64-encoded.
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)
        # Append a newline so each record lands on its own line in S3.
        row_w_newline = payload + "\n"
        # Re-encode, and decode to str so the response is JSON-serializable.
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        }
        output.append(output_record)
    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
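Kinesis Firehose creation (Step 7, boto3 sketch):
-------------------------------
A hedged sketch of creating the delivery stream with the Kinesis stream as source, the Step 5 Lambda as transformation processor, and the Step 6 bucket as destination. The account ID, region, role, function, and bucket names are placeholders; the Firehose role needs permission to read the stream, invoke the Lambda, and write to the bucket.
import boto3
firehose = boto3.client('firehose')
account = '123456789012'   # placeholder
region = 'us-east-1'       # placeholder
firehose_role_arn = f'arn:aws:iam::{account}:role/firehose-demo-role'  # placeholder
firehose.create_delivery_stream(
    DeliveryStreamName='kinesis-to-s3',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': f'arn:aws:kinesis:{region}:{account}:stream/hellotesting',
        'RoleARN': firehose_role_arn
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': firehose_role_arn,
        'BucketARN': 'arn:aws:s3:::my-firehose-destination-bucket',  # placeholder
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    'ParameterValue': f'arn:aws:lambda:{region}:{account}:function:firehose-transform'  # placeholder
                }]
            }]
        }
    }
)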
Snowflake Code:
---------------------------
--Specify the role
use role ACCOUNTADMIN;
drop database if exists s3_to_snowflake;
--Database Creation
create database if not exists s3_to_snowflake;
--Specify the active/current database for the session.
use s3_to_snowflake;
--Storage Integration Creation
create or replace storage integration s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '{}'
STORAGE_ALLOWED_LOCATIONS = ('s3://{}')
COMMENT = 'Testing Snowflake getting refresh or not';
--Describe the Integration Object
DESC INTEGRATION s3_int;
--External Stage Creation
create stage mystage
url = 's3://{}'
storage_integration = s3_int;
list @mystage;
--File Format Creation
create or replace file format my_json_format
type = json;
--Table Creation
create or replace external table s3_to_snowflake.PUBLIC.Person with location = @mystage file_format = (format_name = 'my_json_format');
--Query the table
select parse_json(VALUE):Age as Age, trim(parse_json(VALUE):Name, '"') as Name from s3_to_snowflake.PUBLIC.Person;
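Snowflake IAM trust wiring (Step 8, boto3 sketch):
-------------------------------
DESC INTEGRATION s3_int returns STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID; the IAM role referenced in STORAGE_AWS_ROLE_ARN must trust that user with that external ID before the stage can list the bucket. A hedged sketch, with placeholder values:
import json
import boto3
iam = boto3.client('iam')
storage_aws_iam_user_arn = 'arn:aws:iam::111122223333:user/snowflake-demo-user'  # from DESC INTEGRATION
storage_aws_external_id = '<STORAGE_AWS_EXTERNAL_ID from DESC INTEGRATION>'
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": storage_aws_iam_user_arn},
        "Action": "sts:AssumeRole",
        "Condition": {"StringEquals": {"sts:ExternalId": storage_aws_external_id}}
    }]
}
iam.update_assume_role_policy(RoleName='snowflake-storage-role',  # placeholder
                              PolicyDocument=json.dumps(trust_policy))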
Note:
----------
1) Please delete all AWS resources when you are done, otherwise they will keep generating charges!
2) As this is a POC, I granted broad access when creating several roles; when moving to production, make sure to grant only the access that is actually required!
3) PARSE_JSON in Snowflake interprets an input string as a JSON document, producing a VARIANT value.
Check this playlist for more AWS projects in the Big Data domain:
• Demystifying Data Engineering with Cl...
Learn more about AWS Kinesis:
---------------------------------------------------
https://aws.amazon.com/kinesis/