diff --git a/dbddl/received.ddl b/dbddl/received.ddl
index 51c47da..bc6df70 100644
--- a/dbddl/received.ddl
+++ b/dbddl/received.ddl
@@ -12,8 +12,11 @@ CREATE TABLE `received` (
   `status` int DEFAULT '0',
   `matlab_timestamp` timestamp NULL DEFAULT NULL,
   `inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
-  `loaded_at` datetime DEFAULT NULL,
+  `loaded_at` timestamp DEFAULT NULL,
   `elaborated_at` timestamp NULL DEFAULT NULL,
-  `sent_at` timestamp NULL DEFAULT NULL,
+  `sent_raw_at` timestamp NULL DEFAULT NULL,
+  `sent_elab_at` timestamp NULL DEFAULT NULL,
+  `last_update_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=694 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
\ No newline at end of file
+) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
+
diff --git a/elab_orchestrator.py b/elab_orchestrator.py
index a5f4997..9585546 100755
--- a/elab_orchestrator.py
+++ b/elab_orchestrator.py
@@ -3,18 +3,14 @@
 # Import necessary libraries
 import logging
 import asyncio
-from datetime import datetime

 # Import custom modules for configuration and database connection
 from utils.config import loader_matlab_elab as setting
-from utils.database import DATA_LOADED, DATA_ELABORATED, DATA_SENT
+from utils.database import WorkflowFlags
 from utils.database.matlab_query import get_matlab_command
 from utils.csv.loaders import get_next_csv_atomic
 from utils.orchestrator_utils import run_orchestrator, worker_context
 from utils.database.loader_action import update_status, unlock
-from utils.database.elab_query import get_data_as_csv
-#from utils.ftp.elab_send import send_csv_to_customer
-

 # Initialize the logger for this module
 logger = logging.getLogger()
@@ -46,7 +42,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
         try:
             logger.info("Inizio elaborazione")

-            record = await get_next_csv_atomic(pool, cfg.dbrectable, DATA_LOADED)
+            record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)

             if record:
                 id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
@@ -56,7 +52,6 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
                     logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")

                     matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"
-                    timestamp_matlab_elab = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                     proc = await asyncio.create_subprocess_shell(
                         matlab_cmd,
                         cwd=cfg.matlab_func_path,
@@ -73,17 +68,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
                             f.write(stderr.decode().strip())
                     else:
                         logger.info(stdout.decode().strip())
-                        await update_status(cfg, id, DATA_ELABORATED, pool)
-                        if tool_elab_info['ftp_send']:
-                            if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab:
-                                if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
-                                    print(elab_csv)
-                                    #if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
-                                        #await update_status(cfg, id, DATA_SENT, pool)
-                                    await update_status(cfg, id, DATA_SENT, pool)
-                            else:
-                                logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.")
-
+                        await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
                     await unlock(cfg, id, pool)
                     await asyncio.sleep(ELAB_PROCESSING_DELAY)
                 else:
diff --git a/env/elab.ini b/env/elab.ini
index f2e2690..1fc273e 100644
--- a/env/elab.ini
+++ b/env/elab.ini
@@ -6,7 +6,7 @@

 [tool]
     # stati in minuscolo
-    elab_status = active|manual update
+    elab_status = active|manual upload

 [matlab]
     #runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
diff --git a/env/ftp.ini b/env/ftp.ini
index 5789825..36794d2 100644
--- a/env/ftp.ini
+++ b/env/ftp.ini
@@ -2,6 +2,7 @@
 # python3 -c 'from hashlib import sha256;print(sha256("????password???".encode("UTF-8")).hexdigest())'

 [ftpserver]
+    service_port = 2121
     firstPort = 40000
     proxyAddr = 0.0.0.0
     portRangeWidth = 500
diff --git a/env/send.ini b/env/send.ini
new file mode 100644
index 0000000..1396c25
--- /dev/null
+++ b/env/send.ini
@@ -0,0 +1,5 @@
+[logging]
+    logFilename = ./send_data.log
+
+[threads]
+    max_num = 5
diff --git a/ftp_csv_receiver.py b/ftp_csv_receiver.py
index b679a23..e46187a 100755
--- a/ftp_csv_receiver.py
+++ b/ftp_csv_receiver.py
@@ -136,7 +136,7 @@ def main():
         )

         # Create and start the FTP server
-        server = FTPServer(("0.0.0.0", 2121), handler)
+        server = FTPServer(("0.0.0.0", cfg.service_port), handler)
         server.serve_forever()

     except Exception as e:
diff --git a/load_orchestrator.py b/load_orchestrator.py
index 2c07782..5e665d5 100755
--- a/load_orchestrator.py
+++ b/load_orchestrator.py
@@ -7,7 +7,7 @@ import asyncio

 # Import custom modules for configuration and database connection
 from utils.config import loader_load_data as setting
-from utils.database import CSV_RECEIVED
+from utils.database import WorkflowFlags
 from utils.csv.loaders import get_next_csv_atomic
 from utils.orchestrator_utils import run_orchestrator, worker_context

@@ -33,14 +33,13 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     # Imposta il context per questo worker
     worker_context.set(f"W{worker_id:02d}")

-    debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
     logger.info("Avviato")

     while True:
         try:
             logger.info("Inizio elaborazione")

-            record = await get_next_csv_atomic(pool, cfg.dbrectable, CSV_RECEIVED)
+            record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.DATA_LOADED)

             if record:
                 success = await load_csv(record, cfg, pool)
@@ -52,7 +51,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
                 await asyncio.sleep(NO_RECORD_SLEEP)

         except Exception as e:
-            logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
+            logger.error(f"Errore durante l'esecuzione: {e}", exc_info=1)
             await asyncio.sleep(1)


@@ -67,6 +66,7 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
     Returns:
         True se l'elaborazione del CSV è avvenuta con successo, False altrimenti.
     """
+    debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG

     logger.debug("Inizio ricerca nuovo CSV da elaborare")

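
The receiver takes its listening port from the new [ftpserver] service_port key instead of a hard-coded 2121. A minimal sketch of that wiring, assuming pyftpdlib's stock FTPHandler stands in for the project's configured handler (authorizer and passive-range setup omitted):

# Sketch only: read service_port from env/ftp.ini and listen on it.
from configparser import ConfigParser
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer

c = ConfigParser()
c.read(["env/ftp.ini"])
service_port = c.getint("ftpserver", "service_port")  # 2121 in the sample ini

server = FTPServer(("0.0.0.0", service_port), FTPHandler)
server.serve_forever()
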
""" + debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG logger.debug("Inizio ricerca nuovo CSV da elaborare") diff --git a/send_orchestrator.py b/send_orchestrator.py new file mode 100755 index 0000000..3d8ef50 --- /dev/null +++ b/send_orchestrator.py @@ -0,0 +1,68 @@ +#!.venv/bin/python + +# Import necessary libraries +import logging +import asyncio + +# Import custom modules for configuration and database connection +from utils.config import loader_send_data as setting +from utils.database import WorkflowFlags +from utils.csv.loaders import get_next_csv_atomic +from utils.orchestrator_utils import run_orchestrator, worker_context +from utils.database.loader_action import update_status, unlock +from utils.database.elab_query import get_data_as_csv +#from utils.ftp.elab_send import send_csv_to_customer + + +# Initialize the logger for this module +logger = logging.getLogger() + +# Delay tra un processamento CSV e il successivo (in secondi) +ELAB_PROCESSING_DELAY = 0.2 +# Tempo di attesa se non ci sono record da elaborare +NO_RECORD_SLEEP = 60 + +async def worker(worker_id: int, cfg: object, pool: object) -> None: + + # Imposta il context per questo worker + worker_context.set(f"W{worker_id:02d}") + + debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG + logger.info("Avviato") + + while True: + try: + logger.info("Inizio elaborazione") + + record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.SENT_RAW_DATA) + + if record: + id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] + + ''' + if tool_elab_info['ftp_send']: + if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab: + if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool): + print(elab_csv) + #if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool): + #await update_status(cfg, id, , pool) + #await update_status(cfg, id, , pool) + else: + logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.") + ''' + + else: + logger.info("Nessun record disponibile") + await asyncio.sleep(NO_RECORD_SLEEP) + + except Exception as e: + logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode) + await asyncio.sleep(1) + + +async def main(): + """Funzione principale che avvia il send_orchestrator.""" + await run_orchestrator(setting.Config, worker) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/utils/config/loader_ftp_csv.py b/utils/config/loader_ftp_csv.py index a8c0a47..6a9b8a3 100644 --- a/utils/config/loader_ftp_csv.py +++ b/utils/config/loader_ftp_csv.py @@ -10,6 +10,7 @@ class Config: c.read(["env/ftp.ini", "env/db.ini"]) # FTP setting + self.service_port = c.getint("ftpserver", "service_port") self.firstport = c.getint("ftpserver", "firstPort") self.proxyaddr = c.get("ftpserver", "proxyAddr") self.portrangewidth = c.getint("ftpserver", "portRangeWidth") diff --git a/utils/config/loader_send_data.py b/utils/config/loader_send_data.py new file mode 100644 index 0000000..04db549 --- /dev/null +++ b/utils/config/loader_send_data.py @@ -0,0 +1,31 @@ +"""set configurations + +""" +from configparser import ConfigParser + +class Config: + def __init__(self): + + c = ConfigParser() + c.read(["env/send.ini", "env/db.ini"]) 
+
+        # LOG setting
+        self.logfilename = c.get("logging", "logFilename")
+
+        # Worker setting
+        self.max_threads = c.getint("threads", "max_num")
+
+        # DB setting
+        self.dbhost = c.get("db", "hostname")
+        self.dbport = c.getint("db", "port")
+        self.dbuser = c.get("db", "user")
+        self.dbpass = c.get("db", "password")
+        self.dbname = c.get("db", "dbName")
+        self.max_retries = c.getint("db", "maxRetries")
+
+        # Tables
+        self.dbusertable = c.get("tables", "userTableName")
+        self.dbrectable = c.get("tables", "recTableName")
+        self.dbrawdata = c.get("tables", "rawTableName")
+        self.dbnodes = c.get("tables", "nodesTableName")
diff --git a/utils/csv/data_preparation.py b/utils/csv/data_preparation.py
index a752fd2..efa58d6 100644
--- a/utils/csv/data_preparation.py
+++ b/utils/csv/data_preparation.py
@@ -40,7 +40,14 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
     UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
     righe = ToolData.splitlines()
     matrice_valori = []
-    for riga in [riga for riga in righe if ';|;' in riga]:
+    """
+    Ciclo su tutte le righe del file CSV, escludendo quelle che:
+    non hanno il pattern ';|;' perché non sono dati ma è la header
+    che hanno il pattern 'No RX' perché sono letture non pervenute o in errore
+    che hanno il pattern '.-' perché sono letture con un numero errato - negativo dopo la virgola
+    che hanno il pattern 'File Creation' perché vuol dire che c'è stato un errore della centralina
+    """
+    for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga]:
         timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
         EventDate, EventTime = timestamp.split(' ')
         if batlevel == '|':
@@ -70,7 +77,7 @@ async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
         list: A list of lists, where each inner list represents a row in the matrix.
     """
     UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
-    node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
+    node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
     righe = ToolData.splitlines()
     matrice_valori = []
     pattern = r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$'
@@ -104,10 +111,10 @@ async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
         list: A list of lists, where each inner list represents a row in the matrix.
     """
     UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
-    node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
+    node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
     righe = ToolData.splitlines()
     matrice_valori = []
-    for riga in [riga for riga in righe if ';|;' in riga]:
+    for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga]:
         timestamp, batlevel, temperature, rilevazioni = riga.replace(';|;',';').split(';',3)
         EventDate, EventTime = timestamp.split(' ')
         valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
@@ -132,10 +139,10 @@ async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
         list: A list of lists, where each inner list represents a row in the matrix.
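
The four make_*_matrix functions now repeat the same exclusion filter: keep only rows containing ';|;' (data rows, not the header) and drop 'No RX' readings, '.-' malformed negatives and 'File Creation' unit errors. A small sketch of the predicate on its own, with riga_valida as a hypothetical helper name not present in the diff:

# Sketch of the shared row filter used in the list comprehensions above.
def riga_valida(riga: str) -> bool:
    return (';|;' in riga                     # data rows only, the header has no ';|;'
            and 'No RX' not in riga           # reading not received or in error
            and '.-' not in riga              # malformed negative after the decimal point
            and 'File Creation' not in riga)  # control-unit error marker

righe = ["2024/01/01 00:00:00;3.6;21.5;|;1;2", "2024/01/01 00:10:00;No RX", "File Creation error"]
print([riga for riga in righe if riga_valida(riga)])  # only the first row survives
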
""" UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) - node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) + node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool) righe = ToolData.splitlines() matrice_valori = [] - for riga in [riga for riga in righe if ';|;' in riga]: + for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga]: timestamp, batlevel, rilevazioni = riga.replace(';|;',';').split(';',2) if timestamp == '': continue @@ -194,17 +201,21 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list: UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) righe = ToolData.splitlines() matrice_valori = [] - pattern = r'^-\d*dB$' - for riga in [riga for riga in righe if ';|;' in riga]: - timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) + pattern = r';-?\d+dB$' + for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga]: + timestamp, rilevazioni = riga.split(';|;',1) EventDate, EventTime = timestamp.split(' ') - if batlevel == '|': - batlevel = temperature - temperature, rilevazioni = rilevazioni.split(';',1) - if re.match(pattern, rilevazioni): - valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;' - for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): - valori = valori_nodo.split(';') - matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + logger.info(f"GD id {id}: {pattern} {rilevazioni}") + if re.search(pattern, rilevazioni): + batlevel, temperature, rssi = rilevazioni.split(';') + logger.info(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}") + elif all(char == ';' for char in rilevazioni): + pass + elif ';|;' in rilevazioni: + unit_metrics, data = rilevazioni.split(';|;') + batlevel, temperature = unit_metrics.split(';') + logger.info(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}") + else: + logger.warning(f"GD id {id}: dati non trattati - {rilevazioni}") return matrice_valori \ No newline at end of file diff --git a/utils/csv/loaders.py b/utils/csv/loaders.py index 3020ffc..25fc7f7 100644 --- a/utils/csv/loaders.py +++ b/utils/csv/loaders.py @@ -1,5 +1,5 @@ from utils.database.loader_action import load_data, update_status, unlock -from utils.database import DATA_LOADED +from utils.database import WorkflowFlags from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix import logging @@ -32,13 +32,13 @@ async def main_loader(cfg: object, id: int, pool: object, action: str) -> None: logger.info("matrice valori creata") # Load the data into the database if await load_data(cfg, matrice_valori, pool): - await update_status(cfg, id, DATA_LOADED, pool) + await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) await unlock(cfg, id, pool) else: logger.warning(f"Action '{action}' non riconosciuta.") -async def get_next_csv_atomic(pool, table_name, status): +async def get_next_csv_atomic(pool, table_name, status, next_status): """Preleva atomicamente il prossimo CSV da elaborare""" async with pool.acquire() as conn: # IMPORTANTE: Disabilita autocommit per 
@@ -47,14 +47,17 @@ async def get_next_csv_atomic(pool, table_name, status):

         try:
             async with conn.cursor() as cur:
                 # Usa SELECT FOR UPDATE per lock atomico
+
                 await cur.execute(f"""
                     SELECT id, unit_type, tool_type, unit_name, tool_name
                     FROM {table_name}
-                    WHERE locked = 0 AND status = %s
+                    WHERE locked = 0
+                    AND ((status & %s) > 0 OR %s = 0)
+                    AND (status & %s) = 0
                     ORDER BY id
                     LIMIT 1
                     FOR UPDATE SKIP LOCKED
-                """, (status,))
+                """, (status, status, next_status))
                 result = await cur.fetchone()
                 if result:
diff --git a/utils/database/__init__.py b/utils/database/__init__.py
index e7a09f7..818b46e 100644
--- a/utils/database/__init__.py
+++ b/utils/database/__init__.py
@@ -1,4 +1,18 @@
-CSV_RECEIVED = 0
-DATA_LOADED = 1
-DATA_ELABORATED = 2
-DATA_SENT = 3
\ No newline at end of file
+class WorkflowFlags:
+    CSV_RECEIVED = 0     # 0000
+    DATA_LOADED = 1      # 0001
+    DATA_ELABORATED = 2  # 0010
+    SENT_RAW_DATA = 4    # 0100
+    SENT_ELAB_DATA = 8   # 1000
+
+
+# Mappatura flag -> colonna timestamp
+FLAG_TO_TIMESTAMP = {
+    WorkflowFlags.CSV_RECEIVED: "inserted_at",
+    WorkflowFlags.DATA_LOADED: "loaded_at",
+    WorkflowFlags.DATA_ELABORATED: "elaborated_at",
+    WorkflowFlags.SENT_RAW_DATA: "sent_raw_at",
+    WorkflowFlags.SENT_ELAB_DATA: "sent_elab_at"
+}
+
+BATCH_SIZE = 1000
\ No newline at end of file
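
Since each stage is an independent bit, a record's status accumulates flags as the pipeline progresses, and FLAG_TO_TIMESTAMP names the column that update_status stamps for each stage. A small illustration, assuming the package is importable as laid out in the repository:

from utils.database import WorkflowFlags, FLAG_TO_TIMESTAMP

status = WorkflowFlags.CSV_RECEIVED       # 0: nothing done yet
status |= WorkflowFlags.DATA_LOADED       # after load_orchestrator  -> 1
status |= WorkflowFlags.DATA_ELABORATED   # after elab_orchestrator  -> 3
status |= WorkflowFlags.SENT_RAW_DATA     # after send_orchestrator  -> 7

print(FLAG_TO_TIMESTAMP[WorkflowFlags.SENT_RAW_DATA])  # "sent_raw_at", the column set to now()
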
diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py
index 061844d..e53bdc9 100644
--- a/utils/database/loader_action.py
+++ b/utils/database/loader_action.py
@@ -2,11 +2,10 @@
 import logging
 import asyncio

+from utils.database import FLAG_TO_TIMESTAMP, BATCH_SIZE
+
 logger = logging.getLogger(__name__)

-timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at", "sent_at"]
-
-
 async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
     """Carica una lista di record di dati grezzi nel database.
@@ -62,15 +61,23 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
         `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
         `Created_at` = NOW()
     """
-
+    #logger.info(f"Query insert: {sql_insert_RAWDATA}.")
+    #logger.info(f"Matrice valori da inserire: {matrice_valori}.")
     rc = False
     async with pool.acquire() as conn:
         async with conn.cursor() as cur:
             for attempt in range(cfg.max_retries):
                 try:
                     logging.info(f"Loading data attempt {attempt + 1}.")
-                    await cur.executemany(sql_insert_RAWDATA, matrice_valori)
-                    await conn.commit()
+
+                    for i in range(0, len(matrice_valori), BATCH_SIZE):
+                        batch = matrice_valori[i:i + BATCH_SIZE]
+
+                        await cur.executemany(sql_insert_RAWDATA, batch)
+                        await conn.commit()
+
+                        logging.info(f"Completed batch {i//BATCH_SIZE + 1}/{(len(matrice_valori)-1)//BATCH_SIZE + 1}")
+
                     logging.info("Data loaded.")
                     rc = True
                     break
@@ -93,7 +100,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:

     return rc

-async def update_status(cfg: object, id: int, status: int, pool: object) -> None:
+async def update_status(cfg: object, id: int, status: str, pool: object) -> None:
     """Aggiorna lo stato di un record nella tabella dei record CSV.

     Args:
@@ -106,7 +113,11 @@ async def update_status(cfg: object, id: int, status: int, pool: object) -> None:
         async with conn.cursor() as cur:
             try:
                 await cur.execute(
-                    f"update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}"
+                    f"""update {cfg.dbrectable} set
+                            status = status | {status},
+                            {FLAG_TO_TIMESTAMP[status]} = now()
+                        where id = {id}
+                    """
                 )
                 await conn.commit()
                 logging.info(f"Status updated id {id}.")
diff --git a/utils/ftp/elab_send.py b/utils/ftp/send_data.py
similarity index 95%
rename from utils/ftp/elab_send.py
rename to utils/ftp/send_data.py
index 06291eb..c906136 100644
--- a/utils/ftp/elab_send.py
+++ b/utils/ftp/send_data.py
@@ -4,6 +4,7 @@ import logging
 import aiomysql

 logger = logging.getLogger(__name__)
+
 class FTPConnection:
     """
     Manages an FTP or FTP_TLS connection, providing a context manager for automatic disconnection.
@@ -38,7 +39,11 @@
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.ftp.quit()

-async def send_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
+async def send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
+    None
+    return True
+
+async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
     """
     Sends elaborated CSV data to a customer via FTP.

@@ -79,8 +84,6 @@ async def send_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_dat
         passive = ftp_parms.get('passive', True)
         port = ftp_parms.get('port', 21)

-
-
         # Connessione FTP
         with FTPConnection(host=send_ftp_info["ftp_addrs"], port=port, use_tls=use_tls, user=send_ftp_info["ftp_user"], passwd=send_ftp_info["ftp_passwd"], passive=passive) as ftp:
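
load_data now commits in BATCH_SIZE chunks rather than one executemany over the whole matrix, which bounds the size of each transaction on large CSV files. The slicing on its own, for reference; chunks is a hypothetical helper, and the real code drives cur.executemany and conn.commit inside the loop:

# Stand-alone illustration of the BATCH_SIZE slicing used in load_data.
BATCH_SIZE = 1000

def chunks(rows):
    for i in range(0, len(rows), BATCH_SIZE):
        yield i // BATCH_SIZE + 1, rows[i:i + BATCH_SIZE]

for n, batch in chunks(list(range(2500))):
    print(f"batch {n}: {len(batch)} rows")  # 1000, 1000, 500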