An open-source set of modules for database migration.
- configs: Loads configuration settings from the configs.yml file.
- csv_loader: Imports data from CSV files into a database.
- data_access: Collection of routines for database access.
- data_quality: Collection of routines for detecting and resolving data quality issues.
- data_migration: Collection of routines for performing database migration.
- system_log: Module responsible for generating execution logs.
First, install the virtualenv module (skip this step if you have already installed it):
pip install virtualenv
Now, navigate to your project's folder and run the following command to create the virtual environment:
python<version> -m venv <virtual-environment-name>
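For example, assuming Python 3.11 is installed and naming the environment venv (the name used by the activation commands below):
python3.11 -m venv venv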
If you are using Windows (CMD or PowerShell), activate it with:
venv\Scripts\activate
On Linux/Mac (bash/zsh terminal):
source venv/bin/activate
After that, just install the required packages:
pip install -r requirements.txt
To ensure data privacy and prevent the exposure of sensitive database information, the system restricts access to such details. All sensitive data must be stored within a designated directory.
- Create a directory named private.
- Inside this directory, create a file called configs.yml. You can use configs-example.yml as a reference.
- Create a directory named rules inside the private folder, where the mapping rules between the migrated tables will be defined.
private/
│── rules/
│ │── rule_1.py
│ │── rule_2.py
│── configs.yml
Once all components are configured in the configs.yml file, you can invoke each module individually using the following commands:
cd mapas_migration
To run the CSV loading module:
python -m csv_loader
To run the migration module:
python -m data_migration
Below, we detail each module and how to use it.
The databases_connections section defines the connection details for multiple databases. Each connection is identified by an alias name and includes the necessary credentials and settings to establish a connection.
databases_connections:
  database_alias_name:
    host: localhost
    port: 5432
    user: your_username
    database: your_database
    password: your_password
    schema: public
    type: postgresql
  another_database:
    host: localhost
    port: 5432
    user: your_username
    database: your_database
    password: your_password
    schema: your_schema
    type: postgresql
If needed in your own code, credential data from the file can be retrieved as follows:
from configs.yaml_manager import load_credentials
# Load database credentials from the configuration file
credentials = load_credentials()
database_alias_name = credentials["database_alias_name"]
print(database_alias_name.user) # your_username
print(database_alias_name.schema) # public
The system_logging section in the configs.yml file allows multiple logs to run simultaneously.
system_logging:
  console_log:
    levels:
      - INFO
      - ERROR
      - WARNING
      - DEBUG
By default, there is a simple log implementation called console_log, which uses Python's print function to record messages. It is defined in the following class:
class ConsoleLog:
    def __init__(self, levels):
        # Log levels (e.g. INFO, ERROR) enabled in the configuration file
        self.levels = levels

    def record(self, level, msg):
        # Print the message only if its level is enabled
        if level in self.levels:
            print(f"[{level}] {msg}")
This setup allows filtering log levels as needed. For example, to hide all DEBUG messages, simply comment out that option in the configuration file:
system_logging:
  console_log:
    levels:
      - INFO
      - ERROR
      - WARNING
      # - DEBUG
Messages can then be recorded from anywhere in the code using the log function:
from system_logging.log_manager import log, Level
log(Level.DEBUG, 'debug message')
log(Level.ERROR, 'error message')
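With DEBUG commented out as above, and assuming the Level values correspond to the level names listed in the configuration, only the second call produces output:
[ERROR] error message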
The CSV Loader is a configurable module designed for bulk-loading CSV files into a target database. It allows defining multiple CSV files, specifying target tables, and applying optional transformations such as value replacements.
csv_loader:
  target_database: example_db
  buffer_size: 10000
  bulk_commit: false
  csv_files:
    - path: path/file_1.csv
      target_table: example_table
    - path: path/file_2.csv
      target_table: another_table
      replace_columns_values:
        column_1: '[private_data]'
        column_2: '[private_data]'
      delimiter: ','
      quotechar: '"'
      encoding: utf-8
- target_database: Alias name of the target database, defined in the databases_connections section.
- buffer_size: Defines how many records are buffered before insertion.
- bulk_commit: Determines whether inserts should be committed in bulk.
- csv_files: List of CSV files with individual configurations.
  - path: Path to the CSV file.
  - target_table: Target table where data will be inserted.
  - replace_columns_values: (Optional) Defines columns whose values are replaced with a default value before insertion, making anonymization easier (see the example after this list).
  - delimiter: (Optional) Character used as a delimiter in the CSV file.
  - quotechar: (Optional) Character used to enclose string values.
  - encoding: (Optional) Encoding of the CSV file.
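As a hypothetical illustration of replace_columns_values (the column names and values below are invented, not taken from the project), a row read from path/file_2.csv would be anonymized before being inserted into another_table:
# Row as read from the CSV file (hypothetical values)
row_read = {"id": "17", "column_1": "alice@example.com", "column_2": "555-0100"}

# Row actually inserted, with the configured replacements applied
row_inserted = {"id": "17", "column_1": "[private_data]", "column_2": "[private_data]"}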
To execute the CSV Loader, run:
python -m csv_loader
This will process the CSV files according to the configuration specified in the configs.yml file.
The diagram below illustrates the simplified flow of the CSV Loader:
- Initial Execution: The module starts from the csv_loader/__main__.py file. At this point, database credentials and loader configurations are loaded using the following functions:
  - load_credentials(): Loads the credentials required to connect to the database.
  - load_csv_loader(): Reads and interprets the configs.yml file, retrieving details about the CSV files and the target_database.
- CSV File Processing: Once the configurations are loaded, the system calls the main function:
  - csv_importer(): Iterates over each CSV file listed in the configuration file and processes all its tuples.
- Data Handling: For each row read from the CSV, the following function is executed:
  - process_row(): Applies the necessary data transformations before insertion into the database.
- Database Connection: The system uses a factory to establish a connection with the target database:
  - DatabaseFactory.create(): Creates a facade to interact with the database. Depending on the database type, a specific implementation is returned.
  - For PostgreSQL, the factory returns a PostgreSQLFacade object, which provides the following functions:
    - create_connection(): Opens a connection to the database.
    - metadata(): Retrieves the columns of the target table for comparison with the CSV columns.
    - writer(): Responsible for inserting data using the function:
      - insert(): Inserts the processed tuples into the target table.
- Finalization and Commit: After data insertion, two functions ensure that the operation is properly finalized:
  - flush_buffer(): Ensures that all buffered data is sent to the database.
  - commit(): Confirms the transaction in the database, making the data persistent.
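The sketch below strings these functions together to make the call sequence concrete. It is only an illustration, not the real __main__.py: apart from configs.yaml_manager, the import locations, argument names, and return shapes are assumptions, so the project-internal helpers are passed in as parameters rather than imported.
from configs.yaml_manager import load_credentials

def csv_loader_flow_sketch(load_csv_loader, DatabaseFactory, process_row, read_rows):
    # Illustrative sketch only -- signatures and shapes are assumptions.
    credentials = load_credentials()
    config = load_csv_loader()  # csv_loader section of configs.yml (assumed return shape)

    # Create a facade for the target database (e.g. a PostgreSQLFacade for type: postgresql)
    target = credentials[config["target_database"]]
    database = DatabaseFactory.create(target)
    database.create_connection()

    for csv_file in config["csv_files"]:
        columns = database.metadata(csv_file["target_table"])  # columns of the target table
        writer = database.writer()
        for row in read_rows(csv_file):  # stand-in for the module's CSV reading
            writer.insert(process_row(row, columns))  # transform, then buffer the insert
        writer.flush_buffer()  # send any remaining buffered rows
        writer.commit()  # make the inserted data persistent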
The Data Migration module is designed to facilitate the transfer of data between databases using SQL queries. It enables defining multiple input queries and specifying target tables for insertion. The source and destination databases must be configured in the databases_connections section of the configs.yml file.
data_migration:
  buffer_size: 10000
  bulk_commit: false
  rules:
    rule_1:
      inputs:
        database_alias_name:
          - select * from table_1
          - select ... inner join ..
        database_1:
          - select * from table_2
      outputs:
        database_2:
          - table_3
          - table_4
- buffer_size: Defines how many records are buffered before insertion.
- bulk_commit: Determines whether inserts should be committed in bulk.
- rules: Defines data migration rules, each specifying queries (inputs) and target tables (outputs).
  - inputs: Lists databases and their corresponding SQL queries to extract data.
  - outputs: Specifies the destination database and the target tables for inserting the extracted data.
Each migration rule is defined as a Python file inside the private/rules directory. Each rule must implement a function named exec(inputs, outputs, context), where inputs contain the data extracted by the SQL queries and outputs define the target tables for insertion. Multiple rules can be created to handle different migration scenarios, enabling flexible and modular data transformations. The context dictionary is shared between all rules and can be used to store general information (see the example after the snippets below).
Example: private/rules/rule_1.py
def exec(inputs, outputs, context):
    ...  # the rule's migration logic goes here
The diagram below illustrates the definition of rule_1 presented in the configuration example.
Ensure that the configs.yml file is properly configured before running the migration, as the following command will execute the migration based on the specified settings:
python -m data_migration
Example of reading data from one of a rule's inputs:
def exec(inputs, outputs, context):
    input = inputs[0]  # select the first input
    input.create_connection()
    reader = input.reader()
    for row in reader:
        print(row)  # print each row returned by the query
Example of writing data to one of a rule's outputs:
def exec(inputs, outputs, context):
    output = outputs[0]  # select the first output
    output.create_connection()
    writer = output.writer()
    writer.insert([21, "column_data_1", "column_data_2", 2502.1])
    writer.flush_buffer()  # flush the buffer
    writer.commit()  # do not forget to commit the changes
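As an example of the shared context dictionary, one rule can store information that a later rule reads. The key name below is made up, and the sketch assumes the rules run in the order they are declared in configs.yml:
# private/rules/rule_1.py
def exec(inputs, outputs, context):
    # ... migrate data as shown above ...
    context["rule_1_row_count"] = 42  # hypothetical key, stored for later rules

# private/rules/rule_2.py
def exec(inputs, outputs, context):
    migrated = context.get("rule_1_row_count", 0)
    print(f"rule_1 reported {migrated} migrated rows")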
This script is designed to detect the number of new tuples inserted into your databases. It is particularly useful during data migration processes, as it provides a simple way to monitor changes across your tables.
The new_data_sensor script performs the following tasks:
- Initial Run: When executed for the first time, the script saves a snapshot in your private folder containing the current number of tuples for each table in your database.
- Subsequent Runs: After performing data insertions, running the script again compares the current state against the saved snapshot and displays, in the console, each table along with the count of newly inserted tuples (a conceptual sketch of this comparison appears after the commands below).
To run the script using the default configuration file:
python -m data_quality.new_data_sensor -c private/configs.yml
If you prefer to use a different configuration file, pass your custom file as an argument:
python -m data_quality.new_data_sensor -c private/MY_CONFIGS_FILE.yml
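Conceptually, the comparison described above works like the sketch below. This is only an illustration of the idea; the snapshot file name, format, and function names are assumptions, not the script's actual implementation.
import json

def new_data_sensor_sketch(current_counts, snapshot_path="private/row_counts.json"):
    # current_counts maps table name -> current number of tuples.
    try:
        with open(snapshot_path) as f:
            previous = json.load(f)
    except FileNotFoundError:
        previous = None

    if previous is None:
        # Initial run: save a snapshot of the current counts.
        with open(snapshot_path, "w") as f:
            json.dump(current_counts, f)
        return

    # Subsequent runs: report how many tuples were inserted since the snapshot.
    for table, count in current_counts.items():
        inserted = count - previous.get(table, 0)
        if inserted > 0:
            print(f"{table}: {inserted} new tuples")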