49 changes: 49 additions & 0 deletions .github/workflows/terraform.yml
@@ -0,0 +1,49 @@
name: Terraform CI/CD

on:
  pull_request:
    branches: [ main ]
  push:
    branches: [ main ]

permissions:
  contents: read
  id-token: write # Required only for OIDC auth; this workflow currently authenticates with static access keys

jobs:
  terraform:
    name: Terraform Pipeline
    runs-on: ubuntu-latest

    env:
      AWS_REGION: us-east-2
      TF_VERSION: 1.13.3

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}

      - name: Terraform Format Check
        run: terraform fmt -check -recursive

      - name: Terraform Init
        run: terraform init

      - name: Terraform Validate
        run: terraform validate -no-color

      - name: Terraform Plan
        if: github.event_name == 'pull_request'
        run: terraform plan -no-color -input=false
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
lambda_venv
87 changes: 87 additions & 0 deletions Readme1.md
@@ -0,0 +1,87 @@
# Serverless ETL Pipeline with Terraform

This project provisions a serverless data ingestion and analytics pipeline on AWS using Terraform.
It creates the following components:

* Amazon S3 – to store uploaded CSV files
* AWS Lambda – triggered by S3 PUT events; aggregates sales data and upserts the results into PostgreSQL
* Amazon RDS (PostgreSQL) – stores aggregated sales results
* Amazon API Gateway – exposes a REST endpoint to retrieve the aggregated data
* Networking (VPC, Subnets, Security Groups) – ensures resources are deployed in private subnets
* IAM Roles & Policies – provide secure and least-privilege access between services

## Diagram

![Diagram](images/image.png)


## Folder Structure

```
.
├── lambda.zip            # Packaged Lambda deployment artifact (can be versioned for future changes)
├── main.tf               # Root module that calls all submodules
├── provider.tf           # AWS provider configuration and region setup
├── lambda_iam_role.tf    # IAM role with permissions for Lambda
├── lambda/               # Lambda function source code and requirements
├── data/                 # Sample CSV file and SQL script to create the table
└── modules/
    ├── api_gateway/      # API Gateway setup and Lambda integration
    │   ├── main.tf
    │   └── variables.tf
    ├── lambda_s3/        # Lambda function deployment
    │   ├── main.tf
    │   ├── outputs.tf
    │   └── variables.tf
    ├── network/          # VPC, subnets, route tables, and NAT gateway setup
    │   ├── main.tf
    │   ├── outputs.tf
    │   └── variables.tf
    ├── rds/              # RDS instance and subnet group
    │   ├── main.tf
    │   ├── outputs.tf
    │   └── variables.tf
    └── security/         # Security groups
        ├── main.tf
        ├── outputs.tf
        └── variables.tf
```

## Module and Folder Overview

| Module/Folder | Description |
|:--------------|:------------|
| network | Creates a VPC with public and private subnets across 2 Availability Zones. |
| security | Manages security groups for Lambda and RDS access. |
| lambda_s3 | Builds and deploys the Lambda function from an S3 zip package. |
| rds | Deploys a PostgreSQL RDS instance in private subnets. |
| api_gateway | Configures an API Gateway to expose the Lambda endpoint. |
| lambda\* | Contains the Lambda function source code and its requirements. |
| data\* | Contains a CSV file for testing and a SQL script to create the table. |

\* Plain folders, not Terraform modules.

## Package Lambda Function

Create a virtual environment with the Python version that matches the Lambda runtime (3.11 in this case):
```
python3.11 -m venv lambda_venv
source lambda_venv/bin/activate
```

Install the dependencies into the function directory:
```
cd lambda
pip install -r requirements.txt -t .
```

Create the deployment package:
```
zip -r9 ../lambda.zip .
cd ..
```
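
Optionally, sanity-check the artifact before deploying. A minimal sketch, assuming the layout produced by the commands above (handler at the zip root, pg8000 vendored in):

```
# check_zip.py - verify the deployment package layout
import zipfile

with zipfile.ZipFile("lambda.zip") as z:
    names = z.namelist()
    # The handler module must sit at the root of the archive
    assert "lambda_function.py" in names, "handler missing from package"
    # pg8000 must be vendored, since it is not in the Lambda runtime
    assert any(n.startswith("pg8000/") for n in names), "pg8000 not vendored"
    print(f"OK: {len(names)} files in lambda.zip")
```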

## Testing

Upload the sample file to the ingestion bucket:
```
aws s3 cp sales.csv s3://my-private-bucket-terraform-test-nanlab1/
```
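
The same upload can be scripted with boto3 (bucket name as in the CLI example above; run from the repository root):

```
import boto3

s3 = boto3.client("s3")
# The S3 PUT event from this upload is what triggers the Lambda
s3.upload_file("data/sales.csv", "my-private-bucket-terraform-test-nanlab1", "sales.csv")
```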

Call the API Gateway endpoint:

![API](images/api.png)
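
A quick way to exercise the endpoint from code; the invoke URL below is a placeholder, substitute the one output by your API Gateway deployment:

```
import json
import urllib.request

API_URL = "https://<api-id>.execute-api.us-east-2.amazonaws.com/<stage>/<resource>"  # placeholder

with urllib.request.urlopen(API_URL) as resp:
    rows = json.loads(resp.read())

for row in rows:
    print(row["sale_date"], row["product"], row["total_quantity"], row["total_amount"])
```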
10 changes: 10 additions & 0 deletions data/sales.csv
@@ -0,0 +1,10 @@
date,product,quantity,price
2025-10-01,ProductA,10,15.5
2025-10-01,ProductB,5,20.0
2025-10-01,ProductB,8,30.0
2025-10-02,ProductA,8,15.5
2025-10-02,ProductA,8,15.8
2025-10-02,ProductC,12,30.0
2025-10-03,ProductB,7,20.0
2025-10-03,ProductC,10,80.0
2025-10-03,ProductC,3,30.0
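
For reference, the Lambda's aggregation should reduce this sample to the rows below; a quick local sketch that mirrors its logic:

```
import csv
from collections import defaultdict

agg = defaultdict(lambda: {"total_quantity": 0, "total_amount": 0.0})
with open("data/sales.csv") as f:
    for row in csv.DictReader(f):
        key = (row["date"], row["product"])
        agg[key]["total_quantity"] += int(row["quantity"])
        agg[key]["total_amount"] += int(row["quantity"]) * float(row["price"])

for (date, product), v in sorted(agg.items()):
    print(date, product, v["total_quantity"], round(v["total_amount"], 2))
# e.g. 2025-10-01 ProductB -> quantity 13, amount 5*20.0 + 8*30.0 = 340.0
```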
7 changes: 7 additions & 0 deletions data/sales_table.sql
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS public.sales_agg (
    sale_date DATE NOT NULL,
    product TEXT NOT NULL,
    total_quantity INT NOT NULL,
    total_amount NUMERIC(10,2) NOT NULL,
    PRIMARY KEY (sale_date, product)
);
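
The project does not state how this script is applied; one option is to run it with the same pg8000 driver the Lambda vendors, from a host that can reach the RDS instance (connection details below are placeholders):

```
import pg8000.native

conn = pg8000.native.Connection(
    host="<rds-endpoint>", database="<db-name>",
    user="<db-user>", password="<db-password>", port=5432,
)
with open("data/sales_table.sql") as f:
    conn.run(f.read())  # CREATE TABLE IF NOT EXISTS is idempotent
conn.close()
```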
Binary file added images/api.png
Binary file added images/image.png
Binary file added lambda.zip
Binary file not shown.
99 changes: 99 additions & 0 deletions lambda/lambda_function.py
@@ -0,0 +1,99 @@
import json
import io
import csv
import pg8000.native
import boto3
import os
from collections import defaultdict

# Environment variables set in Lambda
DB_HOST = os.environ["DB_HOST"]
DB_NAME = os.environ["DB_NAME"]
DB_USER = os.environ["DB_USER"]
DB_PASSWORD = os.environ["DB_PASSWORD"]
DB_PORT = int(os.environ.get("DB_PORT", 5432))

s3_client = boto3.client("s3")

def handler(event, context):
    try:
        # Check if this Lambda is invoked via S3 or API Gateway
        if "Records" in event:  # S3 trigger
            for record in event["Records"]:
                bucket = record["s3"]["bucket"]["name"]
                key = record["s3"]["object"]["key"]

                response = s3_client.get_object(Bucket=bucket, Key=key)
                file_content = response["Body"].read().decode("utf-8")

                reader = csv.DictReader(io.StringIO(file_content))
                aggregation = defaultdict(lambda: {"total_quantity": 0, "total_amount": 0.0})

                for row in reader:
                    sale_date = row["date"]
                    product = row["product"]
                    quantity = int(row["quantity"])
                    amount = float(row["price"])
                    key_agg = (sale_date, product)
                    aggregation[key_agg]["total_quantity"] += quantity
                    aggregation[key_agg]["total_amount"] += amount * quantity

                # Connect to Postgres
                conn = pg8000.native.Connection(
                    host=DB_HOST,
                    database=DB_NAME,
                    user=DB_USER,
                    password=DB_PASSWORD,
                    port=DB_PORT
                )

                # Insert/Update aggregated data
                for (sale_date, product), values in aggregation.items():
                    conn.run(
                        """
                        INSERT INTO sales_agg (sale_date, product, total_quantity, total_amount)
                        VALUES (:sale_date, :product, :total_quantity, :total_amount)
                        ON CONFLICT (sale_date, product)
                        DO UPDATE SET
                            total_quantity = sales_agg.total_quantity + EXCLUDED.total_quantity,
                            total_amount = sales_agg.total_amount + EXCLUDED.total_amount;
                        """,
                        sale_date=sale_date,
                        product=product,
                        total_quantity=values["total_quantity"],
                        total_amount=values["total_amount"]
                    )

                conn.close()
            return {"statusCode": 200, "body": "CSV processed successfully."}

        else:  # API Gateway trigger
            conn = pg8000.native.Connection(
                host=DB_HOST,
                database=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD,
                port=DB_PORT
            )
            result = conn.run("SELECT * FROM sales_agg;")
            # Column names, ordered to line up with the SELECT * output
            columns = [desc[0] for desc in conn.run("SELECT column_name FROM information_schema.columns WHERE table_name='sales_agg' ORDER BY ordinal_position")]

            # Map tuples to dicts
            res = []
            for row in result:
                res.append({columns[0]: str(row[0]),
                            columns[1]: str(row[1]),
                            columns[2]: str(row[2]),
                            columns[3]: str(row[3])})

            conn.close()

            return {
                "statusCode": 200,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps(res)
            }

    except Exception as e:
        return {"statusCode": 500, "body": str(e)}
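
For local or console testing, a minimal synthetic S3 event that exercises the CSV branch of the handler (assumes the DB_* environment variables are set, the database is reachable, and sales.csv is present in the bucket):

```
# Invoke the handler with a fake S3 PUT event
from lambda_function import handler

fake_event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "my-private-bucket-terraform-test-nanlab1"},
            "object": {"key": "sales.csv"},
        }
    }]
}

print(handler(fake_event, None))  # expect 200 and "CSV processed successfully."
```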
1 change: 1 addition & 0 deletions lambda/requirements.txt
@@ -0,0 +1 @@
pg8000
52 changes: 52 additions & 0 deletions lambda_iam_role.tf
@@ -0,0 +1,52 @@
# IAM Role for Lambda
resource "aws_iam_role" "lambda_exec" {
  name = "lambda_exec_role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role       = aws_iam_role.lambda_exec.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# NOTE: AmazonS3FullAccess is broader than the least-privilege goal stated in the
# README; consider scoping this to read access on the ingestion bucket.
resource "aws_iam_role_policy_attachment" "lambda_s3" {
  role       = aws_iam_role.lambda_exec.name
  policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
}

# Inline policy for VPC and RDS permissions
resource "aws_iam_role_policy" "lambda_vpc_access" {
  name = "lambda-vpc-access"
  role = aws_iam_role.lambda_exec.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "NetworkInterfaceManagement"
        Effect = "Allow"
        Action = [
          "ec2:CreateNetworkInterface",
          "ec2:DescribeNetworkInterfaces",
          "ec2:DeleteNetworkInterface"
        ]
        Resource = "*"
      },
      {
        Sid    = "RDSConnection"
        Effect = "Allow"
        Action = [
          "rds-db:connect"
        ]
        Resource = "*"
      }
    ]
  })
}