refactor: Remove legacy consolidation methods from MySQLConnector
Remove the unused fetch_all_rows() and fetch_rows_ordered_for_consolidation()
methods. They were part of the old migration strategy that predates
partition-based consolidation; the current implementation uses
fetch_consolidation_groups_from_partition(), which handles keyset pagination
and consolidation-group buffering more efficiently.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
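For context on what replaces these methods: the partition-based approach pairs keyset pagination with a small buffer that holds back the trailing consolidation group until its last row has been seen. The sketch below is an illustration only, not the actual fetch_consolidation_groups_from_partition(), which is not shown in this diff; the helper name, the (Unit, Tool, Date, Time) key (taken from the removed docstrings), and the premise that each group's rows are contiguous by ID are all assumptions.

# Illustrative sketch only -- not the real
# fetch_consolidation_groups_from_partition() implementation.
# Assumes a pymysql DictCursor (rows are dicts) and that rows of one
# consolidation group are contiguous in ID order, so only the trailing
# group of a batch can spill into the next batch.
from itertools import groupby
from typing import Any, Dict, Generator, List, Optional


def iter_consolidation_groups(
    cursor,
    table: str,
    id_column: str = "id",
    batch_size: int = 1000,
) -> Generator[List[Dict[str, Any]], None, None]:
    """Yield complete consolidation groups using keyset pagination."""

    def key(row: Dict[str, Any]):
        # Consolidation key as described in the removed docstrings.
        return (row["Unit"], row["Tool"], row["Date"], row["Time"])

    last_id: Optional[int] = None
    pending: List[Dict[str, Any]] = []

    while True:
        # Keyset pagination: seek past the last seen ID instead of OFFSET.
        if last_id is None:
            cursor.execute(
                f"SELECT * FROM `{table}` ORDER BY `{id_column}` LIMIT %s",
                (batch_size,),
            )
        else:
            cursor.execute(
                f"SELECT * FROM `{table}` WHERE `{id_column}` > %s "
                f"ORDER BY `{id_column}` LIMIT %s",
                (last_id, batch_size),
            )
        rows = cursor.fetchall()
        if not rows:
            break
        last_id = rows[-1][id_column]

        # Group buffering: emit complete groups, but hold back the last
        # one, since its remaining rows may arrive in the next batch.
        pending.extend(rows)
        groups = [list(g) for _, g in groupby(pending, key=key)]
        pending = groups.pop()
        for group in groups:
            yield group

    if pending:
        yield pending  # the final group is complete once rows run out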
@@ -71,57 +71,6 @@ class MySQLConnector:
             logger.error(f"Failed to get row count for {table}: {e}")
             raise
 
-    def fetch_all_rows(
-        self,
-        table: str,
-        batch_size: Optional[int] = None
-    ) -> Generator[List[Dict[str, Any]], None, None]:
-        """Fetch all rows from a table in batches.
-
-        Args:
-            table: Table name
-            batch_size: Number of rows per batch (uses config default if None)
-
-        Yields:
-            Batches of row dictionaries
-        """
-        if batch_size is None:
-            batch_size = self.settings.migration.batch_size
-
-        offset = 0
-        max_retries = 3
-
-        while True:
-            retries = 0
-            while retries < max_retries:
-                try:
-                    with self.connection.cursor() as cursor:
-                        query = f"SELECT * FROM `{table}` LIMIT %s OFFSET %s"
-                        cursor.execute(query, (batch_size, offset))
-                        rows = cursor.fetchall()
-
-                        if not rows:
-                            return
-
-                        yield rows
-                        offset += len(rows)
-                        break # Success, exit retry loop
-
-                except pymysql.Error as e:
-                    retries += 1
-                    if retries >= max_retries:
-                        logger.error(f"Failed to fetch rows from {table} after {max_retries} retries: {e}")
-                        raise
-                    else:
-                        logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}")
-                        # Reconnect and retry
-                        try:
-                            self.disconnect()
-                            self.connect()
-                        except Exception as reconnect_error:
-                            logger.error(f"Failed to reconnect: {reconnect_error}")
-                            raise
-
     def fetch_rows_since(
         self,
         table: str,
@@ -219,78 +168,6 @@ class MySQLConnector:
             logger.error(f"Failed to fetch rows from {table}: {e}")
             raise
 
-    def fetch_rows_ordered_for_consolidation(
-        self,
-        table: str,
-        start_id: Optional[int] = None,
-        batch_size: Optional[int] = None
-    ) -> Generator[List[Dict[str, Any]], None, None]:
-        """Fetch rows using keyset pagination for efficient consolidation.
-
-        Fetches rows by ID for fast, efficient keyset pagination.
-        Rows are then sorted by consolidation key in Python after fetching to ensure
-        all nodes with the same (Unit, Tool, Date, Time) stay together in the same batch.
-
-        Args:
-            table: Table name ('RAWDATACOR' or 'ELABDATADISP')
-            start_id: Resume from this ID (fetch id > start_id). If None, starts from beginning
-            batch_size: Number of rows per batch (uses config default if None)
-
-        Yields:
-            Batches of row dictionaries
-        """
-        if table not in ("RAWDATACOR", "ELABDATADISP"):
-            raise ValueError(f"Consolidation ordering only supported for RAWDATACOR and ELABDATADISP, got {table}")
-
-        if batch_size is None:
-            batch_size = self.settings.migration.batch_size
-
-        # Determine the ID column name based on table
-        id_column = "idElabData" if table == "ELABDATADISP" else "id"
-
-        last_id = start_id
-        max_retries = 3
-
-        while True:
-            retries = 0
-            while retries < max_retries:
-                try:
-                    with self.connection.cursor() as cursor:
-                        # Fetch by ID using keyset pagination (fast, uses primary key)
-                        # Python will sort by consolidation key after fetching
-
-                        if last_id is None:
-                            query = f"SELECT * FROM `{table}` ORDER BY `{id_column}` ASC LIMIT %s"
-                            cursor.execute(query, (batch_size,))
-                        else:
-                            # For resume: fetch rows with id > last_id
-                            query = f"SELECT * FROM `{table}` WHERE `{id_column}` > %s ORDER BY `{id_column}` ASC LIMIT %s"
-                            cursor.execute(query, (last_id, batch_size))
-
-                        rows = cursor.fetchall()
-
-                        if not rows:
-                            return
-
-                        yield rows
-                        last_id = rows[-1][id_column]
-                        break # Success, exit retry loop
-
-                except pymysql.Error as e:
-                    retries += 1
-                    if retries >= max_retries:
-                        logger.error(f"Failed to fetch rows from {table} after {max_retries} retries: {e}")
-                        raise
-                    else:
-                        logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}")
-                        # Reconnect and retry
-                        try:
-                            self.disconnect()
-                            self.connect()
-                        except Exception as reconnect_error:
-                            logger.error(f"Failed to reconnect: {reconnect_error}")
-                            raise
-
     def get_table_structure(self, table: str) -> Dict[str, Any]:
         """Get table structure (column info).
 
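For reference, the two removals differ in kind. fetch_all_rows() paged with LIMIT/OFFSET, so each batch made MySQL scan and discard every row before the offset; its query also had no ORDER BY, so batch contents were not even guaranteed to be stable. fetch_rows_ordered_for_consolidation() already used keyset pagination but sorted only within a batch, so a consolidation group could still be split across batch boundaries. A minimal contrast of the two pagination patterns, using the queries from the removed code (table and column names are placeholders):

# Placeholder values for illustration.
table, id_column = "RAWDATACOR", "id"

# OFFSET pagination (removed fetch_all_rows pattern): the cost of each
# batch grows with the offset, and without ORDER BY the row order is
# undefined, so total work over a full table is quadratic.
offset_query = f"SELECT * FROM `{table}` LIMIT %s OFFSET %s"

# Keyset pagination (removed fetch_rows_ordered_for_consolidation
# pattern): the WHERE clause seeks straight to the continuation point
# through the primary-key index, so each batch costs roughly the same.
keyset_query = (
    f"SELECT * FROM `{table}` WHERE `{id_column}` > %s "
    f"ORDER BY `{id_column}` ASC LIMIT %s"
)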