Big Data — Working with Massive Datasets
Big data refers to datasets that are too large or too fast-moving to handle with traditional tools. Working with them requires distributed systems and specialised engineering.
The 5 Vs of Big Data
- Volume — terabytes to petabytes.
- Velocity — data arrives continuously.
- Variety — text, images, logs, sensor data.
- Veracity — quality and trust.
- Value — actionable insight.
Technology Stack
Common building blocks: HDFS for distributed storage; Apache Spark and Flink for batch and stream processing; Kafka for event ingestion; Snowflake, BigQuery, and Databricks as managed analytics platforms.
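Of these, Spark is the usual workhorse for distributed computation. A minimal PySpark sketch, assuming a local Spark installation and a hypothetical Parquet dataset of events with an event_time column:

# Minimal PySpark aggregation sketch; the path and column names are
# hypothetical, and the session runs locally unless configured otherwise.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("event-rollup").getOrCreate()

daily = (
    spark.read.parquet("/data/events.parquet")
    .groupBy(F.to_date("event_time").alias("day"))
    .agg(F.count("*").alias("events"))
    .orderBy("day")
)
daily.show(10)
spark.stop()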
Code Examples: Big Data (5 runnable snippets)
Each example stands alone: copy a block into a file or notebook and run it end-to-end. The cloud snippets assume valid credentials and that the referenced buckets, datasets, and containers exist.
Example 1: BigQuery aggregation with cost awareness
# Example 1: BigQuery aggregation with cost awareness -- Big Data
from google.cloud import bigquery

client = bigquery.Client(project="my-analytics-project")

# 30-day signup-to-purchase conversion, grouped by day
query = """
SELECT
  DATE(event_time) AS day,
  COUNTIF(event = 'signup')   AS signups,
  COUNTIF(event = 'purchase') AS purchases,
  SAFE_DIVIDE(COUNTIF(event = 'purchase'),
              COUNTIF(event = 'signup')) AS conversion
FROM `my-analytics-project.analytics.events`
WHERE event_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY day
ORDER BY day;
"""
job = client.query(query)
df = job.to_dataframe()  # needs pandas and db-dtypes installed
print(df.head(10))
print(f"bytes scanned: {job.total_bytes_processed/1e9:.2f} GB")
Example 2: Azure blob download with managed identity
# Example 2: Azure blob download with managed identity -- Big Data
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

account_url = "https://mydatalake.blob.core.windows.net"
# DefaultAzureCredential tries managed identity, environment variables,
# and local developer logins in turn
credential = DefaultAzureCredential()
service = BlobServiceClient(account_url, credential=credential)
container = service.get_container_client("raw-events")

for blob in container.list_blobs(name_starts_with="2026/04/"):
    print(blob.name, blob.size)
    blob_client = container.get_blob_client(blob.name)
    # write each blob to /tmp, using the last path segment as the filename
    with open(f"/tmp/{blob.name.rsplit('/', 1)[-1]}", "wb") as f:
        f.write(blob_client.download_blob().readall())
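readall() buffers each blob fully in memory, which is fine for small files but not for multi-gigabyte objects. A minimal streaming variant using the SDK's chunk iterator, with a hypothetical blob name:

# Stream a large blob to disk in chunks instead of buffering it with
# readall(); reuses `container` from above, the blob name is hypothetical.
big = container.get_blob_client("2026/04/events-00001.parquet")
with open("/tmp/events-00001.parquet", "wb") as f:
    for chunk in big.download_blob().chunks():
        f.write(chunk)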
Example 3: Kubernetes job manifest for a batch-scoring run
apiVersion: batch/v1
kind: Job
metadata:
  name: nightly-scoring
  labels:
    app: risk-scorer
spec:
  backoffLimit: 2                # retry the pod at most twice on failure
  ttlSecondsAfterFinished: 3600  # garbage-collect the Job an hour after it finishes
  template:
    spec:
      restartPolicy: OnFailure
      serviceAccountName: risk-scorer-sa
      containers:
        - name: scorer
          image: ghcr.io/example/risk-scorer:1.14.0
          # Kubernetes expands $(...) in args against container env vars, not a
          # shell, so compute the date inside a shell; the `risk-scorer` binary
          # name is assumed here (the image's entrypoint is not shown).
          command: ["/bin/sh", "-c"]
          args: ["risk-scorer --date $(date +%F) --output s3://ml-outputs/"]
          resources:
            requests: { cpu: "1", memory: "2Gi" }
            limits: { cpu: "4", memory: "8Gi" }
          env:
            - name: MODEL_URI
              value: "s3://ml-registry/risk/v3.2.1/model.joblib"
            - name: LOG_LEVEL
              value: "INFO"
Example 4: Terraform module for a managed Postgres database
terraform {
  required_version = ">= 1.6"
  required_providers {
    aws = { source = "hashicorp/aws", version = "~> 5.50" }
  }
}

resource "aws_db_subnet_group" "analytics" {
  name       = "analytics-db-subnets"
  subnet_ids = var.private_subnet_ids
}

resource "aws_db_instance" "analytics" {
  identifier     = "analytics-warehouse"
  engine         = "postgres"
  engine_version = "16.2"
  instance_class = "db.m6g.large"

  allocated_storage = 100
  storage_type      = "gp3"
  storage_encrypted = true

  db_name  = "analytics"
  username = var.db_username
  password = var.db_password # mark the variable sensitive, or source it from a secrets store

  db_subnet_group_name   = aws_db_subnet_group.analytics.name
  vpc_security_group_ids = [aws_security_group.db.id] # security group declared elsewhere in the module

  backup_retention_period      = 14
  deletion_protection          = true
  performance_insights_enabled = true

  tags = { Environment = "prod", Team = "data" }
}

output "db_endpoint" { value = aws_db_instance.analytics.endpoint }
Example 5: S3 upload with retries and listing
# Example 5: S3 upload with retries and listing -- Big Data
import boto3
from botocore.config import Config

# "standard" retry mode retries throttling and transient errors, up to 5 attempts
s3 = boto3.client(
    "s3",
    config=Config(retries={"max_attempts": 5, "mode": "standard"}),
)

bucket = "my-datalake-staging"
prefix = "exports/2026/"

# server-side encryption with S3-managed keys (SSE-S3)
s3.upload_file(
    "features.parquet", bucket, prefix + "features.parquet",
    ExtraArgs={"ServerSideEncryption": "AES256"},
)

# paginate the listing: list_objects_v2 returns at most 1,000 keys per page
total = 0
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        total += obj["Size"]
        print(obj["Key"], obj["Size"])
print(f"total bytes in {prefix}: {total:,}")