使用Python连接数据库

SQLite

sqlite3python3自带,无需另行安装。

连接SQLite数据库时需指定对应的.db文件。

拉取数据

  1. 使用sl.connect()建立与数据库的连接,其中的database参数为SQLite数据库db文件的路径;
  2. 使用conn.cursor()创建游标;
  3. pandas中的pd.read_sql_query(sql_text, conn)命令可以直接执行sql命令并将结果保存为DataFrame;
  4. 依次使用cur.close(), conn.close()关闭游标,关闭连接。
import pandas as pd
import sqlite3 as sl

def load_data_from_table(database_path: str,
                         table_name: str,
                         column_names: list = [],
                         filter_condition: dict = {},
                         limit_size: int = 100):
    column_text = "*" if column_names == [] else ",\n\t".join(column_names)
    if filter_condition == {}:
        condition_text = ""
    else:
        condition_text = "WHERE "
        for key, value in filter_condition.items():
            condition_text += f"{key} = '{value}'"
            condition_text += " AND "
        condition_text = condition_text[:-5]
    limit_text = "" if limit_size == 0 else f"LIMIT {limit_size}"

    conn = sl.connect(database=database_path)
    cur = conn.cursor()
    sql_text = f"""SELECT {column_text}
FROM {table_name}
{condition_text}
{limit_text}
    """
    logger.info(f"SQL command:\n{sql_text}")
    
    res = pd.read_sql_query(sql_text, conn)
    cur.close()
    conn.close()
    return res

写入数据

  1. 创建表前需对DataFrame的数据类型进行转换,SQLite中主要类型为INTER
  2. 与数据库建立连接并创建游标后使用cur.execute()执行指定的sql命令(此处为创建表命令);
  3. 使用conn.commit()提交修改内容,每次对数据库进行变更操作后都需要执行;
  4. 使用pandas中的to_sql()直接将DataFrame写入到指定名称的数据表,可以通过if_exist参数指定追加还是覆盖,通过index参数指定是否需要重新索引;
def generate_create_table_sql(dataframe: pd.DataFrame,
                              table_name: str,
                              primary_keys: list = []) -> str:
    """ 生成创建表的 SQL 语句 
    
    Parameters: 
    dataframe (pd.DataFrame): 包含数据的 DataFrame 
    table_name (str): 表名 
    
    Returns: 
    str: 创建表的 SQL 语句 
    """
    # 获取每列的名称和数据类型
    columns_info = []
    for column_name, dtype in dataframe.dtypes.items():
        #将 Pandas 数据类型映射到 SQLite 数据类型
        sqlite_data_type = {
            "int64": "INTEGER",
            "float64": "REAL",
            "object": "TEXT",
            "datetime64[ns]": "TEXT",  # 日期时间类型在 SQLite 中使用 TEXT
        }.get(str(dtype), "TEXT")
        columns_info.append(
            f"{column_name} {sqlite_data_type} PRIMARY KEY"
            if column_name in primary_keys else
            f"{column_name} {sqlite_data_type}")  # 将列信息组合成创建表的 SQL 语句
    create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} (\n"
    create_table_sql += ",\n".join(columns_info)
    create_table_sql += "\n);"
    return create_table_sql
    
def create_table_from_df(database_path: str,
                         output_df: pd.DataFrame,
                         table_name: str,
                         primary_keys: list = []) -> None:
    create_sql_text = generate_create_table_sql(dataframe=output_df,
                                                table_name=table_name,
                                                primary_keys=primary_keys)
    conn = sl.connect(database=database_path)
    cur = conn.cursor()
    cur.execute(f"DROP TABLE IF EXISTS {table_name}")
    conn.commit()
    cur.execute(create_sql_text)
    conn.commit()
    output_df.to_sql(name=table_name,
                     con=conn,
                     if_exists="append",
                     index=False)
    conn.commit()
    cur.close()
    conn.close()

MySQL

连接MySQL并进行写入需要安装第三方包pymysql, sqlalchemy, mysqlclient

注意,使用pip安装sqlalchemy并不会自动安装mysqlclient,需要手动安装mysqlclient

执行连接时需提供MySQL数据库的host, username, password以及需访问的表名。对于本地的数据库,host可以为localhost

拉取数据

拉取数据与SQLite一致。

def load_data_from_table(database_host: str,
                         database_user: str,
                         database_password: str,
                         database_path: str,
                         table_name: str,
                         column_names: list = [],
                         filter_condition: dict = {},
                         limit_size: int = 100):
    column_text = "*" if column_names == [] else ",\n\t".join(column_names)
    if filter_condition == {}:
        condition_text = ""
    else:
        condition_text = "WHERE "
        for key, value in filter_condition.items():
            condition_text += f"{key} = '{value}'"
            condition_text += " AND "
        condition_text = condition_text[:-5]
    limit_text = "" if limit_size == 0 else f"LIMIT {limit_size}"

    conn = mysql.connect(host=database_host,
                         user=database_user,
                         password=database_password,
                         database=database_path)
    cur = conn.cursor()
    sql_text = f"""SELECT {column_text}
FROM {table_name}
{condition_text}
{limit_text}
    """
    logger.info(f"SQL command:\n{sql_text}")

    res = pd.read_sql_query(sql_text, conn)
    cur.close()
    conn.close()
    return res

写入数据

写入数据时,由于pandas的to_sql()仅支持使用 sqlalchemy进行MySQL数据库的连接,此处需要改用sqlalchemycreate_engine()来建立与数据库的连接。

def create_table_from_df(database_host: str,
                         database_user: str,
                         database_password: str,
                         database_path: str,
                         output_df: pd.DataFrame,
                         table_name: str,
                         primary_keys: list = []) -> None:
    create_sql_text = generate_create_table_sql(dataframe=output_df,
                                                table_name=table_name,
                                                primary_keys=primary_keys)
    conn = mysql.connect(host=database_host,
                         user=database_user,
                         password=database_password,
                         database=database_path)
    try:
        cur = conn.cursor()
        cur.execute(create_sql_text)
        conn.commit()
        logger.info("Create table successed!")
        engine = create_engine(f"mysql://{database_user}:{database_password}@{database_host}/{database_path}")
        output_df.to_sql(name=table_name,
                         con=engine,
                         if_exists="append",
                         index=False)
        engine.dispose()
        logger.info(f"Write in table successed!")
    except Exception as e:
        conn.rollback()
        logger.warning(f"Write in table failed: {e}!")
    finally:
        cur.close()
        conn.close()