What is Snowflake?
Snowflake is a cloud-native data warehouse that separates storage and compute, allowing you to scale each independently. Unlike traditional data warehouses, Snowflake requires no hardware management and little to no manual indexing or tuning.
It runs on AWS, Azure, and GCP, providing a single platform for data warehousing, data lakes, data engineering, and data sharing.
Snowflake Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Snowflake Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Cloud Services Layer │ │
│ │ (Authentication, Metadata, Query Optimization) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Virtual Warehouses (Compute) │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ XS │ │ M │ │ XL │ (Scale Up/Down) │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Storage Layer (Cloud Storage) │ │
│ │ S3 / Azure Blob / GCS (Compressed) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Key Benefits: │
│ • Separate scaling of compute and storage │
│ • Pay only for what you use │
│ • Zero management overhead │
│ • Near-unlimited concurrency │
└─────────────────────────────────────────────────────────────────┘
Getting Started
-- Create a database (IF NOT EXISTS keeps the script idempotent / safely re-runnable)
CREATE DATABASE IF NOT EXISTS analytics_db;
USE DATABASE analytics_db;

-- Create schemas: staging for raw loads, production for curated data
CREATE SCHEMA IF NOT EXISTS staging;
CREATE SCHEMA IF NOT EXISTS production;

-- Create a warehouse (compute).
-- INITIALLY_SUSPENDED prevents the warehouse from starting (and billing)
-- the moment it is created; AUTO_RESUME wakes it on the first query.
CREATE WAREHOUSE IF NOT EXISTS etl_wh
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300              -- Suspend after 5 min of inactivity
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE      -- Do not bill until first use
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 3;          -- Multi-cluster for concurrency (Enterprise+)

-- Use the warehouse
USE WAREHOUSE etl_wh;

-- Create a table
CREATE TABLE IF NOT EXISTS staging.customers (
    customer_id INT,
    name VARCHAR(100),
    email VARCHAR(200),
    created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Create table with clustering (for large tables; cluster on frequently
-- filtered columns so pruning can skip micro-partitions)
CREATE TABLE IF NOT EXISTS production.events (
    event_id VARCHAR(36),
    event_type VARCHAR(50),
    user_id INT,
    event_date DATE,
    payload VARIANT                 -- Semi-structured (JSON) data
)
CLUSTER BY (event_date, event_type);
Loading Data
-- Create a stage (connection to cloud storage).
-- NOTE: inline credentials are shown for brevity only — prefer a
-- STORAGE INTEGRATION so secrets never appear in SQL text or query history.
CREATE STAGE my_s3_stage
    URL = 's3://my-bucket/data/'
    CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'xxx');

-- Or use a Snowflake internal stage
CREATE STAGE my_internal_stage;

-- List files in the stage
LIST @my_s3_stage;

-- Create file formats
CREATE FILE FORMAT csv_format
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    SKIP_HEADER = 1
    NULL_IF = ('NULL', 'null', '')
    EMPTY_FIELD_AS_NULL = TRUE;

CREATE FILE FORMAT json_format
    TYPE = 'JSON'
    STRIP_OUTER_ARRAY = TRUE;   -- Split a top-level JSON array into one row per element

-- COPY INTO - Load data from stage.
-- FILE_FORMAT must be a parenthesized clause referencing the named format.
-- Backslashes in PATTERN must be doubled: '\\.' is a literal dot in the regex
-- (a single '\.' collapses to '.', which matches ANY character).
COPY INTO staging.customers
FROM @my_s3_stage/customers/
FILE_FORMAT = (FORMAT_NAME = 'csv_format')
PATTERN = '.*\\.csv'
ON_ERROR = 'CONTINUE';          -- Skip bad rows; inspect COPY_HISTORY for rejects

-- Load JSON data (the events table lives in the production schema — see its DDL)
COPY INTO production.events
FROM @my_s3_stage/events/
FILE_FORMAT = (FORMAT_NAME = 'json_format');

-- Snowpipe - Continuous ingestion triggered by cloud storage event notifications
CREATE PIPE my_pipe
    AUTO_INGEST = TRUE
AS
COPY INTO staging.customers
FROM @my_s3_stage/customers/
FILE_FORMAT = (FORMAT_NAME = 'csv_format');
Querying Data
-- Standard SQL aggregate query: monthly sales summary
SELECT
    DATE_TRUNC('month', order_date) AS month,
    SUM(amount) AS total_sales,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM production.orders
WHERE order_date >= '2024-01-01'
GROUP BY month          -- Snowflake allows grouping/ordering by SELECT alias
ORDER BY month;

-- Query semi-structured data (JSON) in a VARIANT column.
-- ':' navigates into objects/arrays; '::' casts to a SQL type.
-- (events was created in the production schema — see its DDL.)
SELECT
    event_id,
    payload:user.name::STRING AS user_name,
    payload:user.email::STRING AS email,
    payload:items[0].product_id::INT AS first_product
FROM production.events
WHERE payload:event_type::STRING = 'purchase';  -- Explicit cast: avoid VARIANT-vs-string comparison surprises

-- Flatten nested arrays: one output row per element of payload:items
SELECT
    e.event_id,
    item.value:product_id::INT AS product_id,
    item.value:quantity::INT AS quantity
FROM production.events e,
    LATERAL FLATTEN(input => e.payload:items) AS item;

-- Window functions: per-customer running total and spend ranking
SELECT
    customer_id,
    order_date,
    amount,
    SUM(amount) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW  -- ROWS frame: correct running total even when dates tie
    ) AS running_total,
    ROW_NUMBER() OVER (
        PARTITION BY customer_id
        ORDER BY amount DESC
    ) AS amount_rank
FROM production.orders;
Time Travel & Fail-Safe
-- Query data as it was at a specific point in time
SELECT * FROM customers
AT(TIMESTAMP => '2024-12-20 10:00:00'::TIMESTAMP);

-- Query data as of a relative offset (seconds before now)
SELECT * FROM customers
AT(OFFSET => -3600);   -- 1 hour ago

-- Query the table state just BEFORE a specific statement ran.
-- Supply a real query ID from QUERY_HISTORY or the UI — an empty ID is invalid.
SELECT * FROM customers
BEFORE(STATEMENT => '<query_id>');

-- Restore a dropped table (works within the retention window)
DROP TABLE customers;
UNDROP TABLE customers;

-- Clone a table at a point in time (zero-copy: shares underlying storage)
CREATE TABLE customers_backup
CLONE customers
AT(TIMESTAMP => '2024-12-20 10:00:00'::TIMESTAMP);

-- Set the Time Travel retention period (up to 90 days on Enterprise edition)
ALTER TABLE customers
SET DATA_RETENTION_TIME_IN_DAYS = 30;
Streams & Tasks
-- Create a stream to track changes (CDC)
CREATE STREAM customer_changes
    ON TABLE staging.customers;

-- View pending changes (note: reading a stream inside a DML transaction consumes it)
SELECT * FROM customer_changes;
-- Adds metadata columns: METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID.
-- An UPDATE to the source table appears as a DELETE + INSERT pair,
-- both with METADATA$ISUPDATE = TRUE.

-- Create a task (scheduled job).
-- WHEN SYSTEM$STREAM_HAS_DATA skips runs while the stream is empty,
-- so no warehouse credits are burned on no-op executions.
CREATE TASK process_customer_changes
    WAREHOUSE = etl_wh
    SCHEDULE = 'USING CRON 0 * * * * UTC'   -- Every hour, on the hour
    WHEN SYSTEM$STREAM_HAS_DATA('customer_changes')
AS
MERGE INTO production.customers t
USING customer_changes s
    ON t.customer_id = s.customer_id
-- True deletes only: the DELETE half of an update pair must NOT remove the row
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' AND s.METADATA$ISUPDATE = FALSE THEN DELETE
-- The INSERT half of an update pair (or a re-inserted key) carries the new values
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN UPDATE SET
    t.name = s.name,
    t.email = s.email
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN INSERT
    (customer_id, name, email)
    VALUES (s.customer_id, s.name, s.email);

-- Tasks are created suspended; RESUME starts the schedule
ALTER TASK process_customer_changes RESUME;

-- Task dependencies (DAG): runs only after the parent task succeeds
CREATE TASK downstream_task
    WAREHOUSE = etl_wh
    AFTER process_customer_changes
AS
CALL update_aggregates();
Python Integration
# Install connector
pip install snowflake-connector-python
pip install snowflake-sqlalchemy
# Using the Snowflake connector.
# try/finally guarantees the cursor and connection are released even when
# the query raises — the straight-line version leaks the session on error.
import snowflake.connector

# NOTE(review): credentials below are placeholders — in real code read them
# from environment variables or a secrets manager, never hard-code them.
conn = snowflake.connector.connect(
    user='your_user',
    password='your_password',
    account='your_account',   # e.g., 'abc123.us-east-1'
    warehouse='etl_wh',
    database='analytics_db',
    schema='production'
)
try:
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT * FROM customers LIMIT 10")
        rows = cursor.fetchall()
        for row in rows:
            print(row)
    finally:
        cursor.close()   # Release the cursor even if execute/fetch fails
finally:
    conn.close()         # Release the connection (ends the Snowflake session)
# Using SQLAlchemy.
# quote_plus() percent-encodes the password so characters such as '@', '/'
# or ':' cannot corrupt the connection URL (the raw interpolation breaks
# for any non-trivial password).
from sqlalchemy import create_engine
from urllib.parse import quote_plus
import pandas as pd

engine = create_engine(
    'snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}'.format(
        user='your_user',
        password=quote_plus('your_password'),  # URL-encode credentials
        account='your_account',
        database='analytics_db',
        schema='production',
        warehouse='etl_wh'
    )
)
try:
    df = pd.read_sql("SELECT * FROM customers", engine)
    print(df.head())

    # Write DataFrame to Snowflake (replaces the table if it already exists)
    df.to_sql('new_table', engine, index=False, if_exists='replace')
finally:
    engine.dispose()  # Close pooled connections held by the engine
Best Practices
- Right-size warehouses: Start small, scale up as needed
- Use auto-suspend: Save costs when warehouse is idle
- Cluster large tables: On frequently filtered columns
- Use stages: For efficient bulk loading
- Leverage streams: For incremental processing
- Separate warehouses: For different workloads (ETL vs reporting)
- Use resource monitors: To control costs
-- Create resource monitor: caps credit consumption for attached warehouses
CREATE RESOURCE MONITOR monthly_limit
WITH CREDIT_QUOTA = 1000 -- Credits allowed per FREQUENCY window
FREQUENCY = MONTHLY -- Quota counter resets each month
START_TIMESTAMP = IMMEDIATELY -- Monitoring begins as soon as it is created
TRIGGERS
ON 75 PERCENT DO NOTIFY -- Early warning to account admins
ON 90 PERCENT DO NOTIFY -- Final warning before suspension
ON 100 PERCENT DO SUSPEND; -- Suspend warehouse once quota is exhausted
-- Apply to warehouse (one monitor can govern multiple warehouses)
ALTER WAREHOUSE etl_wh
SET RESOURCE_MONITOR = monthly_limit;
Master Snowflake
Our Data Engineering program covers Snowflake, cloud warehousing, and modern data platforms.
Explore Data Engineering Program