Module db_management
Модуль для работы с БД.
Expand source code
# This Python file uses the following encoding: utf-8
"""Модуль для работы с БД."""
from configparser import ConfigParser
import os
import psycopg2
import psycopg2.extras
import sqlite3
class DatabaseConnection:
"""
Класс представляет собой абстракцию для работы с базой данных.
Желательно с классом работать изнутри этого модуля. Целевая схема следующая:
создается функция, которая будет вызываться извне, def function
внутри функции определяется запрос к БД.
db = db_connection() // создается экземпляр класса
db.open() // открывается подключение
result = db.execute... // выполняется запрос
db.close() // закрывается подключение
Методы класса:
open создать подключение к БД.
close закрыть подключение.
execute выполняет запрос и возвращает результат в виде списка словарей.
execute_scalar выполняет запрос и возвращает результат в виде одного значения,
нужно использовать в запросах типа "select count(x) from" или
"select top 1 x from".
execute_non_query необходимо использовать для запросов, которые изменяют данные,
такие как: "insert", "update".
execute_script выполняет скрипт из *.sql-файла.
test_connection проверить возможность подключения:
0 - всё в порядке;
1 - подключение есть, отсутствует структура, можно вызвать метод create_db;
2 - что-то непонятное, нужно искать причины.
Атрибуты класса (извне не используются):
rdbms тип БД.
connection_string строка подключения.
connection текущее подключение.
"""
def __init__(self):
"""Описание."""
config = ConfigParser()
config.read('options.ini')
self.db_connection_string = config['DATABASE']['db_connection_string']
self.debug = config['DATABASE']['debug']
self.rdbms = config['DATABASE']['rdbms']
def create_db(self):
"""Создание структуры базы данных."""
try:
self.open()
if self.rdbms == "sqlite":
self.execute_script("cicd/db/sqlite_create_db.sql")
elif self.rdbms == "postgresql":
self.execute_script("cicd/db/postgres_create_db.sql")
# Заполнение таблицы mac_owners
with open('cicd/db/macs.txt', encoding="utf-8") as file:
lines = file.read().splitlines()
query = 'insert into mac_owners(mac, manufacturer) values '
for line in lines:
mac, owner = line[0:6].replace('\'', '\'\''), line[11:].replace('\'', '\'\'')
query += '(\'' + mac + '\', \'' + owner + '\'),'
query = query[0:-1] + ';'
self.execute_non_query(query)
# Тестовые наборы данных для отладки
if self.debug:
self.execute_script("cicd/db/debug_data.sql")
self.close()
except Exception as error:
print(f'Что-то пошло не так! Ошибка:{error}')
def open(self):
"""Открытие."""
if self.rdbms == "sqlite":
self.connection = sqlite3.connect(self.db_connection_string)
elif self.rdbms == "postgresql":
self.connection = psycopg2.connect(self.db_connection_string)
def close(self):
"""Закрытие."""
self.connection.close()
def test_connection(self):
"""Проверка соединения."""
if self.rdbms == "sqlite":
if os.path.exists(self.db_connection_string):
return 'BASE EXISTS'
else:
return 'BASE NOT EXISTS'
elif self.rdbms == "postgresql":
# проверка доступности базы данных
try:
self.open()
# проверка наличия структуры в базе данных
query = "select count(table_name) _count from information_schema.tables WHERE table_schema='public'"
table_count = self.execute_scalar(query)
if table_count == 0:
self.close()
return 'BASE NOT EXISTS'
else:
self.close()
return 'BASE EXISTS'
except Exception as e:
ERROR = str(e)
return ERROR
ERROR = 'База не SQL или Postgres.'
return ERROR
def dict_factory(self, cursor, row):
"""Описание."""
dick = {}
for idx, col in enumerate(cursor.description):
dick[col[0]] = row[idx]
return dick
def execute(self, query):
"""Выполняет запрос и возвращает результат в виде списка словарей."""
if self.rdbms == 'sqlite':
self.connection.row_factory = self.dict_factory
cursor = self.connection.cursor()
elif self.rdbms == 'postgresql':
cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
cursor.execute(query)
result = cursor.fetchall()
cursor.close()
return result
def execute_non_query(self, query, data=''):
"""Необходимо использовать для запросов, которые изменяют данные "insert", "update"."""
cursor = self.connection.cursor()
cursor.execute(query, data)
self.connection.commit()
cursor.close()
return True
def execute_scalar(self, query, data=''):
"""
Выполняет запрос и возвращает результат в виде одного значения.
Нужно использовать в запросах типа "select count(x) from" или "select top 1 x from".
"""
cursor = self.connection.cursor()
cursor.execute(query, data)
result = cursor.fetchone()[0]
cursor.close()
return result
def execute_script(self, path):
"""Выполняет скрипт из *.sql-файла."""
cursor = self.connection.cursor()
with open(path, 'r') as file:
query = file.read().replace('\n', ' ').replace('\t', '')
if self.rdbms == 'sqlite':
cursor.executescript(query)
elif self.rdbms == 'postgresql':
cursor.execute(query)
self.connection.commit()
cursor.close()
return True
def get_value(data):
"""Описание."""
if data is None:
value = 'null'
elif not data.isdigit():
value = '\'' + data.replace('\'', '\'\'') + '\''
else:
value = data
return value
# Функции для работы извне
#
# Вставка словаря в соответствующую таблицу:
#
# data словарь
# table имя таблицы
# conn подключение
#
# Если подключение не передаётся в качестве параметра, то создается подключение по умолчанию,
# которое закрывается после выполнения.
def insert_data(data, table, conn='not_created'):
"""Добавляем данные в базу."""
columns, values = '', ''
for key in data:
if columns == '':
columns += key
values += get_value(data[key])
else:
columns += ', ' + key
values += ', ' + get_value(data[key])
query = f'insert into {table}({columns}) values({values})'
connection = conn
if connection == 'not_created':
db = DatabaseConnection()
db.open()
db.execute_non_query(query)
if connection == 'not_created':
db.close()
def new_syslog_event(event, db):
"""Описание."""
cursor = db.connection.cursor()
cursor.callproc('new_syslog_event', [event['priority'], event['device_time'], event['from_host'], event['process'],
event['syslog_tag'], event['message'], event['mac']])
db.connection.commit()
cursor.close()
def login_exists(login):
"""
Проверка существования логина.
login проверяемый логин.
"""
query = 'select count(1) _count from [admin] where [login] =:login'
data = {'login': login}
db = DatabaseConnection()
db.open()
result = (int(db.execute_scalar(query, data)) > 0)
db.close()
return result
def get_events(all_events=True, only_unknown_mac=False, started_at='', ended_at='', mac=''):
"""
Выборка событий из syslog'а.
all_events получать только c тэгами link-up и LINK_DOWN.
only_unknown_mac получить события только с неизвестными mac'ами.
started_at начало диапазона.
ended_at конец диапазона.
По умолчанию в выборку попадают все события за последний два часа.
"""
db = DatabaseConnection()
query = ''
if db.rdbms == 'sqlite':
if started_at == '':
started_at = 'datetime(\'now\',\'-2000 hour\', \'localtime\')'
if ended_at == '':
ended_at = 'datetime(\'now\', \'localtime\')'
elif db.rdbms == 'postgresql':
if started_at == '':
started_at = 'current_timestamp + interval \'-2000 hour\''
if ended_at == '':
ended_at = 'current_timestamp'
if db.rdbms == 'sqlite':
query = '''select
receivedat, '''
elif db.rdbms == 'postgresql':
query = '''select
receivedat::timestamp(0) receivedat,'''
query += '''priority,
from_host,
process,
syslog_tag,
case
when mac_addresses.mac is null then ''
when mac_addresses.wellknown = 1 then 'wellknown'
when mac_addresses.mac is not null and (mac_addresses.wellknown is null or mac_addresses.wellknown = 0) then 'unknown'
end mac_type,
message,
syslog.mac
from
syslog
left join
mac_addresses
on upper(syslog.mac) = upper(mac_addresses.mac)
where
receivedat > %(started_at)s
and
receivedat < %(ended_at)s
''' % {'started_at': started_at, 'ended_at': ended_at}
if not all_events:
query += ' and (syslog_tag like \'%link-up%\' or syslog_tag like \'%LINK_DOWN%\')'
if mac != '':
query += 'and syslog.mac = %(mac)s' % {'mac': mac}
query += ''' order by receivedat desc limit 500'''
db.open()
result = db.execute(query)
db.close()
# print(query)
return result
def get_current_state(only_unknown=False):
"""
Выборка текущих подключений к сетевому оборудованию.
from_host
port
mac
mac_type
manufacturer
up_time
Фильтр: only_unknown - только "недоверенные" mac-адресы.
"""
query = '''select
from_host,
port,
current_state.mac mac,
case
when mac_addresses.mac is null then ''
when mac_addresses.wellknown = 1 then 'wellknown'
when mac_addresses.mac is not null and (mac_addresses.wellknown is null or mac_addresses.wellknown = 0) then 'unknown'
end mac_type,
manufacturer,
started_at up_time
from
current_state
join
mac_addresses
on
upper(current_state.mac) = upper(mac_addresses.mac)
left join
mac_owners
on
upper(substr(replace(mac_addresses.mac,':',''),1,6)) = mac_owners.mac
where
state = 1
'''
if only_unknown:
query += " and mac_addresses.mac is not null and " \
"(mac_addresses.wellknown is null or mac_addresses.wellknown = 0) "
query += "order by from_host, port"
db = DatabaseConnection()
db.open()
result = db.execute(query)
db.close()
return result
def get_mac(status):
"""
Выборка всех mac-адресов по критерию.
В зависимости от переданного параметра:
status = 'wellknown' - выборка доверенных маков.
status = 'unknown' - выборка недоверенных маков.
"""
query_for_wellknown = '''select
mac_addresses.mac mac,
mac_addresses.wellknown_author wellknown_author,
mac_addresses.description description,
mac_addresses.wellknown_started_at wellknown_started_at,
mac_owners.manufacturer manufacturer
from
mac_addresses
left join
mac_owners
on
upper(substr(replace(mac_addresses.mac,':',''),1,6)) = mac_owners.mac
where
wellknown = 1
'''
query_for_unknown = '''select
mac_addresses.mac mac,
mac_owners.manufacturer manufacturer
from
mac_addresses
left join
mac_owners
on
upper(substr(replace(mac_addresses.mac,':',''),1,6)) = mac_owners.mac
where
wellknown = 0
or
wellknown is null
'''
db = DatabaseConnection()
db.open()
if status == 'wellknown':
result = db.execute(query_for_wellknown)
elif status == 'unknown':
result = db.execute(query_for_unknown)
db.close()
return result
def set_mac_to_wellknown(mac, login, description):
"""Установка признака "доверенный" для mac-адреса."""
db = DatabaseConnection()
query = '''update
mac_addresses
set
wellknown = 1,
wellknown_author =:login,
description =:description,
'''
data = {'login': login, 'description': description}
if db.rdbms == 'sqlite':
query += '''wellknown_started_at = datetime('now','localtime')'''
elif db.rdbms == 'postgresql':
query += '''wellknown_started_at = current_timestamp'''
query += ''' where
mac = '%(mac)s'
''' % {'mac': mac}
db.open()
db.execute_non_query(query, data)
db.close()
def set_mac_to_unknown(mac, login):
"""Удаление признака "доверенный" для mac-адреса."""
db = DatabaseConnection()
query = '''update
mac_addresses
set
wellknown = 0,
wellknown_author =:login,
description = '',
'''
data = {'login': login}
if db.rdbms == 'sqlite':
query += '''wellknown_started_at = datetime('now','localtime')'''
elif db.rdbms == 'postgresql':
query += '''wellknown_started_at = current_timestamp'''
query += ''' where
mac = '%(mac)s'
''' % {'mac': mac}
db.open()
db.execute_non_query(query, data)
db.close()
def flask_logon(login, hash_sha384):
"""Аутентификация."""
query = '''
select
count(1) _count
from
admin
where
login =:login
and
hash =:hash
'''
data = {'login': login, 'hash': hash_sha384}
db = DatabaseConnection()
db.open()
result = (int(db.execute_scalar(query, data)) > 0)
db.close()
return result
Functions
def flask_logon(login, hash_sha384)
-
Аутентификация.
Expand source code
def flask_logon(login, hash_sha384): """Аутентификация.""" query = ''' select count(1) _count from admin where login =:login and hash =:hash ''' data = {'login': login, 'hash': hash_sha384} db = DatabaseConnection() db.open() result = (int(db.execute_scalar(query, data)) > 0) db.close() return result
def get_current_state(only_unknown=False)
-
Выборка текущих подключений к сетевому оборудованию.
from_host port mac mac_type manufacturer up_time
Фильтр: only_unknown - только "недоверенные" mac-адресы.
Expand source code
def get_current_state(only_unknown=False): """ Выборка текущих подключений к сетевому оборудованию. from_host port mac mac_type manufacturer up_time Фильтр: only_unknown - только "недоверенные" mac-адресы. """ query = '''select from_host, port, current_state.mac mac, case when mac_addresses.mac is null then '' when mac_addresses.wellknown = 1 then 'wellknown' when mac_addresses.mac is not null and (mac_addresses.wellknown is null or mac_addresses.wellknown = 0) then 'unknown' end mac_type, manufacturer, started_at up_time from current_state join mac_addresses on upper(current_state.mac) = upper(mac_addresses.mac) left join mac_owners on upper(substr(replace(mac_addresses.mac,':',''),1,6)) = mac_owners.mac where state = 1 ''' if only_unknown: query += " and mac_addresses.mac is not null and " \ "(mac_addresses.wellknown is null or mac_addresses.wellknown = 0) " query += "order by from_host, port" db = DatabaseConnection() db.open() result = db.execute(query) db.close() return result
def get_events(all_events=True, only_unknown_mac=False, started_at='', ended_at='', mac='')
-
Выборка событий из syslog'а.
all_events получать только c тэгами link-up и LINK_DOWN. only_unknown_mac получить события только с неизвестными mac'ами. started_at начало диапазона. ended_at конец диапазона.
По умолчанию в выборку попадают все события за последний два часа.
Expand source code
def get_events(all_events=True, only_unknown_mac=False, started_at='', ended_at='', mac=''): """ Выборка событий из syslog'а. all_events получать только c тэгами link-up и LINK_DOWN. only_unknown_mac получить события только с неизвестными mac'ами. started_at начало диапазона. ended_at конец диапазона. По умолчанию в выборку попадают все события за последний два часа. """ db = DatabaseConnection() query = '' if db.rdbms == 'sqlite': if started_at == '': started_at = 'datetime(\'now\',\'-2000 hour\', \'localtime\')' if ended_at == '': ended_at = 'datetime(\'now\', \'localtime\')' elif db.rdbms == 'postgresql': if started_at == '': started_at = 'current_timestamp + interval \'-2000 hour\'' if ended_at == '': ended_at = 'current_timestamp' if db.rdbms == 'sqlite': query = '''select receivedat, ''' elif db.rdbms == 'postgresql': query = '''select receivedat::timestamp(0) receivedat,''' query += '''priority, from_host, process, syslog_tag, case when mac_addresses.mac is null then '' when mac_addresses.wellknown = 1 then 'wellknown' when mac_addresses.mac is not null and (mac_addresses.wellknown is null or mac_addresses.wellknown = 0) then 'unknown' end mac_type, message, syslog.mac from syslog left join mac_addresses on upper(syslog.mac) = upper(mac_addresses.mac) where receivedat > %(started_at)s and receivedat < %(ended_at)s ''' % {'started_at': started_at, 'ended_at': ended_at} if not all_events: query += ' and (syslog_tag like \'%link-up%\' or syslog_tag like \'%LINK_DOWN%\')' if mac != '': query += 'and syslog.mac = %(mac)s' % {'mac': mac} query += ''' order by receivedat desc limit 500''' db.open() result = db.execute(query) db.close() # print(query) return result
def get_mac(status)
-
Выборка всех mac-адресов по критерию.
В зависимости от переданного параметра: status = 'wellknown' - выборка доверенных маков. status = 'unknown' - выборка недоверенных маков.
Expand source code
def get_mac(status): """ Выборка всех mac-адресов по критерию. В зависимости от переданного параметра: status = 'wellknown' - выборка доверенных маков. status = 'unknown' - выборка недоверенных маков. """ query_for_wellknown = '''select mac_addresses.mac mac, mac_addresses.wellknown_author wellknown_author, mac_addresses.description description, mac_addresses.wellknown_started_at wellknown_started_at, mac_owners.manufacturer manufacturer from mac_addresses left join mac_owners on upper(substr(replace(mac_addresses.mac,':',''),1,6)) = mac_owners.mac where wellknown = 1 ''' query_for_unknown = '''select mac_addresses.mac mac, mac_owners.manufacturer manufacturer from mac_addresses left join mac_owners on upper(substr(replace(mac_addresses.mac,':',''),1,6)) = mac_owners.mac where wellknown = 0 or wellknown is null ''' db = DatabaseConnection() db.open() if status == 'wellknown': result = db.execute(query_for_wellknown) elif status == 'unknown': result = db.execute(query_for_unknown) db.close() return result
def get_value(data)
-
Описание.
Expand source code
def get_value(data): """Описание.""" if data is None: value = 'null' elif not data.isdigit(): value = '\'' + data.replace('\'', '\'\'') + '\'' else: value = data return value
def insert_data(data, table, conn='not_created')
-
Добавляем данные в базу.
Expand source code
def insert_data(data, table, conn='not_created'): """Добавляем данные в базу.""" columns, values = '', '' for key in data: if columns == '': columns += key values += get_value(data[key]) else: columns += ', ' + key values += ', ' + get_value(data[key]) query = f'insert into {table}({columns}) values({values})' connection = conn if connection == 'not_created': db = DatabaseConnection() db.open() db.execute_non_query(query) if connection == 'not_created': db.close()
def login_exists(login)
-
Проверка существования логина.
login проверяемый логин.
Expand source code
def login_exists(login): """ Проверка существования логина. login проверяемый логин. """ query = 'select count(1) _count from [admin] where [login] =:login' data = {'login': login} db = DatabaseConnection() db.open() result = (int(db.execute_scalar(query, data)) > 0) db.close() return result
def new_syslog_event(event, db)
-
Описание.
Expand source code
def new_syslog_event(event, db): """Описание.""" cursor = db.connection.cursor() cursor.callproc('new_syslog_event', [event['priority'], event['device_time'], event['from_host'], event['process'], event['syslog_tag'], event['message'], event['mac']]) db.connection.commit() cursor.close()
def set_mac_to_unknown(mac, login)
-
Удаление признака "доверенный" для mac-адреса.
Expand source code
def set_mac_to_unknown(mac, login): """Удаление признака "доверенный" для mac-адреса.""" db = DatabaseConnection() query = '''update mac_addresses set wellknown = 0, wellknown_author =:login, description = '', ''' data = {'login': login} if db.rdbms == 'sqlite': query += '''wellknown_started_at = datetime('now','localtime')''' elif db.rdbms == 'postgresql': query += '''wellknown_started_at = current_timestamp''' query += ''' where mac = '%(mac)s' ''' % {'mac': mac} db.open() db.execute_non_query(query, data) db.close()
def set_mac_to_wellknown(mac, login, description)
-
Установка признака "доверенный" для mac-адреса.
Expand source code
def set_mac_to_wellknown(mac, login, description): """Установка признака "доверенный" для mac-адреса.""" db = DatabaseConnection() query = '''update mac_addresses set wellknown = 1, wellknown_author =:login, description =:description, ''' data = {'login': login, 'description': description} if db.rdbms == 'sqlite': query += '''wellknown_started_at = datetime('now','localtime')''' elif db.rdbms == 'postgresql': query += '''wellknown_started_at = current_timestamp''' query += ''' where mac = '%(mac)s' ''' % {'mac': mac} db.open() db.execute_non_query(query, data) db.close()
Classes
class DatabaseConnection
-
Класс представляет собой абстракцию для работы с базой данных.
Желательно с классом работать изнутри этого модуля. Целевая схема следующая: создается функция, которая будет вызываться извне, def function внутри функции определяется запрос к БД.
db = db_connection() // создается экземпляр класса db.open() // открывается подключение result = db.execute… // выполняется запрос db.close() // закрывается подключение
Методы класса:
open создать подключение к БД. close закрыть подключение. execute выполняет запрос и возвращает результат в виде списка словарей. execute_scalar выполняет запрос и возвращает результат в виде одного значения, нужно использовать в запросах типа "select count(x) from" или "select top 1 x from". execute_non_query необходимо использовать для запросов, которые изменяют данные, такие как: "insert", "update". execute_script выполняет скрипт из *.sql-файла. test_connection проверить возможность подключения: 0 - всё в порядке; 1 - подключение есть, отсутствует структура, можно вызвать метод create_db; 2 - что-то непонятное, нужно искать причины.
Атрибуты класса (извне не используются):
rdbms тип БД. connection_string строка подключения. connection текущее подключение.
Описание.
Expand source code
class DatabaseConnection: """ Класс представляет собой абстракцию для работы с базой данных. Желательно с классом работать изнутри этого модуля. Целевая схема следующая: создается функция, которая будет вызываться извне, def function внутри функции определяется запрос к БД. db = db_connection() // создается экземпляр класса db.open() // открывается подключение result = db.execute... // выполняется запрос db.close() // закрывается подключение Методы класса: open создать подключение к БД. close закрыть подключение. execute выполняет запрос и возвращает результат в виде списка словарей. execute_scalar выполняет запрос и возвращает результат в виде одного значения, нужно использовать в запросах типа "select count(x) from" или "select top 1 x from". execute_non_query необходимо использовать для запросов, которые изменяют данные, такие как: "insert", "update". execute_script выполняет скрипт из *.sql-файла. test_connection проверить возможность подключения: 0 - всё в порядке; 1 - подключение есть, отсутствует структура, можно вызвать метод create_db; 2 - что-то непонятное, нужно искать причины. Атрибуты класса (извне не используются): rdbms тип БД. connection_string строка подключения. connection текущее подключение. """ def __init__(self): """Описание.""" config = ConfigParser() config.read('options.ini') self.db_connection_string = config['DATABASE']['db_connection_string'] self.debug = config['DATABASE']['debug'] self.rdbms = config['DATABASE']['rdbms'] def create_db(self): """Создание структуры базы данных.""" try: self.open() if self.rdbms == "sqlite": self.execute_script("cicd/db/sqlite_create_db.sql") elif self.rdbms == "postgresql": self.execute_script("cicd/db/postgres_create_db.sql") # Заполнение таблицы mac_owners with open('cicd/db/macs.txt', encoding="utf-8") as file: lines = file.read().splitlines() query = 'insert into mac_owners(mac, manufacturer) values ' for line in lines: mac, owner = line[0:6].replace('\'', '\'\''), line[11:].replace('\'', '\'\'') query += '(\'' + mac + '\', \'' + owner + '\'),' query = query[0:-1] + ';' self.execute_non_query(query) # Тестовые наборы данных для отладки if self.debug: self.execute_script("cicd/db/debug_data.sql") self.close() except Exception as error: print(f'Что-то пошло не так! Ошибка:{error}') def open(self): """Открытие.""" if self.rdbms == "sqlite": self.connection = sqlite3.connect(self.db_connection_string) elif self.rdbms == "postgresql": self.connection = psycopg2.connect(self.db_connection_string) def close(self): """Закрытие.""" self.connection.close() def test_connection(self): """Проверка соединения.""" if self.rdbms == "sqlite": if os.path.exists(self.db_connection_string): return 'BASE EXISTS' else: return 'BASE NOT EXISTS' elif self.rdbms == "postgresql": # проверка доступности базы данных try: self.open() # проверка наличия структуры в базе данных query = "select count(table_name) _count from information_schema.tables WHERE table_schema='public'" table_count = self.execute_scalar(query) if table_count == 0: self.close() return 'BASE NOT EXISTS' else: self.close() return 'BASE EXISTS' except Exception as e: ERROR = str(e) return ERROR ERROR = 'База не SQL или Postgres.' return ERROR def dict_factory(self, cursor, row): """Описание.""" dick = {} for idx, col in enumerate(cursor.description): dick[col[0]] = row[idx] return dick def execute(self, query): """Выполняет запрос и возвращает результат в виде списка словарей.""" if self.rdbms == 'sqlite': self.connection.row_factory = self.dict_factory cursor = self.connection.cursor() elif self.rdbms == 'postgresql': cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor) cursor.execute(query) result = cursor.fetchall() cursor.close() return result def execute_non_query(self, query, data=''): """Необходимо использовать для запросов, которые изменяют данные "insert", "update".""" cursor = self.connection.cursor() cursor.execute(query, data) self.connection.commit() cursor.close() return True def execute_scalar(self, query, data=''): """ Выполняет запрос и возвращает результат в виде одного значения. Нужно использовать в запросах типа "select count(x) from" или "select top 1 x from". """ cursor = self.connection.cursor() cursor.execute(query, data) result = cursor.fetchone()[0] cursor.close() return result def execute_script(self, path): """Выполняет скрипт из *.sql-файла.""" cursor = self.connection.cursor() with open(path, 'r') as file: query = file.read().replace('\n', ' ').replace('\t', '') if self.rdbms == 'sqlite': cursor.executescript(query) elif self.rdbms == 'postgresql': cursor.execute(query) self.connection.commit() cursor.close() return True
Methods
def close(self)
-
Закрытие.
Expand source code
def close(self): """Закрытие.""" self.connection.close()
def create_db(self)
-
Создание структуры базы данных.
Expand source code
def create_db(self): """Создание структуры базы данных.""" try: self.open() if self.rdbms == "sqlite": self.execute_script("cicd/db/sqlite_create_db.sql") elif self.rdbms == "postgresql": self.execute_script("cicd/db/postgres_create_db.sql") # Заполнение таблицы mac_owners with open('cicd/db/macs.txt', encoding="utf-8") as file: lines = file.read().splitlines() query = 'insert into mac_owners(mac, manufacturer) values ' for line in lines: mac, owner = line[0:6].replace('\'', '\'\''), line[11:].replace('\'', '\'\'') query += '(\'' + mac + '\', \'' + owner + '\'),' query = query[0:-1] + ';' self.execute_non_query(query) # Тестовые наборы данных для отладки if self.debug: self.execute_script("cicd/db/debug_data.sql") self.close() except Exception as error: print(f'Что-то пошло не так! Ошибка:{error}')
def dict_factory(self, cursor, row)
-
Описание.
Expand source code
def dict_factory(self, cursor, row): """Описание.""" dick = {} for idx, col in enumerate(cursor.description): dick[col[0]] = row[idx] return dick
def execute(self, query)
-
Выполняет запрос и возвращает результат в виде списка словарей.
Expand source code
def execute(self, query): """Выполняет запрос и возвращает результат в виде списка словарей.""" if self.rdbms == 'sqlite': self.connection.row_factory = self.dict_factory cursor = self.connection.cursor() elif self.rdbms == 'postgresql': cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor) cursor.execute(query) result = cursor.fetchall() cursor.close() return result
def execute_non_query(self, query, data='')
-
Необходимо использовать для запросов, которые изменяют данные "insert", "update".
Expand source code
def execute_non_query(self, query, data=''): """Необходимо использовать для запросов, которые изменяют данные "insert", "update".""" cursor = self.connection.cursor() cursor.execute(query, data) self.connection.commit() cursor.close() return True
def execute_scalar(self, query, data='')
-
Выполняет запрос и возвращает результат в виде одного значения.
Нужно использовать в запросах типа "select count(x) from" или "select top 1 x from".
Expand source code
def execute_scalar(self, query, data=''): """ Выполняет запрос и возвращает результат в виде одного значения. Нужно использовать в запросах типа "select count(x) from" или "select top 1 x from". """ cursor = self.connection.cursor() cursor.execute(query, data) result = cursor.fetchone()[0] cursor.close() return result
def execute_script(self, path)
-
Выполняет скрипт из *.sql-файла.
Expand source code
def execute_script(self, path): """Выполняет скрипт из *.sql-файла.""" cursor = self.connection.cursor() with open(path, 'r') as file: query = file.read().replace('\n', ' ').replace('\t', '') if self.rdbms == 'sqlite': cursor.executescript(query) elif self.rdbms == 'postgresql': cursor.execute(query) self.connection.commit() cursor.close() return True
def open(self)
-
Открытие.
Expand source code
def open(self): """Открытие.""" if self.rdbms == "sqlite": self.connection = sqlite3.connect(self.db_connection_string) elif self.rdbms == "postgresql": self.connection = psycopg2.connect(self.db_connection_string)
def test_connection(self)
-
Проверка соединения.
Expand source code
def test_connection(self): """Проверка соединения.""" if self.rdbms == "sqlite": if os.path.exists(self.db_connection_string): return 'BASE EXISTS' else: return 'BASE NOT EXISTS' elif self.rdbms == "postgresql": # проверка доступности базы данных try: self.open() # проверка наличия структуры в базе данных query = "select count(table_name) _count from information_schema.tables WHERE table_schema='public'" table_count = self.execute_scalar(query) if table_count == 0: self.close() return 'BASE NOT EXISTS' else: self.close() return 'BASE EXISTS' except Exception as e: ERROR = str(e) return ERROR ERROR = 'База не SQL или Postgres.' return ERROR