diff --git a/src/connectors/postgres_connector.py b/src/connectors/postgres_connector.py index 05243f4..d1358a5 100644 --- a/src/connectors/postgres_connector.py +++ b/src/connectors/postgres_connector.py @@ -101,7 +101,7 @@ class PostgreSQLConnector: rows: List[Dict[str, Any]], columns: List[str] ) -> int: - """Insert a batch of rows using COPY (fast bulk insert). + """Insert a batch of rows using parameterized INSERT. Args: table: Table name @@ -116,27 +116,25 @@ class PostgreSQLConnector: try: with self.connection.cursor() as cursor: - # Prepare COPY data - copy_data = [] + # Prepare values for INSERT + values_list = [] for row in rows: values = [] for col in columns: val = row.get(col) - if val is None: - values.append("\\N") # NULL representation - elif isinstance(val, (dict, list)): + # Convert JSONB dicts to JSON strings + if isinstance(val, (dict, list)): values.append(json.dumps(val)) - elif isinstance(val, str): - # Escape special characters - val = val.replace("\\", "\\\\").replace("\n", "\\n").replace("\t", "\\t") - values.append(val) else: - values.append(str(val)) - copy_data.append("\t".join(values)) + values.append(val) + values_list.append(tuple(values)) - # Use COPY for fast insert - copy_sql = f"COPY {table} ({','.join(columns)}) FROM STDIN" - cursor.copy(copy_sql, "\n".join(copy_data).encode()) + # Build parameterized INSERT query + placeholders = ",".join(["%s"] * len(columns)) + insert_sql = f"INSERT INTO {table} ({','.join(columns)}) VALUES ({placeholders})" + + # Execute batch insert + cursor.executemany(insert_sql, values_list) self.connection.commit() logger.debug(f"Inserted {len(rows)} rows into {table}")