Files
mysql2postgres/src/transformers/schema_transformer.py
alex 49dbd98bff fix: Add last_completed_partition column to migration_state table schema
The migration_state table was missing the last_completed_partition column,
even though the migration update queries reference it. This column records
the last partition that finished migrating, so an interrupted migration can
resume from the correct point.

To apply this change to existing databases:
  ALTER TABLE migration_state ADD COLUMN last_completed_partition VARCHAR(255);

For new databases, the table will be created with the column automatically.
2025-12-26 11:39:30 +01:00

171 lines
4.9 KiB
Python

"""PostgreSQL schema creation from MySQL structure."""
from typing import Iterable, Optional

from config import PARTITION_YEARS
from src.utils.logger import get_logger
logger = get_logger(__name__)
def create_rawdatacor_schema(*, years: Optional[Iterable[int]] = None) -> str:
    """Create PostgreSQL schema for RAWDATACOR table.

    The table is range-partitioned on ``EXTRACT(YEAR FROM event_timestamp)``:
    one partition per requested year, plus a ``rawdatacor_default`` partition
    that receives rows from any year without a dedicated partition. Every
    statement uses ``IF NOT EXISTS``, so re-running the script skips objects
    that already exist.

    Args:
        years: Years that get a dedicated partition. The partition for year
            ``Y`` is declared ``FROM (Y) TO (Y + 1)``. Defaults to
            ``config.PARTITION_YEARS`` when omitted.

    Returns:
        SQL script to create the table with partitions
    """
    if years is None:
        years = PARTITION_YEARS

    table_sql = """
-- Create sequence for id auto-increment
CREATE SEQUENCE IF NOT EXISTS rawdatacor_id_seq;
-- Create RAWDATACOR table with partitioning
-- Note: node_num is stored in measurements JSONB, not as a separate column
CREATE TABLE IF NOT EXISTS rawdatacor (
id BIGINT NOT NULL DEFAULT nextval('rawdatacor_id_seq'),
unit_name VARCHAR(32),
tool_name_id VARCHAR(32) NOT NULL,
event_timestamp TIMESTAMP NOT NULL,
bat_level NUMERIC(4,2) NOT NULL,
temperature NUMERIC(5,2) NOT NULL,
measurements JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
bat_level_module NUMERIC(4,2),
temperature_module NUMERIC(5,2),
rssi_module INTEGER
) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp));
-- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints
-- with RANGE partitioning on expressions. Using sequence for id auto-increment.
-- Create partitions for each year
"""

    # One partition per year; the upper bound is exclusive, so each
    # partition holds exactly one year.
    partition_sql = [
        f"""
CREATE TABLE IF NOT EXISTS rawdatacor_{year}
PARTITION OF rawdatacor
FOR VALUES FROM ({year}) TO ({year + 1});
"""
        for year in years
    ]

    # Catch-all for records outside the defined year range.
    default_partition_sql = """
CREATE TABLE IF NOT EXISTS rawdatacor_default
PARTITION OF rawdatacor
DEFAULT;
"""

    # NOTE(review): idx_unit_tool_raw (unit_name, tool_name_id) is a leftmost
    # prefix of idx_unit_tool_datetime_raw and is likely redundant for B-tree
    # lookups; kept to avoid changing the schema of existing deployments.
    index_sql = """
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_raw
ON rawdatacor(unit_name, tool_name_id, event_timestamp);
CREATE INDEX IF NOT EXISTS idx_unit_tool_raw
ON rawdatacor(unit_name, tool_name_id);
CREATE INDEX IF NOT EXISTS idx_measurements_gin_raw
ON rawdatacor USING GIN (measurements);
CREATE INDEX IF NOT EXISTS idx_event_timestamp_raw
ON rawdatacor(event_timestamp);
"""

    return "".join([table_sql, *partition_sql, default_partition_sql, index_sql])
def create_elabdatadisp_schema(*, years: Optional[Iterable[int]] = None) -> str:
    """Create PostgreSQL schema for ELABDATADISP table.

    The table is range-partitioned on ``EXTRACT(YEAR FROM event_timestamp)``:
    one partition per requested year, plus an ``elabdatadisp_default``
    partition that receives rows from any year without a dedicated
    partition. Every statement uses ``IF NOT EXISTS``, so re-running the
    script skips objects that already exist.

    Args:
        years: Years that get a dedicated partition. The partition for year
            ``Y`` is declared ``FROM (Y) TO (Y + 1)``. Defaults to
            ``config.PARTITION_YEARS`` when omitted.

    Returns:
        SQL script to create the table with partitions
    """
    if years is None:
        years = PARTITION_YEARS

    table_sql = """
-- Create sequence for id_elab_data auto-increment
CREATE SEQUENCE IF NOT EXISTS elabdatadisp_id_seq;
-- Create ELABDATADISP table with partitioning
-- Note: node_num, state, and calc_err are stored in measurements JSONB, not as separate columns
CREATE TABLE IF NOT EXISTS elabdatadisp (
id_elab_data BIGINT NOT NULL DEFAULT nextval('elabdatadisp_id_seq'),
unit_name VARCHAR(32),
tool_name_id VARCHAR(32) NOT NULL,
event_timestamp TIMESTAMP NOT NULL,
measurements JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp));
-- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints
-- with RANGE partitioning on expressions. Using sequence for id_elab_data auto-increment.
-- Create partitions for each year
"""

    # One partition per year; the upper bound is exclusive, so each
    # partition holds exactly one year.
    partition_sql = [
        f"""
CREATE TABLE IF NOT EXISTS elabdatadisp_{year}
PARTITION OF elabdatadisp
FOR VALUES FROM ({year}) TO ({year + 1});
"""
        for year in years
    ]

    # Catch-all for records outside the defined year range.
    default_partition_sql = """
CREATE TABLE IF NOT EXISTS elabdatadisp_default
PARTITION OF elabdatadisp
DEFAULT;
"""

    # NOTE(review): idx_unit_tool_elab (unit_name, tool_name_id) is a leftmost
    # prefix of idx_unit_tool_datetime_elab and is likely redundant for B-tree
    # lookups; kept to avoid changing the schema of existing deployments.
    index_sql = """
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_elab
ON elabdatadisp(unit_name, tool_name_id, event_timestamp);
CREATE INDEX IF NOT EXISTS idx_unit_tool_elab
ON elabdatadisp(unit_name, tool_name_id);
CREATE INDEX IF NOT EXISTS idx_measurements_gin_elab
ON elabdatadisp USING GIN (measurements);
CREATE INDEX IF NOT EXISTS idx_event_timestamp_elab
ON elabdatadisp(event_timestamp);
"""

    return "".join([table_sql, *partition_sql, default_partition_sql, index_sql])
def create_migration_state_table() -> str:
    """Create table to track migration state.

    ``CREATE TABLE IF NOT EXISTS`` does nothing when ``migration_state``
    already exists, so a table created before ``last_completed_partition``
    was added would never receive that column. The script therefore also
    runs ``ALTER TABLE ... ADD COLUMN IF NOT EXISTS``. This upgrades older
    tables in place and is a no-op on up-to-date ones, so the script stays
    safe to re-run.

    Returns:
        SQL to create migration_state table
    """
    sql = """
-- Create table to track migration state
CREATE TABLE IF NOT EXISTS migration_state (
table_name VARCHAR(255) PRIMARY KEY,
last_migrated_timestamp TIMESTAMP,
last_migrated_id BIGINT,
migration_started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
migration_completed_at TIMESTAMP,
total_rows_migrated BIGINT DEFAULT 0,
status VARCHAR(32) DEFAULT 'pending',
last_completed_partition VARCHAR(255)
);
-- Upgrade tables created before last_completed_partition existed (no-op otherwise)
ALTER TABLE migration_state
ADD COLUMN IF NOT EXISTS last_completed_partition VARCHAR(255);
"""
    return sql
def get_full_schema_script() -> str:
    """Assemble the complete PostgreSQL schema creation script.

    Generates the RAWDATACOR, ELABDATADISP and migration_state DDL, in that
    order, and joins them with two newline characters.

    Returns:
        Full SQL script to create all tables and indexes
    """
    sections = (
        create_rawdatacor_schema(),
        create_elabdatadisp_schema(),
        create_migration_state_table(),
    )
    return "\n\n".join(sections)