使用Python连接数据库
Python, Pandas, SQLite, MySQL ·SQLite
sqlite3
为python3
自带,无需另行安装。
连接SQLite数据库时需指定对应的.db文件。
拉取数据
- 使用
sl.connect()
建立与数据库的连接,其中的database
参数为SQLite数据库db文件的路径; - 使用
conn.cursor()
创建游标; - pandas中的
pd.read_sql_query(sql_text, conn)
命令可以直接执行sql
命令并将结果保存为DataFrame; - 依次使用
cur.close()
,conn.close()
关闭游标,关闭连接。
import pandas as pd
import sqlite3 as sl
def load_data_from_table(database_path: str,
table_name: str,
column_names: list = [],
filter_condition: dict = {},
limit_size: int = 100):
column_text = "*" if column_names == [] else ",\n\t".join(column_names)
if filter_condition == {}:
condition_text = ""
else:
condition_text = "WHERE "
for key, value in filter_condition.items():
condition_text += f"{key} = '{value}'"
condition_text += " AND "
condition_text = condition_text[:-5]
limit_text = "" if limit_size == 0 else f"LIMIT {limit_size}"
conn = sl.connect(database=database_path)
cur = conn.cursor()
sql_text = f"""SELECT {column_text}
FROM {table_name}
{condition_text}
{limit_text}
"""
logger.info(f"SQL command:\n{sql_text}")
res = pd.read_sql_query(sql_text, conn)
cur.close()
conn.close()
return res
写入数据
- 创建表前需对DataFrame的数据类型进行转换,SQLite中主要类型为INTER
- 与数据库建立连接并创建游标后使用cur.execute()执行指定的sql命令(此处为创建表命令);
- 使用
conn.commit()
提交修改内容,每次对数据库进行变更操作后都需要执行; - 使用pandas中的
to_sql()
直接将DataFrame写入到指定名称的数据表,可以通过if_exist
参数指定追加还是覆盖,通过index
参数指定是否需要重新索引;
def generate_create_table_sql(dataframe: pd.DataFrame,
table_name: str,
primary_keys: list = []) -> str:
""" 生成创建表的 SQL 语句
Parameters:
dataframe (pd.DataFrame): 包含数据的 DataFrame
table_name (str): 表名
Returns:
str: 创建表的 SQL 语句
"""
# 获取每列的名称和数据类型
columns_info = []
for column_name, dtype in dataframe.dtypes.items():
#将 Pandas 数据类型映射到 SQLite 数据类型
sqlite_data_type = {
"int64": "INTEGER",
"float64": "REAL",
"object": "TEXT",
"datetime64[ns]": "TEXT", # 日期时间类型在 SQLite 中使用 TEXT
}.get(str(dtype), "TEXT")
columns_info.append(
f"{column_name} {sqlite_data_type} PRIMARY KEY"
if column_name in primary_keys else
f"{column_name} {sqlite_data_type}") # 将列信息组合成创建表的 SQL 语句
create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} (\n"
create_table_sql += ",\n".join(columns_info)
create_table_sql += "\n);"
return create_table_sql
def create_table_from_df(database_path: str,
output_df: pd.DataFrame,
table_name: str,
primary_keys: list = []) -> None:
create_sql_text = generate_create_table_sql(dataframe=output_df,
table_name=table_name,
primary_keys=primary_keys)
conn = sl.connect(database=database_path)
cur = conn.cursor()
cur.execute(f"DROP TABLE IF EXISTS {table_name}")
conn.commit()
cur.execute(create_sql_text)
conn.commit()
output_df.to_sql(name=table_name,
con=conn,
if_exists="append",
index=False)
conn.commit()
cur.close()
conn.close()
MySQL
连接MySQL并进行写入需要安装第三方包pymysql
, sqlalchemy
, mysqlclient
。
注意,使用pip安装sqlalchemy
并不会自动安装mysqlclient
,需要手动安装mysqlclient
。
执行连接时需提供MySQL数据库的host
, username
, password
以及需访问的表名。对于本地的数据库,host
可以为localhost
。
拉取数据
拉取数据与SQLite一致。
def load_data_from_table(database_host: str,
database_user: str,
database_password: str,
database_path: str,
table_name: str,
column_names: list = [],
filter_condition: dict = {},
limit_size: int = 100):
column_text = "*" if column_names == [] else ",\n\t".join(column_names)
if filter_condition == {}:
condition_text = ""
else:
condition_text = "WHERE "
for key, value in filter_condition.items():
condition_text += f"{key} = '{value}'"
condition_text += " AND "
condition_text = condition_text[:-5]
limit_text = "" if limit_size == 0 else f"LIMIT {limit_size}"
conn = mysql.connect(host=database_host,
user=database_user,
password=database_password,
database=database_path)
cur = conn.cursor()
sql_text = f"""SELECT {column_text}
FROM {table_name}
{condition_text}
{limit_text}
"""
logger.info(f"SQL command:\n{sql_text}")
res = pd.read_sql_query(sql_text, conn)
cur.close()
conn.close()
return res
写入数据
写入数据时,由于pandas的to_sql()
仅支持使用 sqlalchemy
进行MySQL数据库的连接,此处需要改用sqlalchemy
的create_engine()
来建立与数据库的连接。
def create_table_from_df(database_host: str,
database_user: str,
database_password: str,
database_path: str,
output_df: pd.DataFrame,
table_name: str,
primary_keys: list = []) -> None:
create_sql_text = generate_create_table_sql(dataframe=output_df,
table_name=table_name,
primary_keys=primary_keys)
conn = mysql.connect(host=database_host,
user=database_user,
password=database_password,
database=database_path)
try:
cur = conn.cursor()
cur.execute(create_sql_text)
conn.commit()
logger.info("Create table successed!")
engine = create_engine(f"mysql://{database_user}:{database_password}@{database_host}/{database_path}")
output_df.to_sql(name=table_name,
con=engine,
if_exists="append",
index=False)
engine.dispose()
logger.info(f"Write in table successed!")
except Exception as e:
conn.rollback()
logger.warning(f"Write in table failed: {e}!")
finally:
cur.close()
conn.close()