关系型数据库代码

连接

$ docker exec -it mysql mysql -u root -p

模型

从代码定义模型

from sqlalchemy import MetaData, Table, Column, Integer, String, Date, ForeignKey, Boolean

from _connection import engine


meta = MetaData()

# Table definitions
students = Table(
    "students",
    meta,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("first_name", String(128), unique=True, nullable=False),
    Column("last_name", String(128)),
    Column("birthday", Date, nullable=False),
    Column("is_delete", Boolean, default=False),
)

# Related tables
# ============================================

# Department table
department = Table(
    "department",
    meta,
    Column("id", Integer, primary_key=True),
    Column("name", String(128), unique=True, nullable=False),
    Column("leader", String(128), nullable=False, comment="领导人"),
)

# Employee table
employee = Table(
    "employee",
    meta,
    Column("id", Integer, primary_key=True),
    # Foreign key pointing at department.id
    Column("department_id", Integer, ForeignKey("department.id"), nullable=False),
    Column("name", String(255), nullable=False),
    Column("birthday", Date, nullable=False),
)


# Create the tables. Of little use when the job runs against a database
# that already contains them.
# =============================================================================
meta.create_all(engine)

从db生成模型代码

支持sqlalchemy2.X版本的sqlacodegen3.X仍未正式发布,等发布后再整理

sqlalchemy支持映射表结构到Table类: https://docs.sqlalchemy.org/en/20/core/reflection.html#reflecting-database-objects

使用场景之一是同步多个 CMDB 资源: 每一个资源对应一张 info 表, 需要更新哪些字段是配置在数据库中的。此时可以把表结构自动映射到 Table, 然后编写函数动态写入数据。

from sqlalchemy import MetaData, Table
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.engine import Engine

def sync_cmdb_task(engine: Engine, table_name: str, inputlist: list[dict], updatelist: list[str]) -> None:
    """Upsert resource rows into a table whose structure is reflected at runtime.

    The rows are written with MySQL ``INSERT ... ON DUPLICATE KEY UPDATE``.
    When a row collides with an existing key, the columns named in
    ``updatelist`` are overwritten with the incoming values, ``is_delete`` is
    set to ``0`` and ``deletetime`` is set to ``None``.

    :param engine: engine used both to reflect the table and to run the statement
    :param table_name: name of the table to reflect and write to
    :param inputlist: resource rows, one dict per row
    :param updatelist: names of the columns to overwrite when the row already exists
    :raises AssertionError: if a name in ``updatelist`` is not a column of the table
    """
    if not inputlist:
        # Nothing to write; avoid building an INSERT with an empty VALUES list.
        return

    # Reflect the table structure from the database instead of declaring it in code.
    table = Table(table_name, MetaData(), autoload_with=engine)

    stmt = insert(table).values(inputlist)
    try:
        # `stmt.inserted` exposes the to-be-inserted values (the VALUES() pseudo-table).
        duplicate_key_data = {column: stmt.inserted[column] for column in updatelist}
    except KeyError as err:
        raise AssertionError("该字段不存在: " + str(err)) from err
    duplicate_key_data['is_delete'] = 0
    duplicate_key_data['deletetime'] = None
    stmt = stmt.on_duplicate_key_update(**duplicate_key_data)

    # The statement is fully validated before a connection is opened.
    with engine.connect() as conn:
        conn.execute(stmt)
        conn.commit()

基本CRUD

from datetime import date

from sqlalchemy import Table, Engine

# Names this snippet expects to be provided; a bare annotation binds nothing at runtime.
engine: Engine
students: Table

# Build the INSERT statement.
# `birthday` is declared nullable=False on `students`, so every row must supply it.
insert_tome = students.insert().values(first_name='Tome', last_name='Smith', birthday=date(2000, 12, 12))

with engine.connect() as conn:
    result = conn.execute(insert_tome)
    print(result.inserted_primary_key)
    conn.commit()

    # Insert several rows with one execute() call
    person_insert = students.insert()
    conn.execute(person_insert, [
        {"first_name": "James", "last_name": "d", "birthday": date(2000, 3, 14)},
        {"first_name": "chan", "last_name": "shiyu", "birthday": date(1998, 8, 12)},
        {"first_name": "parado", "last_name": "x", "birthday": date(1998, 7, 30)},
    ])
    conn.commit()

更新

from datetime import date

from sqlalchemy import update

from _connection import engine
from _tables import students


with engine.connect() as conn:
    # The `students` table defines `first_name` / `last_name`, not `name`.
    # `last_name` is used because `first_name` is declared unique and the
    # second statement writes the same value to more than one row.
    # NOTE(review): assumes `_tables.students` is the table defined in the model snippet — confirm.
    update_sql = students.update().where(students.c.id == 5).values(last_name='Maria')
    conn.execute(update_sql)

    # in_() takes a single collection of values, not separate positional arguments.
    update_sql2 = update(students).where(students.c.id.in_([5, 6])).values(last_name='Maria')
    conn.execute(update_sql2)
    conn.commit()

# 基于ORM的更新
# =================================
from sqlalchemy.orm import Session
from _models import Employee

# Each mapping carries the row's id, so the rows are matched by id and no
# explicit WHERE clause is written.
employee_changes = [
    {"id": 2, "birthday": date(1999, 2, 9)},
    {"id": 5, "name": "Samuel"},
]

with Session(engine) as session:
    session.execute(update(Employee), employee_changes)
    session.commit()

删除

from sqlalchemy.sql import delete

from _connection import engine
from _tables import students

with engine.connect() as conn:
    # Delete the student row whose id is 7, then commit.
    delete_sql = delete(students).where(students.c.id == 7)
    conn.execute(delete_sql)
    conn.commit()

# 基于orm的删除
# ==================================
from _models import Employee


with engine.connect() as conn:
    # Delete every employee whose name is one of the listed values.
    names_to_remove = ["Jack", 'Smith']
    stmt = delete(Employee).where(Employee.name.in_(names_to_remove))
    conn.execute(stmt)
    conn.commit()

写入

from datetime import date

from sqlalchemy.sql import insert, select
from sqlalchemy.dialects.mysql import insert as mysql_insert

from _connection import engine
from _tables import students

with engine.connect() as conn:
    # Build and run a single-row INSERT
    sql = students.insert().values(first_name='Tome', last_name='Smith', birthday=date(2000, 12, 12))
    result = conn.execute(sql)
    print(result.inserted_primary_key)

    # Insert several rows with one execute() call
    person_insert = students.insert()
    conn.execute(person_insert, [
        {"first_name": "James", "last_name": "d", "birthday": date(2000, 3, 14)},
        {"first_name": "chan", "last_name": "shiyu", "birthday": date(1998, 8, 12)},
        {"first_name": "parado", "last_name": "x", "birthday": date(1998, 7, 30)},
    ])
    conn.commit()

    # INSERT…ON DUPLICATE KEY UPDATE (Upsert)
    # --------------------------------------------------------------------------
    # https://docs.sqlalchemy.org/en/20/dialects/mysql.html#insert-on-duplicate-key-update-upsert
    insert_stmt = mysql_insert(students).values(
        [
            {"first_name": "James", "last_name": "d", "birthday": date(2000, 3, 14)},
            {"first_name": "chan", "last_name": "shiyu", "birthday": date(1998, 8, 12)},
            {"first_name": "parado", "last_name": "x", "birthday": date(1998, 7, 30)},
        ]
    )
    on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
        # `inserted` refers to the row being written; renders like `VALUES(first_name)`
        first_name=insert_stmt.inserted.first_name,
        # The column can also be chosen dynamically by name
        last_name=insert_stmt.inserted['last_name'],
        # A fixed value can be assigned as well
        is_delete=0,
    )
    conn.execute(on_duplicate_key_stmt)
    # Commit explicitly: work left uncommitted is rolled back when the
    # connection is closed at the end of the `with` block.
    conn.commit()


# 事务
# ======================================================
from sqlalchemy import text

with engine.connect() as conn, conn.begin():
    # Committed automatically when the block exits without an exception
    sql = students.insert().values(first_name='Tome', last_name='Smith', birthday=date(2000, 12, 12))
    result = conn.execute(sql)
    # Rolled back automatically when an exception escapes the block.
    # Raw SQL has to be wrapped in text(); SQLAlchemy 2.x does not execute plain strings.
    # NOTE(review): MySQL may return NULL instead of raising for division by
    # zero in a SELECT, in which case this line triggers no rollback — confirm
    # against the target server's sql_mode.
    conn.execute(text("select 1/0"))


# 基于ORM的写入
# ====================================================
from sqlalchemy.orm import Session
from _models import Department, Employee

with Session(engine) as session:
    session.execute(
        insert(Department).values(
            [
                {"name": "QA"},
                {"name": "Sales"},
            ]
        )
    )
    session.commit()

    # Bulk insert whose foreign key is looked up per row
    # --------------------------------------------------
    # scalar_subquery() states explicitly that each SELECT is used as a single
    # value inside VALUES (...), instead of relying on implicit coercion.
    hr_id = select(Department.id).where(Department.name == "hr").scalar_subquery()
    market_id = select(Department.id).where(Department.name == "market").scalar_subquery()
    # NOTE(review): the key was "dep_id"; it is aligned with the `department_id`
    # column of the `employee` table — confirm against `_models.Employee`.
    session.execute(
        insert(Employee).values(
            [
                {
                    "department_id": hr_id,
                    "name": "www",
                    "birthday": date(2000, 1, 19),
                },
                {
                    "department_id": market_id,
                    "name": "YYY",
                    "birthday": date(2000, 2, 19),
                },
            ]
        )
    )
    # The execute() above implicitly began a transaction. Commit it, both to
    # persist the rows and because session.begin() below raises when a
    # transaction is already in progress.
    session.commit()

    # Automatic commit and rollback
    # ---------------------------------------------
    with session.begin():
        dep = Department(name="oooo")
        session.add(dep)


# 关联表写入
# ====================================================
from _tables import department, employee

with engine.connect() as conn:
    # `leader` is declared nullable=False on `department`, so each row supplies it.
    conn.execute(department.insert(), [
        {"name": "hr", "leader": "Alice"},
        {"name": "it", "leader": "Bob"},
    ])

    # `birthday` is declared nullable=False on `employee`, so each row supplies it.
    # NOTE(review): department_id 1 / 2 presumes those ids were assigned to the
    # two departments inserted above — verify on a non-empty database.
    conn.execute(employee.insert(), [
        {"department_id": 1, "name": "Jack", "birthday": date(1990, 1, 15)},
        {"department_id": 1, "name": "Tom", "birthday": date(1991, 2, 16)},
        {"department_id": 1, "name": "Mary", "birthday": date(1992, 3, 17)},
        {"department_id": 2, "name": "Smith", "birthday": date(1993, 4, 18)},
        {"department_id": 2, "name": "Rose", "birthday": date(1994, 5, 19)},
        {"department_id": 2, "name": "Leon", "birthday": date(1995, 6, 20)},
    ])
    conn.commit()

INSERT ON DUPLICATE KEY UPDATE

package main

import (
    "gorm.io/driver/mysql"
    "gorm.io/gorm"
    "gorm.io/gorm/clause"
    "log"
    "script/dao/model"
    "script/dao/query"
)

var q *query.Query

// init opens the MySQL connection, migrates the Customer and CreditCard
// models, binds the generated query package to the connection and seeds one
// customer that owns one credit card.
func init() {
    dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
    db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
    if err != nil {
        log.Fatal("连接数据库失败, error=" + err.Error())
    }

    // Report a failed migration instead of discarding the error; execution
    // continues exactly as before.
    if err := db.AutoMigrate(&model.Customer{}, &model.CreditCard{}); err != nil {
        log.Println("AutoMigrate failed, error=" + err.Error())
    }
    q = query.Use(db)

    // Seed data. The error is reported but not fatal.
    // NOTE(review): presumably this fails with a duplicate key when the
    // program is run a second time — confirm.
    if err := q.Customer.Create(&model.Customer{
        ID: 1,
        CreditCards: []model.CreditCard{
            {
                ID:     1,
                Number: "xxx",
            },
        },
    }); err != nil {
        log.Println("seeding customer failed, error=" + err.Error())
    }
}

// main batch-creates two credit cards with an on-conflict clause that
// updates the `number` column.
func main() {
    creditCards := []*model.CreditCard{
        {ID: 1, Number: "xxxx"},
        {ID: 2, Number: "yyyy"},
    }

    // https://gorm.io/zh_CN/gen/create.html#Upsert-x2F-On-Conflict
    // On an id conflict, update only the listed columns.
    onConflict := clause.OnConflict{
        DoUpdates: clause.AssignmentColumns([]string{"number"}),
    }

    if err := q.CreditCard.Clauses(onConflict).CreateInBatches(creditCards, 100); err != nil {
        panic(err)
    }
}

高级查询

关联查询

知识点:

  • 条件查询

  • 关联查询

from sqlalchemy.sql import and_, or_, select, outerjoin

from _connection import engine
from _tables import students

with engine.connect() as conn:
    # WHERE birthday > '2000-10-13' OR first_name = 'li'
    # Both conditions must be arguments of or_(); closing or_() after the first
    # one hands them to where() separately, which joins them with AND.
    query = students.select().where(
        or_(
            students.c.birthday > '2000-10-13',
            and_(students.c.first_name == "li"),
        )
    )
    result_set = conn.execute(query)

    # The result can also be consumed row by row:
    # for row in result_set:
    #     print(row.first_name)

    result = result_set.fetchall()
    print(result)

# 基于ORM的查询
# ==============================================
from sqlalchemy.orm import aliased
from _models import Employee, Department, Session

# NOTE(review): `Session` comes from `_models`; presumably a session factory — confirm.
session = Session()

with engine.connect() as conn:
    print("基于ORM的查询")
    print("-----------------------------------------------------")
    # Single-table query
    # ----------------------------------------
    sql = (
        select(
            Employee.id,
            Employee.department_id,
            Employee.name,
            Employee.birthday,
        )
        .where(
            # Filter on the same attribute that is selected above (this was
            # `Employee.dep_id`). NOTE(review): confirm the attribute name in `_models`.
            Employee.department_id == 1,
            # NOTE(review): the `employee` table shown in these notes has no
            # `address` column — confirm `Employee.address` exists in `_models`.
            Employee.address != "xxx",
        )
    )
    print(conn.execute(sql).fetchall())

# 关联查询
# ------------------------------------------
emp_cls = aliased(Employee, name="emp")
dep_cls = aliased(Department, name="dep")
# The relationship used for the join has to be pointed at the alias as well.
aliased_department = emp_cls.department.of_type(dep_cls)
query = select(emp_cls, dep_cls).join(aliased_department)
result = session.execute(query)
for row in result:
    print(row)

# Selecting specific columns
# ~~~~~~~~~~~~~~~~~~~~~~~
# Use join_from when selecting columns, join when selecting classes
query = (
    select(Employee.name.label('emp_name'), Department.name)
    .join_from(Employee, Department)
)
session.execute(query)

# Left outer join
# ~~~~~~~~~~~~~~~~~~~~~~~~~
employee_left_join_department = outerjoin(Employee, Department)
query = select(Employee.name, Department.name).select_from(employee_left_join_department)


# 关联查询
# ===============================================
from _tables import department, employee

with engine.connect() as conn:
    print("左连接查询")
    print("-----------------------------------------------------")
    # Employees left-outer-joined to their department, filtered to 'hr';
    # the string below shows the SQL this statement corresponds to.
    sql = (
        select(
            employee.c.id,
            employee.c.department_id,
            department.c.leader,
            employee.c.name,
            employee.c.birthday,
        )
        .join(department, department.c.id == employee.c.department_id, isouter=True)
        .where(department.c.name == 'hr')
    )
    """
               SELECT employee.id, 
                      employee.department_id, 
                      department.leader,
                      employee.name, 
                      employee.birthday 
                 FROM employee 
      LEFT OUTER JOIN department ON department.id = employee.department_id 
                WHERE department.name = 'hr'
    """
    print(conn.execute(sql).fetchall())

    print("返回指定部门的员工表信息")
    print("-----------------------------------------------------")
    # Only the employee columns are selected; the join is used for filtering.
    employee_with_department = employee.join(
        department, department.c.id == employee.c.department_id
    )
    sql = (
        select(employee)
        .select_from(employee_with_department)
        .where(department.c.name == 'hr')
    )
    """
    SELECT employee.id, 
           employee.department_id, 
           employee.name, 
           employee.birthday 
      FROM employee 
      JOIN department ON department.id = employee.department_id 
     WHERE department.name = 'hr'
    """
    print(conn.execute(sql).fetchall())

跨库join表

在表名前手工加上库名

// User is the model whose table name is qualified with a database name.
type User struct {
}

// TableName returns the table name prefixed by hand with the database name
// ("user.user"). It is declared with `func`; `type (User) TableName()` does
// not compile.
func (User) TableName() string {
	return "user.user"
}

// Discover is the model whose table name is qualified with a database name.
type Discover struct{}

// TableName returns the table name prefixed by hand with the database name.
func (Discover) TableName() string {
	return "discover.discover"
}

参见

issue: #2959

子查询

tests/gen_test.go

// TestSubQuery exercises an IN sub-query.
// Related issue: https://github.com/go-gorm/gen/issues/286
// Related docs: https://gorm.io/zh_CN/gen/query.html#%E5%AD%90%E6%9F%A5%E8%AF%A2
func (s *GenSuite) TestSubQuery() {
	ast := assert.New(s.T())
	ctx := context.Background()

	Author := s.q.Author
	Story := s.q.Story

	// SELECT * FROM `stories` WHERE `stories`.`author_id` IN (SELECT `author`.`id` FROM `author`) AND `stories`.`deleted_at` IS NULL
	results, err := Story.WithContext(ctx).Where(
		Story.Columns(Story.AuthorID).In(Author.WithContext(ctx).Select(Author.ID)),
	).Debug().Find()
	// NoError prints the error on failure; Len avoids the swapped
	// expected/actual arguments of Equal(len(results), 1).
	ast.NoError(err)
	ast.Len(results, 1)
}

自定义复杂查询

// TestRawSelect selects a computed column by writing the select list as raw SQL.
// Related issue: https://github.com/go-gorm/gen/issues/1240
//
// A GitHub user has already asked for Field to support custom columns and
// opened a PR: https://github.com/go-gorm/gen/issues/1212
// Field docs: https://gorm.io/zh_CN/gen/query.html#%E6%96%B0%E5%BB%BA%E5%AD%97%E6%AE%B5
func (s *AuthorGenSuite) TestRawSelect() {
    ast := assert.New(s.T())
    ctx := context.Background()

    type Result struct {
        Id        int
        FirstName string
        Year      int
    }
    var result Result

    table := s.q.Author
    // Drop down to the underlying *gorm.DB so the select list can hold a raw expression.
    err := table.WithContext(ctx).Where(table.ID.Eq(1)).UnderlyingDB().
        Select([]string{
            table.ID.ColumnName().String(),
            table.FirstName.ColumnName().String(),
            fmt.Sprintf("YEAR(CAST(%s as DATE)) as year", table.BirthDate.ColumnName().String()),
        }).Debug().Scan(&result).Error
    // Executed SQL: SELECT `id`,`first_name`,YEAR(CAST(birth_date as DATE)) as year FROM `author` WHERE `author`.`id` = 1
    // Check the query error first so a failed query is not reported as a value mismatch.
    ast.NoError(err)
    ast.Equal(2024, result.Year)
    ast.Equal("蝉", result.FirstName)
}

函数

拼接字符串

select * from gitee_videopart where concat(page_num, '-', part)='P2-02_redis是什么';

钩子