Add user management SITE commands (ADDU/REMU/LSTU) to the FTP receiver

This commit is contained in:
2022-01-22 15:05:10 +01:00
parent f2475fbafc
commit cba605cfd9
26 changed files with 903 additions and 467 deletions

10
.gitignore vendored
View File

@@ -7,8 +7,10 @@
*.txt *.txt
*.pyz *.pyz
*.pyo *.pyo
*.log
*.lock
pyftpdlib_example.py pyftpdlib_example.py
prova.py prova*.py
dist/FtpCsvReceiver FtpCsvReceiver
prova1.py ase-receiver/ase-receiver/provaftp.py
prova_mail.py .envrc

16
.vscode/launch.json vendored
View File

@@ -1,16 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: File corrente",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}

View File

@@ -1,4 +1,4 @@
{ {
"python.pythonPath": "/usr/bin/python3.8", "docwriter.style": "Google",
"python.formatting.provider": "yapf" "python.formatting.provider": "autopep8"
} }

View File

@@ -1,163 +0,0 @@
#!/usr/bin/python3
import sys
import os
import pika
import logging
import csv
import re
import mariadb
import shutil
from asebat.timefmt import timestamp_fmt as ts
from asebat.timefmt import date_refmt as df
from asebat.config import set_config as setting
class sqlraw():
    """Build and execute a bulk INSERT of raw CSV rows into MariaDB."""

    def __init__(self, cfg):
        # Connection parameters from the ini-backed config object.
        self.config = {
            'host': cfg.dbhost,
            'user': cfg.dbuser,
            'password': cfg.dbpass
        }
        self.dbname = cfg.dbname
        self.table = cfg.table
        # INSERT IGNORE skips duplicate-key rows instead of aborting the batch.
        self.sql_head = (
            "INSERT IGNORE INTO " + self.dbname + "." + self.table +
            " (`UnitName`,`ToolName`,`eventDT`,`BatteryLevel`,`Temperature`,`NodeNum`,"
            + "`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`," +
            "`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`) VALUES ")

    def add_data(self, values):
        # values are pre-quoted value tuples spliced directly into the SQL text.
        self.sql = self.sql_head + '(' + '),('.join(values) + ');'

    def write_db(self):
        """Run the prepared INSERT; exit the process on DB errors.

        NOTE(review): there is no conn.commit() before close -- unless the
        connector autocommits, the batch may be discarded; verify.
        """
        try:
            conn = mariadb.connect(**self.config, database=self.dbname)
        except mariadb.Error as err:
            logging.error("PID {:>5} >> Error to connet to DB {} - System error {}.".format(
                os.getpid(), self.dbname, err))
            sys.exit(1)
        cur = conn.cursor()
        try:
            cur.execute(self.sql)
        except mariadb.ProgrammingError as err:
            logging.error("PID {:>5} >> Error write into DB {} - System error {}.".format(
                os.getpid(), self.dbname, err))
            print(err)
            sys.exit(1)
        finally:
            conn.close()
def callback_ase(ch, method, properties, body, config):  # body is bytes
    """RabbitMQ consumer callback: parse one received CSV file into bulk
    INSERT batches, write them, ack, then move the file to .../loaded/.

    Message layout (';'-separated): type;unit;tool;tooltype;date;time;filepath.
    """
    logging.info("PID {0:>5} >> Read message {1}".format(os.getpid(), body.decode("utf-8")))
    msg = body.decode("utf-8").split(";")
    sql = sqlraw(config)
    stmlst = []
    # Unit and tool name are shared by every row of the file.
    commonData = '"{0}","{1}"'.format(msg[1],msg[2])
    tooltype = msg[3]
    with open(msg[6], "r") as csvfile:
        lines = csvfile.read().splitlines()
        for line in lines:
            fields = line.split(";|;")
            # G501 rows: "YYYY/MM/DD hh:mm:ss;battery;temperature" in field 0.
            if (mG501 := re.match(
                r"^(\d\d\d\d\/\d\d\/\d\d\s\d\d:\d\d:\d\d);(.+);(.+)$",
                fields[0]
            )):
                rowData = ',"{0}",{1},{2}'.format(mG501.group(1), mG501.group(2), mG501.group(3))
                fields.pop(0)
            # G201 rows: timestamp alone; battery/temperature follow in field 1.
            elif (mG201 := re.match(
                r"^(\d\d\/\d\d\/\d\d\d\d\s\d\d:\d\d:\d\d)$",
                fields[0]
            )):
                mbtG201 = re.match(
                    r"^(.+);(.+)$",
                    fields[1]
                )
                rowData = ',"{0}",{1},{2}'.format(df.dateTimeFmt(mG201.group(1)), mbtG201.group(1), mbtG201.group(2))
                fields.pop(0)
                fields.pop(0)
            else:
                continue  # header/unrecognized line
            nodeNum = 0
            for field in fields:
                nodeNum += 1
                vals = field.split(";")
                # Pad with SQL nulls up to config.valueNum value columns.
                stmlst.append(commonData + rowData + ',{0},'.format(nodeNum) + ','.join('"{0}"'.format(d) for d in vals) + ',' + ','.join(['null']*(config.valueNum-len(vals))))
            # Flush mid-file once the batch exceeds the configured limit.
            if (config.maxInsertRow < len(stmlst)):
                sql.add_data(stmlst)
                try:
                    sql.write_db()
                    stmlst.clear()
                except:
                    # NOTE(review): bare except hides the real failure reason.
                    print("errore nell'inserimento")
                    sys.exit(1)
    # Final batch: the message is acked only after this write succeeds.
    if len(stmlst) > 0:
        sql.add_data(stmlst)
        try:
            sql.write_db()
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except:
            print("errore nell'inserimento")
            sys.exit(1)
    # Archive the processed file under .../loaded/ mirroring the source path.
    newFilename = msg[6].replace("received", "loaded")
    newPath, filenameExt = os.path.split(newFilename)
    try:
        os.makedirs(newPath)
        logging.info("PID {:>5} >> path {} created.".format(
            os.getpid(), newPath))
    except FileExistsError:
        logging.info("PID {:>5} >> path {} already exists.".format(
            os.getpid(), newPath))
    try:
        shutil.move(msg[6], newFilename)
        logging.info("PID {:>5} >> {} moved into {}.".format(
            os.getpid(), filenameExt, newFilename))
    except OSError:
        logging.error("PID {:>5} >> Error to move {} into {}.".format(
            os.getpid(), filenameExt, newFilename))
def main():
    """Entry point: connect to RabbitMQ and consume the CSV queue forever."""
    cfg = setting.config()
    logging.basicConfig(
        format="%(asctime)s %(message)s",
        filename="/var/log/" + cfg.elablog,
        level=logging.INFO,
    )
    parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + cfg.mqpass +
                                    '@' + cfg.mqhost + ':' + cfg.mqport +'/%2F')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=cfg.csv_queue, durable=True)
    # One unacked message at a time: a file is fully loaded before the next.
    channel.basic_qos(prefetch_count=1)
    # Lambda binds the config object into the callback signature pika expects.
    channel.basic_consume(
        queue=cfg.csv_queue,
        on_message_callback=lambda ch, method, properties, body: callback_ase(ch, method, properties, body, config=cfg)
    )
    #channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback,arguments=cfg)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        logging.info("PID {0:>5} >> Info: {1}.".format(
            os.getpid(), "Shutdown requested...exiting"))


if __name__ == "__main__":
    main()

View File

@@ -1,240 +0,0 @@
#!/usr/bin/python3.8
import sys
import os
import shutil
import ssl
import pika
import re
import logging
from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError
from email.mime.text import MIMEText
from asebat.timefmt import timestamp_fmt as ts
from asebat.timefmt import date_refmt as df
from asebat.config import set_config as setting
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import UnixAuthorizer
from pyftpdlib.filesystems import UnixFilesystem
def send_mail(sev, msg, cfg):
    """Send a notification mail over SMTP_SSL.

    sev is appended to the configured subject, msg to the configured body.
    Failures are logged and swallowed, never raised to the caller.
    """
    # Note: the msg parameter is rebound to the MIME message object here.
    msg = MIMEText(cfg.message + "\n" + msg)
    msg["Subject"] = cfg.subject + " " + sev
    msg["From"] = cfg.sender
    msg["To"] = cfg.receivers
    conn = SMTP(host=cfg.smtphost,
                port=cfg.smtpport,
                local_hostname=None,
                timeout=5,
                source_address=None)
    conn.set_debuglevel(cfg.debuglevel)
    try:
        conn.login(cfg.sender, cfg.password)
        conn.sendmail(cfg.sender, cfg.receivers, msg.as_string())
    except SMTPAuthenticationError:
        logging.error("PID {:>5} >> Mail failed: {}.".format(
            os.getpid(), "SMTP authentication error"))
    except:
        # NOTE(review): bare except + placeholder text hides the real error.
        logging.info("PID {:>5} >> Mail failed: {}.".format(
            os.getpid(), "CUSTOM_ERROR"))
    finally:
        conn.quit()
class mq():
    """Thin wrapper around a blocking RabbitMQ channel for the CSV queue."""

    def __init__(self, cfg):
        parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' +
                                        cfg.mqpass + '@' + cfg.mqhost + ':' +
                                        cfg.mqport + '/%2F')
        # NOTE(review): the connection object is not kept, so close() below
        # can only close the channel -- the TCP connection leaks.
        connection = pika.BlockingConnection(parameters)
        self.channel = connection.channel()
        self.channel.queue_declare(queue=cfg.csv_queue, durable=True)

    def write(self, msg, cfg):
        """Publish msg persistently; log (do not raise) on failure."""
        try:
            self.channel.basic_publish(
                exchange='',
                routing_key=cfg.csv_queue,
                body=msg,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                ))
            logging.info("PID {:>5} >> write message {} in queue".format(
                os.getpid(), msg))
        except:
            logging.error(
                "PID {:>5} >> error write message {} in queue".format(
                    os.getpid(), msg))

    def close(self):
        # Closes only the channel; see NOTE in __init__.
        self.channel.close()
class ASEHandler(FTPHandler):
    """FTP handler that archives each received CSV file and publishes its
    metadata on the RabbitMQ CSV queue for the loader process."""

    def on_file_received(self, file):
        # Metadata extracted either from the filename or from header lines.
        unitType = ''
        unitName = ''
        toolName = ''
        toolType = ''
        fileDate = ''
        fileTime = ''
        # NOTE(review): '' has no close(); if mq(cfg) below raises, the
        # finally clause calls queue.close() on this string -- verify.
        queue = ''
        if not os.stat(file).st_size:
            os.remove(file)
            logging.info("PID {:>5} >> file {} was empty: removed.".format(
                os.getpid(), file))
        else:
            cfg = self.cfg
            path, filenameExt = os.path.split(file)
            filename, fileExtension = os.path.splitext(filenameExt)
            # G501-style name: unit, tool, date and time all in the filename.
            if (m := re.match(
                    r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$",
                    filename,
                    re.I,
            )):
                unitType = m.group(1).upper()
                unitName = m.group(2).upper()
                toolName = m.group(3).upper()
                toolType = "N/A"
                fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4)
                fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9)
            # Otherwise read the metadata from the first header lines.
            elif re.match(
                    r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$",
                    filename, re.I):
                with open(file, "r") as fileCsv:
                    try:
                        for i, line in enumerate(fileCsv.readlines(4096), 1):
                            # creation date/time line
                            if (m1 := re.match(
                                    r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$",
                                    line,
                                    re.I,
                            )):
                                fileDate = m1.group(2)
                                fileTime = m1.group(3)
                            # unit type / unit name line
                            elif (m2 := re.match(
                                    r"^(\w+\d+)\s(\w+\d+)\;*\n?$",
                                    line,
                                    re.I,
                            )):
                                unitType = m2.group(1).upper()
                                unitName = m2.group(2).upper()
                            # SD path line carrying tool type/name
                            elif (m3 := re.match(
                                    r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$",
                                    line, re.I)):
                                if m3.group(2):
                                    toolType = m3.group(1).upper()
                                    toolName = m3.group(2).upper()
                                else:
                                    # tool type = alphabetic prefix of the name
                                    toolType = "".join(
                                        re.findall("^[a-zA-Z]+",
                                                   m3.group(1))).upper()
                                    toolName = m3.group(1).upper()
                                break
                    except:
                        # NOTE(review): bare except hides real parse errors.
                        logging.error("PID {:>5} >> Error: {}.".format(
                            os.getpid(),
                            sys.exc_info()[1]))
                    # NOTE(review): missing () -- this is a no-op; the
                    # with-block closes the file anyway.
                    fileCsv.close
            logging.info("PID {:>5} >> {} - {} - {} - {} - {} {}.".format(
                os.getpid(),
                unitType,
                unitName,
                toolName,
                toolType,
                df.dateFmt(fileDate),
                fileTime,
            ))
            newPath = cfg.csvfs + self.username + "/received/" + unitName.upper(
            ) + "/"
            # NOTE(review): fileExtension is concatenated inside str(...)
            # with the timestamp; works only because timestamp() returns str.
            newFilename = (newPath + filename + "_" +
                           str(ts.timestamp("tms") + fileExtension))
            fileRenamed = (file + "_" + str(ts.timestamp("tms")))
            os.rename(file, fileRenamed)
            try:
                os.makedirs(newPath)
                logging.info("PID {:>5} >> path {} created.".format(
                    os.getpid(), newPath))
            except FileExistsError:
                logging.info("PID {:>5} >> path {} already exists.".format(
                    os.getpid(), newPath))
            try:
                shutil.move(fileRenamed, newFilename)
                logging.info("PID {:>5} >> {} moved into {}.".format(
                    os.getpid(), filenameExt, newFilename))
            except OSError:
                logging.error("PID {:>5} >> Error to move {} into {}.".format(
                    os.getpid(), filenameExt, newFilename))
                send_mail(
                    "Error",
                    "OS error move " + filenameExt + " to " + newFilename, cfg)
            # Publish metadata for the loader process.
            mq_message = "{};{};{};{};{};{};{}".format(
                unitType,
                unitName,
                toolName,
                toolType,
                df.dateFmt(fileDate),
                fileTime,
                newFilename,
            )
            try:
                queue = mq(cfg)
                queue.write(mq_message, cfg)
                logging.info("PID {:>5} >> queue message: {}.".format(
                    os.getpid(), mq_message))
            except:
                logging.error(
                    "PID {:>5} >> Error to put message in queue: {}.".format(
                        os.getpid(), mq_message))
                send_mail("Error",
                          "Error to put message " + mq_message + " in queue.",
                          cfg)
            finally:
                queue.close()

    def on_incomplete_file_received(self, file):
        # remove partially uploaded files
        os.remove(file)
def main():
    """Start the FTP receiver on port 21 using system (Unix) accounts."""
    cfg = setting.config()
    try:
        # Authenticate against the system user database; root is refused.
        authorizer = UnixAuthorizer(rejected_users=["root"],
                                    require_valid_shell=True)
        handler = ASEHandler
        # pyftpdlib reads configuration from class attributes.
        handler.cfg = cfg
        handler.authorizer = authorizer
        handler.abstracted_fs = UnixFilesystem
        # Address advertised in PASV replies when behind a NAT/proxy.
        handler.masquerade_address = cfg.proxyaddr
        _range = list(range(cfg.firstport, cfg.firstport + 20))
        handler.passive_ports = _range
        logging.basicConfig(
            format="%(asctime)s %(message)s",
            filename="/var/log/" + cfg.logfilename,
            level=logging.INFO,
        )
        server = FTPServer(("0.0.0.0", 21), handler)
        server.serve_forever()
    except KeyboardInterrupt:
        logging.info("PID {:>5} >> Info: {}.".format(
            os.getpid(), "Shutdown requested...exiting"))
    except Exception:
        print("{} - PID {:>5} >> Error: {}.".format(ts.timestamp("log"),
                                                    os.getpid(),
                                                    sys.exc_info()[1]))


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,197 @@
#!/usr/bin/python3
import sys
import os
import pika
import logging
import csv
import re
import mariadb
import shutil
from utils.timefmt import timestamp_fmt as ts
from utils.timefmt import date_refmt as df
from utils.config import set_config as setting
class sqlraw:
    """Accumulate raw CSV rows into a bulk INSERT and write them to MariaDB.

    The caller (callback_ase) pre-formats the value tuples; this class only
    assembles the statement head, executes and commits the batch.
    """

    def __init__(self, cfg):
        # Connection parameters from the ini-backed config object.
        self.config = {"host": cfg.dbhost, "user": cfg.dbuser, "password": cfg.dbpass}
        self.dbname = cfg.dbname
        self.table = cfg.table
        # INSERT IGNORE: duplicate-key rows are skipped, so re-processing a
        # file does not abort the whole batch.
        self.sql_head = (
            "INSERT IGNORE INTO "
            + self.dbname
            + "."
            + self.table
            + " (`UnitName`,`ToolName`,`eventDT`,`BatteryLevel`,`Temperature`,`NodeNum`,"
            + "`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,"
            + "`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`) VALUES "
        )

    def add_data(self, values):
        """Build the full INSERT statement from pre-formatted value tuples.

        NOTE(review): values are spliced into the SQL text rather than bound
        as parameters; they originate from uploaded CSV files, so consider
        cursor.executemany() with placeholders to rule out injection.
        """
        self.sql = self.sql_head + "(" + "),(".join(values) + ");"

    def write_db(self):
        """Execute the prepared statement and commit; exit on DB errors."""
        try:
            conn = mariadb.connect(**self.config, database=self.dbname)
        except mariadb.Error as err:
            # BUGFIX: "connet" -> "connect" in the log message.
            logging.error(
                "PID {:>5} >> Error to connect to DB {} - System error {}.".format(
                    os.getpid(), self.dbname, err
                )
            )
            sys.exit(1)
        cur = conn.cursor()
        try:
            cur.execute(self.sql)
            # BUGFIX: MariaDB Connector/Python does not autocommit by
            # default, so without this the batch was discarded on close.
            conn.commit()
        except mariadb.ProgrammingError as err:
            logging.error(
                "PID {:>5} >> Error write into DB {} - System error {}.".format(
                    os.getpid(), self.dbname, err
                )
            )
            print(err)
            sys.exit(1)
        finally:
            conn.close()
def callback_ase(ch, method, properties, body, config):  # body is bytes
    """Consume one queue message and load the referenced CSV file into the DB.

    Message layout (';'-separated): unitType;unitName;toolName;toolType;
    date;time;filepath.  Rows are batched into bulk INSERTs of at most
    config.maxInsertRow entries; the message is acked after the final batch
    is written, then the file is moved from .../received/ to .../loaded/.
    """
    logging.info(
        "PID {0:>5} >> Read message {1}".format(os.getpid(), body.decode("utf-8"))
    )
    msg = body.decode("utf-8").split(";")
    sql = sqlraw(config)
    stmlst = []
    # Unit name and tool name are constant for the whole file.
    commonData = '"{0}","{1}"'.format(msg[1], msg[2])
    tooltype = msg[3]  # currently unused; kept for future format dispatch
    with open(msg[6], "r") as csvfile:
        lines = csvfile.read().splitlines()
        for line in lines:
            fields = line.split(";|;")
            # G501 rows start with "YYYY/MM/DD hh:mm:ss;battery;temperature".
            if mG501 := re.match(
                r"^(\d\d\d\d\/\d\d\/\d\d\s\d\d:\d\d:\d\d);(.+);(.+)$", fields[0]
            ):
                rowData = ',"{0}",{1},{2}'.format(
                    mG501.group(1), mG501.group(2), mG501.group(3)
                )
                fields.pop(0)
            # G201 rows carry only the timestamp first; battery/temperature
            # follow in the next field and the date needs reformatting.
            elif mG201 := re.match(
                r"^(\d\d\/\d\d\/\d\d\d\d\s\d\d:\d\d:\d\d)$", fields[0]
            ):
                mbtG201 = re.match(r"^(.+);(.+)$", fields[1])
                rowData = ',"{0}",{1},{2}'.format(
                    df.dateTimeFmt(mG201.group(1)), mbtG201.group(1), mbtG201.group(2)
                )
                fields.pop(0)
                fields.pop(0)
            else:
                continue  # header/unrecognized line: skip
            nodeNum = 0
            for field in fields:
                nodeNum += 1
                vals = field.split(";")
                # Pad with SQL nulls up to config.valueNum value columns.
                stmlst.append(
                    commonData
                    + rowData
                    + ",{0},".format(nodeNum)
                    + ",".join('"{0}"'.format(d) for d in vals)
                    + ","
                    + ",".join(["null"] * (config.valueNum - len(vals)))
                )
            # Flush mid-file once the batch exceeds the configured limit.
            if config.maxInsertRow < len(stmlst):
                sql.add_data(stmlst)
                try:
                    sql.write_db()
                    stmlst.clear()
                except Exception:  # narrowed from a bare except; keep fail-fast
                    print("errore nell'inserimento")
                    sys.exit(1)
    # Final batch; the ack makes RabbitMQ drop the message for good.
    # NOTE(review): if the last rows were flushed inside the loop, stmlst is
    # empty here and the message is never acked -- confirm this is intended.
    if len(stmlst) > 0:
        sql.add_data(stmlst)
        try:
            sql.write_db()
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:  # narrowed from a bare except; keep fail-fast
            print("errore nell'inserimento")
            sys.exit(1)
    # Archive the processed file under .../loaded/ mirroring the source path.
    newFilename = msg[6].replace("received", "loaded")
    newPath, filenameExt = os.path.split(newFilename)
    try:
        os.makedirs(newPath)
        logging.info("PID {:>5} >> path {} created.".format(os.getpid(), newPath))
    except FileExistsError:
        logging.info(
            "PID {:>5} >> path {} already exists.".format(os.getpid(), newPath)
        )
    try:
        shutil.move(msg[6], newFilename)
        logging.info(
            "PID {:>5} >> {} moved into {}.".format(
                os.getpid(), filenameExt, newFilename
            )
        )
    except OSError:
        logging.error(
            "PID {:>5} >> Error to move {} into {}.".format(
                os.getpid(), filenameExt, newFilename
            )
        )
def main():
    """Entry point: connect to RabbitMQ and consume the CSV queue forever."""
    cfg = setting.config()
    logging.basicConfig(
        format="%(asctime)s %(message)s",
        filename="/var/log/" + cfg.elablog,
        level=logging.INFO,
    )
    amqp_url = (
        "amqp://" + cfg.mquser + ":" + cfg.mqpass
        + "@" + cfg.mqhost + ":" + cfg.mqport + "/%2F"
    )
    channel = pika.BlockingConnection(pika.URLParameters(amqp_url)).channel()
    channel.queue_declare(queue=cfg.csv_queue, durable=True)
    # One unacked message at a time: a file is fully loaded before the next.
    channel.basic_qos(prefetch_count=1)

    def _on_message(ch, method, properties, body):
        # Bind the config object into the callback signature pika expects.
        callback_ase(ch, method, properties, body, config=cfg)

    channel.basic_consume(queue=cfg.csv_queue, on_message_callback=_on_message)
    # channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback,arguments=cfg)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        logging.info(
            "PID {0:>5} >> Info: {1}.".format(
                os.getpid(), "Shutdown requested...exiting"
            )
        )


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,397 @@
#!/usr/bin/env python3
import sys
import os
import shutil
import ssl
import pika
import re
import logging
import sqlite3
from hashlib import md5
from pathlib import Path
from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError
from email.mime.text import MIMEText
from utils.time import timestamp_fmt as ts
from utils.time import date_refmt as df
from utils.config import set_config as setting
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
def send_mail(sev, msg, cfg):
    """Send a notification mail over SMTP_SSL.

    sev is appended to the configured subject ("Error", ...), msg to the
    configured message body.  Failures are logged, never raised.
    """
    # Build the MIME message (local name avoids shadowing the msg parameter).
    mail = MIMEText(cfg.message + "\n" + msg)
    mail["Subject"] = cfg.subject + " " + sev
    mail["From"] = cfg.sender
    mail["To"] = cfg.receivers
    conn = SMTP(
        host=cfg.smtphost,
        port=cfg.smtpport,
        local_hostname=None,
        timeout=5,
        source_address=None,
    )
    conn.set_debuglevel(cfg.debuglevel)
    try:
        conn.login(cfg.sender, cfg.password)
        conn.sendmail(cfg.sender, cfg.receivers, mail.as_string())
    except SMTPAuthenticationError:
        logging.error(
            "PID {:>5} >> Mail failed: {}.".format(
                os.getpid(), "SMTP authentication error"
            )
        )
    except Exception as err:
        # BUGFIX: was a bare except logging the literal placeholder
        # "CUSTOM_ERROR" at info level; log the actual reason as an error.
        logging.error("PID {:>5} >> Mail failed: {}.".format(os.getpid(), err))
    finally:
        conn.quit()
class mq:
    """Thin wrapper around a blocking RabbitMQ channel for the CSV queue."""

    def __init__(self, cfg):
        parameters = pika.URLParameters(
            "amqp://"
            + cfg.mquser
            + ":"
            + cfg.mqpass
            + "@"
            + cfg.mqhost
            + ":"
            + cfg.mqport
            + "/%2F"
        )
        # BUGFIX: keep a reference to the connection so close() can release
        # it; previously only the channel was closed and the TCP connection
        # leaked on every published file.
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=cfg.csv_queue, durable=True)

    def write(self, msg, cfg):
        """Publish msg persistently on the CSV queue; log on failure."""
        try:
            self.channel.basic_publish(
                exchange="",
                routing_key=cfg.csv_queue,
                body=msg,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                ),
            )
            logging.info(
                "PID {:>5} >> write message {} in queue".format(
                    os.getpid(), msg)
            )
        except Exception:  # narrowed from a bare except; publish is best-effort
            logging.error(
                "PID {:>5} >> error write message {} in queue".format(
                    os.getpid(), msg)
            )

    def close(self):
        """Close the channel and the underlying connection."""
        self.channel.close()
        self.connection.close()
class DummyMD5Authorizer(DummyAuthorizer):
    """DummyAuthorizer variant whose user table stores MD5 password hashes."""

    def validate_authentication(self, username, password, handler):
        """Raise AuthenticationFailed unless MD5(password) matches the stored hash.

        NOTE(review): MD5 is a fast, unsalted digest -- not a password hash.
        Consider hashlib.pbkdf2_hmac (or passlib) for stored credentials.
        """
        digest = md5(password.encode("UTF-8")).hexdigest()
        try:
            stored = self.user_table[username]["pwd"]
        except KeyError:
            # Unknown user: same failure as a wrong password.
            raise AuthenticationFailed
        if stored != digest:
            raise AuthenticationFailed
class ASEHandler(FTPHandler):
    """FTP handler that archives each received CSV file, publishes its
    metadata on the RabbitMQ CSV queue, and implements the SITE
    ADDU/REMU/LSTU virtual-user management commands."""

    def on_file_received(self, file):
        """Classify a completed upload, archive it under .../received/<unit>/
        and publish its metadata for the loader process."""
        unitType = ""
        unitName = ""
        toolName = ""
        toolType = ""
        fileDate = ""
        fileTime = ""
        # BUGFIX: was "" -- if mq(cfg) raised, the finally clause called
        # "".close() and masked the original error with an AttributeError.
        queue = None
        if not os.stat(file).st_size:
            os.remove(file)
            logging.info(
                "PID {:>5} >> file {} was empty: removed.".format(
                    os.getpid(), file)
            )
        else:
            cfg = self.cfg
            path, filenameExt = os.path.split(file)
            filename, fileExtension = os.path.splitext(filenameExt)
            # G501-style name: unit, tool, date and time encoded in the filename.
            if m := re.match(
                r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$",
                filename,
                re.I,
            ):
                unitType = m.group(1).upper()
                unitName = m.group(2).upper()
                toolName = m.group(3).upper()
                toolType = "N/A"
                fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4)
                fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9)
            # Otherwise the metadata must be read from the file header lines.
            elif re.match(
                r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", filename, re.I
            ):
                with open(file, "r") as fileCsv:
                    try:
                        for i, line in enumerate(fileCsv.readlines(4096), 1):
                            # creation date/time line
                            if m1 := re.match(
                                r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$",
                                line,
                                re.I,
                            ):
                                fileDate = m1.group(2)
                                fileTime = m1.group(3)
                            # unit type / unit name line
                            elif m2 := re.match(
                                r"^(\w+\d+)\s(\w+\d+)\;*\n?$",
                                line,
                                re.I,
                            ):
                                unitType = m2.group(1).upper()
                                unitName = m2.group(2).upper()
                            # SD path line carrying the tool type/name
                            elif m3 := re.match(
                                r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$",
                                line,
                                re.I,
                            ):
                                if m3.group(2):
                                    toolType = m3.group(1).upper()
                                    toolName = m3.group(2).upper()
                                else:
                                    # tool type = alphabetic prefix of the name
                                    toolType = "".join(
                                        re.findall("^[a-zA-Z]+", m3.group(1))
                                    ).upper()
                                    toolName = m3.group(1).upper()
                                break
                    except Exception:  # narrowed from a bare except
                        logging.error(
                            "PID {:>5} >> Error: {}.".format(
                                os.getpid(), sys.exc_info()[1]
                            )
                        )
                # BUGFIX: dropped the no-op "fileCsv.close" (missing call
                # parentheses; the with-block closes the file anyway).
            logging.info(
                "PID {:>5} >> {} - {} - {} - {} - {} {}.".format(
                    os.getpid(),
                    unitType,
                    unitName,
                    toolName,
                    toolType,
                    df.dateFmt(fileDate),
                    fileTime,
                )
            )
            newPath = cfg.csvfs + self.username + "/received/" + unitName.upper() + "/"
            # BUGFIX: fileExtension was concatenated inside str(...) together
            # with the timestamp; append it to the final name explicitly.
            newFilename = (
                newPath + filename + "_" + str(ts.timestamp("tms")) + fileExtension
            )
            # Timestamp the upload immediately so a re-upload cannot clobber it.
            fileRenamed = file + "_" + str(ts.timestamp("tms"))
            os.rename(file, fileRenamed)
            try:
                os.makedirs(newPath)
                logging.info(
                    "PID {:>5} >> path {} created.".format(
                        os.getpid(), newPath)
                )
            except FileExistsError:
                logging.info(
                    "PID {:>5} >> path {} already exists.".format(
                        os.getpid(), newPath)
                )
            try:
                shutil.move(fileRenamed, newFilename)
                logging.info(
                    "PID {:>5} >> {} moved into {}.".format(
                        os.getpid(), filenameExt, newFilename
                    )
                )
            except OSError:
                logging.error(
                    "PID {:>5} >> Error to move {} into {}.".format(
                        os.getpid(), filenameExt, newFilename
                    )
                )
                send_mail(
                    "Error", "OS error move " + filenameExt + " to " + newFilename, cfg
                )
            # Publish metadata for the loader: type;unit;tool;tooltype;date;time;path
            mq_message = "{};{};{};{};{};{};{}".format(
                unitType,
                unitName,
                toolName,
                toolType,
                df.dateFmt(fileDate),
                fileTime,
                newFilename,
            )
            try:
                queue = mq(cfg)
                queue.write(mq_message, cfg)
                logging.info(
                    "PID {:>5} >> queue message: {}.".format(
                        os.getpid(), mq_message)
                )
            except Exception:  # narrowed from a bare except
                logging.error(
                    "PID {:>5} >> Error to put message in queue: {}.".format(
                        os.getpid(), mq_message
                    )
                )
                send_mail(
                    "Error", "Error to put message " + mq_message + " in queue.", cfg
                )
            finally:
                if queue is not None:
                    queue.close()

    def on_incomplete_file_received(self, file):
        # remove partially uploaded files
        os.remove(file)

    def ftp_SITE_ADDU(self, line):
        """SITE ADDU <user> <password>: add a virtual user.

        Creates the user's directory under cfg.virtpath, registers the user
        with the authorizer and persists it in the sqlite virtusers DB.
        """
        cfg = self.cfg
        parms = line.split()
        # basename() strips any path components from the supplied name.
        user = os.path.basename(parms[0])
        password = parms[1]
        hash = md5(password.encode("UTF-8")).hexdigest()
        try:
            Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True)
        except OSError:
            # BUGFIX: was self.responde (typo) -> AttributeError at runtime,
            # so the client never got the 551 reply.
            self.respond('551 Error in create virtual user path.')
        else:
            try:
                # NOTE(review): authorizer gets perm "lmw" but the DB row
                # stores 'elmw' -- confirm which permission set is intended.
                self.authorizer.add_user(str(user),
                                         hash, cfg.virtpath + user, perm="lmw")
                con = sqlite3.connect(cfg.virtusersdb)
                try:
                    con.execute("INSERT INTO virtusers VALUES (?,?,?,?)",
                                (user, hash, cfg.virtpath + user, 'elmw'))
                    con.commit()
                finally:
                    # close even when the INSERT fails (was leaked before)
                    con.close()
                self.respond('200 SITE ADDU successful.')
            except Exception:  # narrowed from a bare except
                self.respond('501 Invalid SITE ADDU format.')

    def ftp_SITE_REMU(self, line):
        """SITE REMU <user>: remove a virtual user from authorizer and DB."""
        cfg = self.cfg
        parms = line.split()
        user = os.path.basename(parms[0])
        try:
            self.authorizer.remove_user(str(user))
            con = sqlite3.connect(cfg.virtusersdb)
            try:
                con.execute("DELETE FROM virtusers WHERE user = ?", (user,))
                con.commit()
            finally:
                # close even when the DELETE fails (was leaked before)
                con.close()
            self.respond('200 SITE REMU successful.')
        except Exception:  # narrowed from a bare except
            self.respond('501 Invalid SITE REMU format.')

    def ftp_SITE_LSTU(self, line):
        """SITE LSTU: push the list of persisted virtual users to the client."""
        cfg = self.cfg
        users_list = []
        try:
            con = sqlite3.connect(cfg.virtusersdb)
            try:
                self.push("214-The following virtual users are defined:\r\n")
                for row in con.execute("SELECT * FROM virtusers").fetchall():
                    users_list.append(
                        " Username: " + row[0] + " - Perms: " + row[3] + "\r\n")
            finally:
                # close even when the SELECT fails (was leaked before)
                con.close()
            self.push(''.join(users_list))
            self.respond("214 LSTU SITE command successful.")
        except Exception:  # narrowed from a bare except
            self.respond('501 list users failed.')
def main():
    """Bootstrap the FTP receiver.

    Registers the SITE ADDU/REMU/LSTU commands, seeds the authorizer with
    the configured admin user plus every virtual user persisted in the
    sqlite DB, then serves on port 2121 until interrupted.
    """
    cfg = setting.config()
    try:
        # Copy the class-level command table before extending it; perm='M'
        # restricts the SITE verbs to users holding the chmod permission.
        proto_cmds = FTPHandler.proto_cmds.copy()
        proto_cmds.update(
            {'SITE ADDU': dict(perm='M', auth=True, arg=True,
                               help='Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).')}
        )
        proto_cmds.update(
            {'SITE REMU': dict(perm='M', auth=True, arg=True,
                               help='Syntax: SITE <SP> REMU USERNAME (remove virtual user).')}
        )
        proto_cmds.update(
            {'SITE LSTU': dict(perm='M', auth=True, arg=None,
                               help='Syntax: SITE <SP> LSTU (list virtual users).')}
        )
        authorizer = DummyMD5Authorizer()
        # adminuser comes from the ini as "user|md5hash|homedir|perms".
        authorizer.add_user(
            cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
        con = sqlite3.connect(cfg.virtusersdb)
        cur = con.cursor()
        cur.execute(
            '''CREATE TABLE IF NOT EXISTS virtusers (user text, hash text, virtpath text, perm text)''')
        cur.execute(
            '''CREATE INDEX IF NOT EXISTS user_idx on virtusers(user)''')
        # Re-register every persisted virtual user at startup.
        for row in cur.execute('SELECT * FROM virtusers'):
            authorizer.add_user(row[0], row[1], row[2], perm=row[3])
        con.close()
        handler = ASEHandler
        # pyftpdlib reads configuration from class attributes.
        handler.proto_cmds = proto_cmds
        handler.cfg = cfg
        handler.authorizer = authorizer
        # Address advertised in PASV replies when behind a NAT/proxy.
        handler.masquerade_address = cfg.proxyaddr
        _range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth))
        handler.passive_ports = _range
        logging.basicConfig(
            format="%(asctime)s %(message)s",
            filename=cfg.logfilename,
            level=logging.INFO,
        )
        # NOTE(review): listens on 2121, not 21 -- presumably fronted by a
        # proxy/port-forward; confirm.
        server = FTPServer(("0.0.0.0", 2121), handler)
        server.serve_forever()
    except KeyboardInterrupt:
        logging.info(
            "PID {:>5} >> Info: {}.".format(
                os.getpid(), "Shutdown requested...exiting")
        )
    except Exception:
        print(
            "{} - PID {:>5} >> Error: {}.".format(
                ts.timestamp("log"), os.getpid(), sys.exc_info()[1]
            )
        )


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.
# Use of this source code is governed by MIT license that can be
# found in the LICENSE file.
"""A basic unix daemon using the python-daemon library:
http://pypi.python.org/pypi/python-daemon
Example usages:
$ python unix_daemon.py start
$ python unix_daemon.py stop
$ python unix_daemon.py status
$ python unix_daemon.py # foreground (no daemon)
$ python unix_daemon.py --logfile /var/log/ftpd.log start
$ python unix_daemon.py --pidfile /var/run/ftpd.pid start
This is just a proof of concept which demonstrates how to daemonize
the FTP server.
You might want to use this as an example and provide the necessary
customizations.
Parts you might want to customize are:
- UMASK, WORKDIR, HOST, PORT constants
- get_server() function (to define users and customize FTP handler)
Authors:
- Ben Timby - btimby <at> gmail.com
- Giampaolo Rodola' - g.rodola <at> gmail.com
"""
import atexit
import errno
import optparse
import os
import signal
import sys
import time
from pyftpdlib.authorizers import UnixAuthorizer
from pyftpdlib.filesystems import UnixFilesystem
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
# overridable options
HOST = ""
PORT = 21
PID_FILE = "/var/run/pyftpdlib.pid"
LOG_FILE = "/var/log/pyftpdlib.log"
WORKDIR = os.getcwd()
UMASK = 0
def pid_exists(pid):
    """Return True if a process with the given PID is currently running.

    Sends signal 0, which performs the existence/permission check without
    delivering anything to the target process.
    """
    try:
        os.kill(pid, 0)
    except OSError as err:
        # EPERM: the process exists but is owned by another user.
        # Anything else (normally ESRCH) means no such process.
        if err.errno == errno.EPERM:
            return True
        return False
    return True
def get_pid():
    """Return the PID saved in the pid file if possible, else None."""
    try:
        with open(PID_FILE) as f:
            content = f.read()
    except IOError as err:
        # A missing pid file simply means "not running".
        if err.errno != errno.ENOENT:
            raise
        return None
    return int(content.strip())
def stop():
    """Keep attempting to stop the daemon for 5 seconds, first using
    SIGTERM, then using SIGKILL.
    """
    pid = get_pid()
    if not pid or not pid_exists(pid):
        sys.exit("daemon not running")
    sig = signal.SIGTERM
    i = 0
    while True:
        # Progress dots while polling the process every 0.1s.
        sys.stdout.write('.')
        sys.stdout.flush()
        try:
            os.kill(pid, sig)
        except OSError as err:
            if err.errno == errno.ESRCH:
                # Process is gone: report and stop polling.
                print("\nstopped (pid %s)" % pid)
                return
            else:
                raise
        i += 1
        if i == 25:
            # ~2.5s of SIGTERM without effect: escalate to SIGKILL.
            sig = signal.SIGKILL
        elif i == 50:
            sys.exit("\ncould not kill daemon (pid %s)" % pid)
        time.sleep(0.1)
def status():
    """Print daemon status and exit."""
    pid = get_pid()
    running = bool(pid) and pid_exists(pid)
    if running:
        print("daemon running with pid %s" % pid)
    else:
        print("daemon not running")
    sys.exit(0)
def get_server():
    """Return a pre-configured FTP server instance."""
    handler = FTPHandler
    # Authenticate against the system user database; serve real home dirs.
    handler.authorizer = UnixAuthorizer()
    handler.abstracted_fs = UnixFilesystem
    server = FTPServer((HOST, PORT), handler)
    return server
def daemonize():
    """Detach from the controlling terminal and run the FTP server.

    Classic double-fork daemonization: fork, setsid, fork again, redirect
    the standard streams to LOG_FILE and write PID_FILE.  Refuses to start
    if a daemon with the recorded PID is already running.
    """

    def _daemonize():
        pid = os.fork()
        if pid > 0:
            # exit first parent
            sys.exit(0)
        # decouple from parent environment
        os.chdir(WORKDIR)
        os.setsid()
        os.umask(0)
        # do second fork
        pid = os.fork()
        if pid > 0:
            # exit from second parent
            sys.exit(0)
        # redirect standard file descriptors
        sys.stdout.flush()
        sys.stderr.flush()
        # BUGFIX: open(LOG_FILE, 'a+', 0) raises ValueError on Python 3
        # (unbuffered mode is only allowed for binary files), and opening
        # stdin's replacement with 'r' fails if the log does not exist yet.
        # Use binary streams: stdin from /dev/null, stderr unbuffered.
        si = open(os.devnull, 'rb')
        so = open(LOG_FILE, 'ab')
        se = open(LOG_FILE, 'ab', 0)
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())
        # write pidfile
        pid = str(os.getpid())
        with open(PID_FILE, 'w') as f:
            f.write("%s\n" % pid)
        # remove the pid file on clean interpreter exit
        atexit.register(lambda: os.remove(PID_FILE))

    pid = get_pid()
    if pid and pid_exists(pid):
        sys.exit('daemon already running (pid %s)' % pid)
    # instance FTPd before daemonizing, so that in case of problems we
    # get an exception here and exit immediately
    server = get_server()
    _daemonize()
    server.serve_forever()
def main():
    """Parse command-line options and dispatch start/stop/status/restart.

    With no command the server runs in the foreground (no daemon).
    """
    global PID_FILE, LOG_FILE
    USAGE = "python [-p PIDFILE] [-l LOGFILE]\n\n" \
            "Commands:\n - start\n - stop\n - status"
    parser = optparse.OptionParser(usage=USAGE)
    parser.add_option('-l', '--logfile', dest='logfile',
                      help='the log file location')
    # BUGFIX: help-text typo "retreive" -> "retrieve"
    parser.add_option('-p', '--pidfile', dest='pidfile', default=PID_FILE,
                      help='file to store/retrieve daemon pid')
    options, args = parser.parse_args()
    if options.pidfile:
        PID_FILE = options.pidfile
    if options.logfile:
        LOG_FILE = options.logfile
    if not args:
        # foreground mode: no fork, log to the console
        server = get_server()
        server.serve_forever()
    else:
        if len(args) != 1:
            sys.exit('too many commands')
        elif args[0] == 'start':
            daemonize()
        elif args[0] == 'stop':
            stop()
        elif args[0] == 'restart':
            # NOTE(review): if the daemon is not running, stop() raises
            # SystemExit yet the finally still daemonizes -- confirm this
            # "start anyway" behavior is intended.
            try:
                stop()
            finally:
                daemonize()
        elif args[0] == 'status':
            status()
        else:
            sys.exit('invalid command')


if __name__ == '__main__':
    sys.exit(main())

View File

@@ -1,7 +1,12 @@
[ftpserver] [ftpserver]
firstport = 40000 firstPort = 40000
logfilename = ftppylog.log logFilename = ./ftppylog.log
proxyaddr = 0.0.0.0 proxyAddr = 0.0.0.0
portRangeWidth = 50
virtusersdb = /home/aseftp/virtusers.db
virtpath = /home/aseftp/
adminuser = admin|c8cf955bd8b8a78419013b831e627eb2|/home/aseftp/|elradfmwMT
[mailserver] [mailserver]
hostname = smtps.aruba.it hostname = smtps.aruba.it
@@ -10,7 +15,7 @@
password = taylor1964NFL! password = taylor1964NFL!
receivers = alessandro.battilani@gmail.com receivers = alessandro.battilani@gmail.com
message = prova messaggio message = prova messaggio
bbbbb bbbbb
ccccc ccccc
subject = ciao a domani subject = ciao a domani
@@ -21,20 +26,20 @@
port = 5672 port = 5672
user = alex user = alex
password = batt1l0 password = batt1l0
csv_queue = task_queue csvQueue = task_queue
elab_queue = elab_queue elabQueue = elab_queue
[csvfs] [csvfs]
path = /home/ path = /home/
[csvelab] [csvelab]
logfilename = csvElab.log logFilename = csvElab.log
[db] [db]
hostname = 192.168.1.241 hostname = 192.168.1.241
user = root user = root
password = batt1l0 password = batt1l0
dbname = ase dbName = ase
tablename = rawdata tableName = rawdata
maxInsertRow = 20000 maxInsertRow = 20000
valueNum = 16 valueNum = 16

View File

@@ -0,0 +1,16 @@
[tool.poetry]
name = "ase-receiver"
version = "0.1.0"
description = ""
authors = ["Alessandro Battilani <alessandro.battilani@gmail.com>"]
[tool.poetry.dependencies]
python = "^3.9"
pika = "^1.2.0"
pyftpdlib = "^1.5.6"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

View File

@@ -0,0 +1 @@
"""Utilità"""

View File

@@ -0,0 +1 @@
"""Config ini setting"""

View File

@@ -4,14 +4,18 @@
from configparser import ConfigParser from configparser import ConfigParser
class config(): class config:
def __init__(self): def __init__(self):
c = ConfigParser() c = ConfigParser()
c.read(["/etc/ase/ftpcsvreceiver.ini", "./ftpcsvreceiver.ini"]) c.read(["/etc/aseftp/ftpcsvreceiver.ini", "./ftpcsvreceiver.ini"])
# FTP setting # FTP setting
self.firstport = c.getint("ftpserver", "firstport") self.firstport = c.getint("ftpserver", "firstPort")
self.logfilename = c.get("ftpserver", "logfilename") self.logfilename = c.get("ftpserver", "logFilename")
self.proxyaddr = c.get("ftpserver", "proxyaddr") self.proxyaddr = c.get("ftpserver", "proxyAddr")
self.portrangewidth = c.getint("ftpserver", "portRangeWidth")
self.virtusersdb = c.get("ftpserver", "virtusersdb")
self.virtpath = c.get("ftpserver", "virtpath")
self.adminuser = c.get("ftpserver", "adminuser").split("|")
# MAIL setting # MAIL setting
self.smtphost = c.get("mailserver", "hostname") self.smtphost = c.get("mailserver", "hostname")
self.smtpport = c.getint("mailserver", "port") self.smtpport = c.getint("mailserver", "port")
@@ -26,17 +30,17 @@ class config():
self.mqport = c.get("mqserver", "port") self.mqport = c.get("mqserver", "port")
self.mquser = c.get("mqserver", "user") self.mquser = c.get("mqserver", "user")
self.mqpass = c.get("mqserver", "password") self.mqpass = c.get("mqserver", "password")
self.csv_queue = c.get("mqserver", "csv_queue") self.csv_queue = c.get("mqserver", "csvQueue")
self.elab_queue = c.get("mqserver", "elab_queue") self.elab_queue = c.get("mqserver", "elabQueue")
# CSV FILE setting # CSV FILE setting
self.csvfs = c.get("csvfs", "path") self.csvfs = c.get("csvfs", "path")
# LOADER setting # LOADER setting
self.elablog = c.get("csvelab", "logfilename") self.elablog = c.get("csvelab", "logFilename")
# DB setting # DB setting
self.dbhost = c.get("db", "hostname") self.dbhost = c.get("db", "hostname")
self.dbuser = c.get("db", "user") self.dbuser = c.get("db", "user")
self.dbpass = c.get("db", "password") self.dbpass = c.get("db", "password")
self.dbname = c.get("db", "dbname") self.dbname = c.get("db", "dbName")
self.table = c.get("db", "tablename") self.table = c.get("db", "tableName")
self.valueNum = c.getint("db", "valueNum") self.valueNum = c.getint("db", "valueNum")
self.maxInsertRow = c.getint("db", "maxInsertRow") self.maxInsertRow = c.getint("db", "maxInsertRow")

View File

@@ -0,0 +1 @@
locals

View File

@@ -0,0 +1 @@
"""Parser delle centraline"""

View File

@@ -0,0 +1 @@
"""Utilità per i formati timestamp"""

View File

@@ -4,20 +4,22 @@
import datetime import datetime
def dateFmt(date): def dateFmt(date):
t = date.replace("/","-") t = date.replace("/", "-")
try: try:
datetime.datetime.strptime(t, '%Y-%m-%d') datetime.datetime.strptime(t, "%Y-%m-%d")
return t return t
except ValueError: except ValueError:
d = datetime.datetime.strptime(t, '%d-%m-%Y') d = datetime.datetime.strptime(t, "%d-%m-%Y")
return datetime.datetime.strftime(d, "%Y-%m-%d") return datetime.datetime.strftime(d, "%Y-%m-%d")
def dateTimeFmt(date): def dateTimeFmt(date):
t = date.replace("/","-") t = date.replace("/", "-")
try: try:
datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S') datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
return t return t
except ValueError: except ValueError:
d = datetime.datetime.strptime(t, '%d-%m-%Y %H:%M:%S') d = datetime.datetime.strptime(t, "%d-%m-%Y %H:%M:%S")
return datetime.datetime.strftime(d, "%Y-%m-%d %H:%M:%S") return datetime.datetime.strftime(d, "%Y-%m-%d %H:%M:%S")

View File

@@ -0,0 +1,25 @@
"""Funzioni per convertire formato data
"""
import datetime
def dateFmt(date):
    """Normalize a date string to ISO "YYYY-MM-DD".

    Accepts either "YYYY/MM/DD" (slashes simply replaced) or "DD/MM/YYYY"
    (reordered to ISO).  Raises ValueError for any other layout.
    """
    normalized = date.replace("/", "-")
    try:
        # Already ISO-ordered? Then the dash form is the answer.
        datetime.datetime.strptime(normalized, "%Y-%m-%d")
    except ValueError:
        parsed = datetime.datetime.strptime(normalized, "%d-%m-%Y")
        return parsed.strftime("%Y-%m-%d")
    return normalized
def dateTimeFmt(date):
    """Normalize a date-time string to ISO "YYYY-MM-DD HH:MM:SS".

    Accepts either "YYYY/MM/DD hh:mm:ss" (slashes simply replaced) or
    "DD/MM/YYYY hh:mm:ss" (date part reordered to ISO).
    """
    normalized = date.replace("/", "-")
    try:
        # Already ISO-ordered? Then the dash form is the answer.
        datetime.datetime.strptime(normalized, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        parsed = datetime.datetime.strptime(normalized, "%d-%m-%Y %H:%M:%S")
        return parsed.strftime("%Y-%m-%d %H:%M:%S")
    return normalized

View File

@@ -0,0 +1,10 @@
"""Funzioni per timestamp
"""
import datetime
def timestamp(t):
    """Return the current local time formatted per the key t.

    t: "log" -> "YYYY-MM-DD hh:mm:ss" (human readable),
       "tms" -> "YYYYMMDDhhmmss" (compact, filename-safe).
    Raises KeyError for unknown keys.
    """
    formats = {"log": "%Y-%m-%d %H:%M:%S", "tms": "%Y%m%d%H%M%S"}
    now = datetime.datetime.now()
    return now.strftime(formats[t])

View File

@@ -1 +0,0 @@
"""Utilità"""

View File

@@ -1 +0,0 @@
"""Config ini setting"""

View File

@@ -1 +0,0 @@
locals

View File

@@ -1 +0,0 @@
"""Parser delle centraline"""

View File

@@ -1 +0,0 @@
"""Utilità per i formati timestamp"""

View File

@@ -1,9 +0,0 @@
"""Funzioni per timestamp
"""
import datetime
def timestamp(t):
    """Return the current local time formatted per the key t
    ('log' -> human readable, 'tms' -> compact, filename-safe)."""
    fmt = {'log': '%Y-%m-%d %H:%M:%S', 'tms': '%Y%m%d%H%M%S'}
    return datetime.datetime.now().strftime(fmt[t])

Binary file not shown.