关系型数据库代码
sqlalchemy: https://docs.sqlalchemy.org/en/20/index.html
连接
$ docker exec -it mysql mysql -u root -p
import pymysql
conn = pymysql.connect(host="127.0.0.1",
user="root",
password="mysql",
database='mysql',
port=13306,
cursorclass=pymysql.cursors.DictCursor)
with conn.cursor() as cursor:
cursor.execute("select 1")
result = cursor.fetchone()
assert result == {"1": 1}
url格式: https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls
import os
from sqlalchemy import create_engine, text, column
from testcontainers.mysql import MySqlContainer
# 创建engine
# ====================================
# url格式
# =========================================
# url = 'sqlite+pysqlite:///:memory:'
# 默认
# url = 'mysql://user:pwd@localhost/testdb'
# pymysql: "mysql+pymysql://scott:tiger@localhost/foo"
url = "mysql+pymysql://root:mysql@localhost/mysql"
"""
密码带有@符号时要转义
from urllib.parse import quote_plus
password='dfwaf@'
password = quote_plus(password)
"""
engine = create_engine(url, echo=True)
with engine.connect() as conn, conn.begin():
# text: https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.text
query = text("SELECT 1 col;")
result = conn.execute(query)
print(result.all())
# [(1,)]
# 将结果转换为字典列表
# ==============================================================
result = conn.execute(query)
columns = [col[0] for col in result.cursor.description]
dict_result = [dict(zip(columns, row)) for row in result]
print(dict_result)
# [{'col': 1}]
# 创建表
conn.execute(text("""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(255) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
email VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""))
# 插入数据
users_data = [
{'username': 'alice', 'password_hash': 'hashed_password_for_alice', 'email': 'alice@example.com'},
{'username': 'bob', 'password_hash': 'hashed_password_for_bob', 'email': 'bob@example.com'},
{'username': 'charlie', 'password_hash': 'hashed_password_for_charlie', 'email': 'charlie@example.com'}
]
insert_sql = text("""
INSERT INTO users (username, password_hash, email)
VALUES (:username, :password_hash, :email)
""")
conn.execute(insert_sql, users_data)
from sqlalchemy import Integer, String
# 占位符参数
t = text("SELECT * FROM users WHERE id=:user_id")
result = conn.execute(t, {"user_id": 2})
print(result.all())
# 在程序结束的时候销毁引擎
# engine.dispose()
# mysql操作
# =====================================
def main():
with MySqlContainer('mysql:8.0.35') as mysql:
engine = create_engine(mysql.get_connection_url())
with engine.connect() as conn, conn.begin():
# 创建表
conn.execute(text("""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(255) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
email VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""))
# 批量插入多条记录
insert_sql = text("""
INSERT INTO users (username, password_hash, email)
VALUES (:username, :password_hash, :email)
""")
conn.execute(insert_sql, users_data)
# 第二种传参方式
t = text("SELECT id, username, email FROM users WHERE username in :users"). \
bindparams(users=['bob', 'charlie']). \
columns(column('id', String), column('username', String), column('email', String))
for row in conn.execute(t):
print(row)
"""
(2, 'bob', 'bob@example.com')
(3, 'charlie', 'charlie@example.com')
"""
engine.dispose()
package P2
import (
"fmt"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
var DB *gorm.DB
var MysqlLogger logger.Interface
func init() {
username := "root"
password := "root"
host := "127.0.0.1"
port := 3306
Dbname := "gorm"
timeout := "10s"
// 第一种配置日志的方式
MysqlLogger = logger.Default.LogMode(logger.Info)
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local&timeout=%s", username, password, host, port, Dbname, timeout)
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
// 如果没有数据一致性的要求,可以在初始化时禁用, 这样可以获得60%的性能提升
//SkipDefaultTransaction: true,
//NamingStrategy: schema.NamingStrategy{
// TablePrefix: "f_", // 表名前缀
// SingularTable: true, // 是否单数表明
// NoLowerCase: false, // 关闭大小写转换
//},
//Logger: MysqlLogger,
})
if err != nil {
panic("连接数据库失败, error=" + err.Error())
}
// 连接成功
DB = db
type Result struct {
Col int
}
var result Result
DB.Raw("SELECT 1 col").Scan(&result)
fmt.Println(result.Col)
}
package main
import (
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
func main() {
// 参考 https://github.com/go-sql-driver/mysql#dsn-data-source-name 获取详情
// 需要自行将 dsn 中的 host,账号 密码配置正确
dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
conn := sqlx.NewMysql(dsn)
_ = conn
}
模型
从代码定义模型
from sqlalchemy import MetaData, Table, Column, Integer, String, Date, ForeignKey, Boolean
from _connection import engine
meta = MetaData()
# 定义表结构
students = Table(
'students', meta,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('first_name', String(128), unique=True, nullable=False),
Column('last_name', String(128)),
Column('birthday', Date, nullable=False),
Column('is_delete', Boolean, default=False),
)
# 关联表
# ============================================
# 部门表
department = Table(
'department', meta,
Column('id', Integer, primary_key=True),
Column('name', String(128), unique=True, nullable=False),
Column('leader', String(128), nullable=False, comment="领导人")
)
# 员工表
employee = Table(
"employee", meta,
Column('id', Integer, primary_key=True),
# 外键
Column("department_id", Integer, ForeignKey("department.id"), nullable=False),
Column('name', String(255), nullable=False),
Column('birthday', Date, nullable=False),
)
# 建表, 如果是从已存在的数据库执行任务,则这行没有多大必要存在
# =============================================================================
meta.create_all(engine)
from datetime import datetime
from sqlalchemy.orm import declarative_base, sessionmaker, relationship, Mapped, mapped_column
from sqlalchemy import Column, Integer, ForeignKey, Date, String, Table, DateTime
from sqlalchemy.sql import func
from typing_extensions import Annotated
from _connection import engine
# ORM映射类
# ============================================================
Base = declarative_base()
class Employee(Base):
__tablename__ = "employee"
__allow_unmapped__ = False
id = Column(Integer, primary_key=True)
# 外键
# -------------------------------
dep_id = Column(Integer, ForeignKey("department.id"), nullable=False)
# lazy: 默认是True懒惰, 查一次employee表,访问到department再查一次department表
# 为False时使用join连接一次性查询employee和department表
# back_populates: 双向关联, 需要在两个关联的类中分别定义关系, 为Employee类增加一个属性
# 还有一个backref参数也是定义反向引用
# 通常来说,如果你只需要一个简单的反向引用,backref 就足够了。如果你希望更明确地控制这种关系,那么 back_populates 更合适。
department = relationship("Department", back_populates="employees", lazy=False)
name = Column(String(255), nullable=False)
birthday = Column(Date, nullable=False)
address = Column(String(255), nullable=True)
# 一对一关系
# ----------------------------------------
computer_id: Mapped[int] = mapped_column(ForeignKey("computer.id"), nullable=True)
computer = relationship("Computer", lazy=False, back_populates="employee")
def __repr__(self):
return f'id: {self.id}, dep_id: {self.dep_id}, name: {self.name}, birthday: {self.birthday}'
# 2.X新方式
# -------------------------------------------
# 自定义类型注解
int_pk = Annotated[int, mapped_column(primary_key=True)]
create_time = Annotated[datetime, mapped_column(nullable=False, server_default=func.now())]
update_time = Annotated[datetime, mapped_column(nullable=False, server_onupdate=func.now())]
class Department(Base):
__tablename__ = 'department'
id: Mapped[int_pk]
name: Mapped[str] = Column(String(128), unique=True, nullable=False)
leader = Column(String(128), nullable=False, comment='领导人')
create_time: Mapped[create_time]
update_time: Mapped[update_time]
# 一对多关系
# ------------------------------------------------------
employees: Mapped[list["Employee"]] = relationship(back_populates="department")
def __repr__(self):
return f'id: {self.id}, name: {self.name}, create_time: {self.create_time}'
# 多对多关联
# ==================================================
required_unique_name = Annotated[str, mapped_column(String(128), unique=True, nullable=False)]
required_string = Annotated[str, mapped_column(String(128), nullable=False)]
association_table = Table(
"user_role",
Base.metadata,
Column("user_id", ForeignKey("users.id"), primary_key=True),
Column("role_id", ForeignKey("roles.id"), primary_key=True),
)
class User(Base):
__tablename__ = "users"
id: Mapped[int_pk]
name: Mapped[required_unique_name]
password: Mapped[required_string]
# 多对多关联
roles: Mapped[list["Role"]] = relationship(secondary=association_table, lazy=False, back_populates="users")
def __repr__(self):
return f'id: {self.id}, name: {self.name}'
class Role(Base):
__tablename__ = "roles"
id: Mapped[int_pk]
name: Mapped[required_unique_name]
users: Mapped[list["User"]] = relationship(secondary=association_table, lazy=False, back_populates="roles")
def __repr__(self):
return f'id: {self.id}, name: {self.name}'
# 一对一关联
# ==============================================================
class Computer(Base):
__tablename__ = "computer"
id: Mapped[int_pk]
model: Mapped[required_string]
number: Mapped[required_unique_name]
employee = relationship("Employee", lazy=True, back_populates="computer")
# 建表
# ==================================================
Base.metadata.create_all(engine)
# 会话
# =============================================================
Session = sessionmaker(bind=engine)
/* https://gorm.io/zh_CN/docs/models.html#%E6%A8%A1%E5%9E%8B%E5%AE%9A%E4%B9%89
*/
package main
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
"time"
)
// Student 定义模型
type Student struct {
ID uint // Standard field for the primary key
FirstName string `gorm:"size:128,unique"`
LastName string // 一个常规字符串字段
Birthday *time.Time // A pointer to time.Time, can be null
}
func main() {
dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatal(err)
}
// 建表
// https://gorm.io/zh_CN/docs/migration.html#AutoMigrate
db.AutoMigrate(&Student{})
}
CREATE TABLE students (
id INTEGER NOT NULL AUTO_INCREMENT,
first_name VARCHAR(128) NOT NULL,
last_name VARCHAR(128),
birthday DATE NOT NULL,
is_delete BOOL,
PRIMARY KEY (id),
UNIQUE (first_name)
)
从db生成模型代码
支持sqlalchemy2.X版本的sqlacodegen3.X仍未正式发布,等发布后再整理
sqlalchemy支持映射表结构到Table类: https://docs.sqlalchemy.org/en/20/core/reflection.html#reflecting-database-objects
使用场景之一是同步多个cmdb资源,每一个资源对应一张info表, 需要更新哪些字段是配置在数据库中的。此时可以自动映射表结构到Table, 然后编写 函数动态写入数据
from sqlalchemy import MetaData
from sqlalchemy.engine import Engine
from sqlalchemy.dialects.mysql import insert
def sync_cmdb_task(engine: Engine, table_name, inputlist: list[dict], updatelist: list):
""""
:param inputlist: 资源数据
:param: updatelist: 有则更新的字段
"""
# 自动映射表结构
table = Table(table_name, MetaData(), autoload_with=engine)
with engine.connect() as conn:
stmt = insert(table).values(inputlist)
try:
duplicate_key_data = {column: insert_stmt.inserted[column] for column in updatelist}
except KeyError as err:
raise AssertionError("该字段不存在: " + str(err)) from err
duplicate_key_data['is_delete'] = 0
duplicate_key_data['deletetime'] = None
stmt = stmt.on_duplicate_key_update(**duplicate_key_data)
conn.execute(stmt)
conn.commit()
参见
bilibili: 操作数据库已存在的表(单文件脚本)
用官方的 gen 工具
gen的配置文件
/*
```mysql
CREATE TABLE customers (
id INT AUTO_INCREMENT PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
deleted_at TIMESTAMP
);
CREATE TABLE credit_cards (
id INT AUTO_INCREMENT PRIMARY KEY,
number VARCHAR(255) NOT NULL,
customer_refer INT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
deleted_at TIMESTAMP,
-- FOREIGN KEY (customer_refer) REFERENCES customers(id) ON DELETE CASCADE ON UPDATE CASCADE
);
```
*/
package main
import (
"gorm.io/driver/mysql"
"gorm.io/gen"
"gorm.io/gen/field"
"gorm.io/gorm"
"log"
)
func main() {
g := gen.NewGenerator(gen.Config{
// .是go.mod文件所在位置
OutPath: "./dao/query",
ModelPkgPath: "./model",
WithUnitTest: true,
FieldNullable: true, // 当字段为NULL时生成指针
FieldCoverable: true, // 当字段有默认值时生成指针
FieldWithIndexTag: true, // 生成gorm索引标签
FieldWithTypeTag: true, // 生成gorm字段类型标签
Mode: gen.WithoutContext | gen.WithDefaultQuery | gen.WithQueryInterface, // generate mode
})
dsn := "root:pass@tcp(127.0.0.1:3306)/mysql?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatal(err)
}
g.UseDB(db)
// 从已存在的表生成到代码
card := g.GenerateModel("credit_cards")
// 一个customer关联多张信用卡
customer := g.GenerateModel("customers", gen.FieldRelate(field.HasMany, "CreditCards", card,
&field.RelateConfig{
// RelateSlice: true,
// foreignKey: 在CreditCard模型多出一个CustomerRefer字段, 可以得到customer的id值
// references: 指定CustomerRefer的主键字段
GORMTag: field.GormTag{"foreignKey": []string{"CustomerRefer"}, "references": []string{"ID"}},
}),
)
g.ApplyBasic(card, customer)
g.Execute()
}
源文件: https://gitee.com/luzhenxiong/bilibili-orm/blob/master/gorm/examples/script/gencode/main.go
参见
知乎: 操作数据库已存在的表(单文件脚本) bilibili: 操作数据库已存在的表(单文件脚本)
python manage.py inspectdb table1 table2 > ./application/models/gen.py
这个功能是作为一个快捷方式,而不是作为明确的模型生成。生成之后看下是否需要调整模型代码。
默认情况下, 自动生成的模型的Meta类中的managed为False告诉Django不要管理每张表的创建、修改和删除, 这在单元测试中是一个大坑, 不会自动在测试库中自动建表。 所以一般都要手工将managed这一行代码移除, 将非托管模型变为托管的
information_schema.COLUMNS
表记录了字段的信息
基本CRUD
from sqlalchemy import Table, Engine
engine: Engine
students: Table
# 构造执行sql
insert_tome = students.insert().values(first_name='Tome', last_name='Smith')
with engine.connect() as conn:
result = conn.execute(insert_tome)
print(result.inserted_primary_key)
conn.commit()
# 写入多条数据
person_insert = students.insert()
conn.execute(person_insert, [
{"first_name": "James", "last_name": "d"},
{"first_name": "chan", "last_name": "shiyu"},
{"first_name": "parado", "last_name": "x"},
])
conn.commit()
import pymysql.cursors
# Connect to the database
connection = pymysql.connect(host='localhost',
user='user',
password='passwd',
database='db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
with connection:
with connection.cursor() as cursor:
# Create a new record
sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
# connection is not autocommit by default. So you must commit to save
# your changes.
connection.commit()
with connection.cursor() as cursor:
# Read a single record
sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%(email)s"
cursor.execute(sql, {'email': 'webmaster@python.org'})
result = cursor.fetchone()
print(result)
import pymysql.cursors
import pandas as pd
with pymysql.connect(host='localhost',
user='user',
password='passwd',
database='db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor) as conn:
df = pd.read_sql("SELECT `id`, `password`, 'age', 'name' FROM `users` WHERE `email`=%(email)s",
conn, params={"email": 'webmaster@python.org'})
# 根据字段筛选数据
print(df[['id', 'password']])
# 类sql查询
print(df.query("age >= 18"))
print(df.query("name in ('Bob', 'Alice)"))
print(df.query("name == 'Bob'"))
# TODO: 写入
insert into stus(stu_num, stu_name, stu_gender, stu_age, stu_tel)
values('20210101', '张三', '男', 21, '13030303300');
-- 存在就更新数据
# https://dev.mysql.com/doc/refman/8.0/en/insert-on-duplicate.html
INSERT INTO t1 (a,b,c) VALUES (1,2,3)
ON DUPLICATE KEY UPDATE c=c+1;
INSERT INTO t1 (a,b,c) VALUES (1,2,3),(4,5,6)
ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);
-- 插入多条
INSERT INTO tbl_name (a,b,c)
VALUES(1,2,3), (4,5,6), (7,8,9);
更新
from datetime import date
from sqlalchemy import update
from _connection import engine
from _tables import students
with engine.connect() as conn:
update_sql = students.update().where(students.c.id == 5).values(name='Maria')
conn.execute(update_sql)
update_sql2 = update(students).where(students.c.id.in_(5, 6)).values(name='Maria')
conn.execute(update_sql2)
conn.commit()
# 基于ORM的更新
# =================================
from sqlalchemy.orm import Session
from _models import Employee
with Session(engine) as session:
session.execute(
update(Employee),
[
# 根据id更新,不需要额外写where条件
{"id": 2, "birthday": date(1999, 2, 9)},
{"id": 5, "name": "Samuel"},
]
)
session.commit()
from _models import Session, Employee
session = Session()
# 类对象操作
# ==============================================================
person = session.query(Employee).filter(Employee.id == 1).one()
person.address = 'www'
session.commit()
# 类SQL操作
# =============================================================
session.query(Employee).filter(Employee.id == 1).update({
Employee.address: "www"
})
session.commit()
# 一对一更新
# =======================================
from _models import Computer
session.query(Employee.id == 3).update({Employee.computer_id: None})
c = session.query(Computer).filter(Computer.id == 3).scalar()
e = session.query(Employee).filter(Employee.id == 3).scalar()
e.computer = c
session.commit()
UPDATE person SET name='Maria' WHERE id=5
删除
from sqlalchemy.sql import delete
from _connection import engine
from _tables import students
with engine.connect() as conn:
delete_sql = students.delete().where(students.c.id == 7)
conn.execute(delete_sql)
conn.commit()
# 基于orm的删除
# ==================================
from _models import Employee
with engine.connect() as conn:
conn.execute(
delete(Employee).where(Employee.name.in_(["Jack", 'Smith']))
)
conn.commit()
DELETE FROM person
写入
from datetime import date
from sqlalchemy.sql import insert, select
from sqlalchemy.dialects.mysql import insert as mysql_insert
from _connection import engine
from _tables import students
with engine.connect() as conn:
# 构造insert sql
sql = students.insert().values(first_name='Tome', last_name='Smith', birthday=date(2000, 12, 12))
result = conn.execute(sql)
print(result.inserted_primary_key)
# 写入多条数据
person_insert = students.insert()
conn.execute(person_insert, [
{"first_name": "James", "last_name": "d", "birthday": date(2000, 3, 14)},
{"first_name": "chan", "last_name": "shiyu", "birthday": date(1998, 8, 12)},
{"first_name": "parado", "last_name": "x", "birthday": date(1998, 7, 30)},
])
conn.commit()
# INSERT…ON DUPLICATE KEY UPDATE (Upsert)
# --------------------------------------------------------------------------
# https://docs.sqlalchemy.org/en/20/dialects/mysql.html#insert-on-duplicate-key-update-upsert
insert_stmt = mysql_insert(students).values(
[
{"first_name": "James", "last_name": "d", "birthday": date(2000, 3, 14)},
{"first_name": "chan", "last_name": "shiyu", "birthday": date(1998, 8, 12)},
{"first_name": "parado", "last_name": "x", "birthday": date(1998, 7, 30)},
]
)
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
# inserted指被写入的数据, 转换成`VALUES(first_name)` 的效果
first_name=insert_stmt.inserted.first_name,
# 可以动态指定字段
last_name=insert_stmt.inserted['last_name'],
# 可以设置一个固定值
is_delete=0,
)
conn.execute(on_duplicate_key_stmt)
# 事务
# ======================================================
with engine.connect() as conn, conn.begin():
# 没异常自动提交
sql = students.insert().values(first_name='Tome', last_name='Smith', birthday=date(2000, 12, 12))
result = conn.execute(sql)
# 出现异常自动回滚
conn.execute("select 1/0")
# 基于ORM的写入
# ====================================================
from sqlalchemy.orm import Session
from _models import Department, Employee
with Session(engine) as session:
session.execute(
insert(Department).values(
[
{"name": "QA"},
{"name": "Sales"},
]
)
)
session.commit()
# 带关联的批量插入
# --------------------------------------------------
session.execute(
insert(Employee).values(
[
{
"dep_id": select(Department.id).where(Department.name == "hr"),
"name": "www",
"birthday": date(2000, 1, 19),
},
{
"dep_id": select(Department.id).where(Department.name == "market"),
"name": "YYY",
"birthday": date(2000, 2, 19),
}
]
)
)
# 自动提交和回滚
# ---------------------------------------------
with session.begin():
dep = Department(name="oooo")
session.add(dep)
# 关联表写入
# ====================================================
from _tables import department, employee
with engine.connect() as conn:
conn.execute(department.insert(), [
{"name": "hr"},
{"name": "it"},
])
conn.execute(employee.insert(), [
{"department_id": 1, "name": "Jack"},
{"department_id": 1, "name": "Tom"},
{"department_id": 1, "name": "Mary"},
{"department_id": 2, "name": "Smith"},
{"department_id": 2, "name": "Rose"},
{"department_id": 2, "name": "Leon"},
])
conn.commit()
from datetime import date
from _models import Session, Employee, Department
session = Session()
# 添加单条记录
# =====================================
# 方式一
# -----------------------------------------------
d1 = Department(name='hr', leader='ll')
session.add(d1)
session.flush() # 让d1保证有id的值
e1 = Employee(dep_id=d1.id, name='Jack', birthday=date(2001, 9, 18), address='post')
# 方式二(推荐)
# --------------------------------------
# 性能要比方式一更高
d = Department(name='hr', leader='kk')
e = Employee(name="Amy", birthday=date(2000, 9, 18), address='unknown')
e.department = d
session.add(e)
session.commit()
# 添加多条记录
# =====================================
employees = [
Employee(name="Eric", birthday=date(1998, 2, 18), address='unknown', department=d),
Employee(name="Samuel", birthday=date(1997, 1, 15), address='unknown', department=d),
]
session.add_all(employees)
session.commit()
# 多对多写入
# =====================================
from _models import User, Role
role1 = Role(name="Admin")
role2 = Role(name="Operator")
user1 = User(name="Jack", password="111")
user2 = User(name="Tom", password="222")
user3 = User(name="Mary", password="333")
user1.roles.append(role1)
user1.roles.append(role2)
user2.roles.append(role1)
user3.roles.append(role2)
session.add_all([user1, user2, user3])
# 一对一写入
# ==================================================
from _models import Computer
c1 = Computer(model="Dell", number="1111")
c2 = Computer(model="Surface", number="2222")
c3 = Computer(model="MacBook Pro", number="3333")
e2 = Employee(name="Jack", computer=c1)
e3 = Employee(name="Mary", computer=c2)
e4 = Employee(name="Tom", computer=c3)
session.add_all([e2, e3, e4])
session.commit()
func (l *CreateStoryLogic) CreateStory(req *types.CreateOrUpdateStoryReq) (resp *types.CreateOrUpdateStoryResp, err error) {
Story := l.svcCtx.Model.Story
story := model.Story{
Title: req.Title,
Content: req.Content,
AuthorID: int32(req.UserId),
}
// 写入数据
// ==========================================
err = Story.WithContext(l.ctx).Create(&story)
if err != nil {
return nil, errors.New("创建故事失败: " + err.Error())
}
// 返回数据
// ==========================================
resp = &types.CreateOrUpdateStoryResp{
Id: story.ID,
Title: story.Title,
UserId: int64(req.UserId),
UserName: story.Author.FirstName + story.Author.LastName,
}
return
}
func (l *CreateStoryLogic) CreateStory(req *types.CreateOrUpdateStoryReq) (resp *types.CreateOrUpdateStoryResp, err error) {
story := model.Story{
Title: req.Title,
Content: req.Content,
AuthorID: int32(req.UserId),
}
// 写入数据
// ==========================================
err = l.svcCtx.Db.Create(&story).Error
if err != nil {
return nil, errors.New("创建故事失败: " + err.Error())
}
// 返回数据
// ==========================================
resp = &types.CreateOrUpdateStoryResp{
Id: story.ID,
Title: story.Title,
UserId: int64(req.UserId),
UserName: story.Author.FirstName + story.Author.LastName,
}
return
}
// 批量写入
func (l *BatchCreateStoryLogic) BatchCreateStory(req *types.BatchCreateStoryReq) (resp *types.BatchCreateStoryResp, err error) {
resp = new(types.BatchCreateStoryResp)
var stories []*model.Story
for _, story := range req.Stories {
stories = append(stories, &model.Story{
Title: story.Title,
Content: story.Content,
AuthorID: int32(req.UserId),
})
}
err = l.svcCtx.Db.Create(stories).Error
if err != nil {
return nil, errors.New("批量创建故事失败: " + err.Error())
}
for _, story := range stories {
resp.CreatedIds = append(resp.CreatedIds, story.ID)
}
return
}
INSERT ON DUPLICATE KEY UPDATE
package main
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"log"
"script/dao/model"
"script/dao/query"
)
var q *query.Query
func init() {
dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatal("连接数据库失败, error=" + err.Error())
}
_ = db.AutoMigrate(&model.Customer{}, &model.CreditCard{})
q = query.Use(db)
// 初始化数据
_ = q.Customer.Create(&model.Customer{
ID: 1,
CreditCards: []model.CreditCard{
{
ID: 1,
Number: "xxx",
},
},
})
}
func main() {
creditCards := []*model.CreditCard{
{
ID: 1,
Number: "xxxx",
},
{
ID: 2,
Number: "yyyy",
},
}
// https://gorm.io/zh_CN/gen/create.html#Upsert-x2F-On-Conflict
err := q.CreditCard.Clauses(clause.OnConflict{
// 如果出现id冲突时, 更新指定的字段
DoUpdates: clause.AssignmentColumns(
[]string{"number"}),
}).CreateInBatches(creditCards, 100)
if err != nil {
panic(err)
}
}
obj, created = Person.objects.update_or_create(
first_name="John",
last_name="Lennon",
defaults={"first_name": "Bob"},
create_defaults={"first_name": "Bob", "birthday": date(1940, 10, 9)},
)
高级查询
关联查询
知识点:
条件查询
关联查询
from sqlalchemy.sql import and_, or_, select, outerjoin
from _connection import engine
from _tables import students
with engine.connect() as conn:
query = students.select() \
.where(or_(
students.c.birthday > '2000-10-13'),
and_(students.c.first_name == "li")
)
result_set = conn.execute(query)
# 逐行fetch
# for row in result_set:
# print(row.first_name)
result = result_set.fetchall()
print(result)
# 基于ORM的查询
# ==============================================
from sqlalchemy.orm import aliased
from _models import Employee, Department, Session
session = Session()
with engine.connect() as conn:
print("基于ORM的查询")
print("-----------------------------------------------------")
# 单表查询
# ----------------------------------------
sql = select(Employee.id,
Employee.department_id,
Employee.name,
Employee.birthday).\
where(Employee.dep_id == 1,
Employee.address != "xxx")
print(conn.execute(sql).fetchall())
# 关联查询
# ------------------------------------------
emp_cls = aliased(Employee, name="emp")
dep_cls = aliased(Department, name="dep")
query = select(emp_cls,
dep_cls).\
join(emp_cls.department.of_type(dep_cls)) # 关联字段也要转别名
result = session.execute(query)
for row in result:
print(row)
# 指定字段
# ~~~~~~~~~~~~~~~~~~~~~~~
# 使用字段时用join_from, 使用类时用join
query = select(Employee.name.label('emp_name'), Department.name).join_from(Employee, Department)
session.execute(query)
# 左外连接
# ~~~~~~~~~~~~~~~~~~~~~~~~~
query = select(Employee.name, Department.name).\
select_from(outerjoin(Employee, Department))
# 关联查询
# ===============================================
from _tables import department, employee
with engine.connect() as conn:
print("左连接查询")
print("-----------------------------------------------------")
sql = select(employee.c.id,
employee.c.department_id,
department.c.leader,
employee.c.name,
employee.c.birthday).\
join(department, department.c.id == employee.c.department_id, isouter=True).\
where(department.c.name == 'hr')
"""
SELECT employee.id,
employee.department_id,
department.leader,
employee.name,
employee.birthday
FROM employee
LEFT OUTER JOIN department ON department.id = employee.department_id
WHERE department.name = 'hr'
"""
print(conn.execute(sql).fetchall())
print("返回指定部门的员工表信息")
print("-----------------------------------------------------")
sql = select(employee).\
select_from(
employee.join(department, department.c.id == employee.c.department_id)).\
where(department.c.name == 'hr')
"""
SELECT employee.id,
employee.department_id,
employee.name,
employee.birthday
FROM employee
JOIN department ON department.id = employee.department_id
WHERE department.name = 'hr'
"""
print(conn.execute(sql).fetchall())
from _models import Session, Employee
session = Session()
for employee in session.query(Employee).filter(Employee.address == 'unknown'):
print(f'name: {employee.name}, birthday: {employee.birthday}')
# 只拿一条记录,不需要循环
# ================================================
employee = session.query(Employee).filter(Employee.id == 1).first()
print(employee)
# 取1条,若数据不存在或存在多条记录则抛出异常
# ---------------------------------------------------------------
employee = session.query(Employee).filter(Employee.id < 3).one()
print(employee)
# 取1条,若存在多条记录则抛出异常
# ---------------------------------------------------------------
employee = session.query(Employee).filter(Employee.id < 3).scalar()
print(employee)
# 多对多查询
# ==========================================
from _models import User
u = session.query(User).filter(User.id == 1).one()
print(u.roles)
# 一对一查询
# ===============================================
from _models import Computer
e = session.query(Employee).filter(Employee.id == 1).scalar()
print(e.computer)
c = session.query(Computer).filter(Computer.id == 2).scalar()
print(c.employee)
app/internal/logic/retrievestorylogic.go
func (l *RetrieveStoryLogic) RetrieveStory(req *types.RetrieveStoryReq) (resp *types.RetrieveStoryResp, err error) {
// 获取实例
Story := l.svcCtx.Model.Story
story, err := Story.WithContext(l.ctx).Preload(Story.Author).Where(Story.ID.Eq(req.ID)).First()
// 返回数据
if err != nil {
return nil, errors.New("获取用户失败: " + err.Error())
}
resp = &types.RetrieveStoryResp{
Id: req.ID,
Title: story.Title,
UserId: int64(story.Author.ID),
UserName: story.Author.FirstName + story.Author.LastName,
}
return
}
func (l *RetrieveStoryLogic) RetrieveStory(req *types.RetrieveStoryReq) (resp *types.RetrieveStoryResp, err error) {
// 获取实例
var story model.Story
// 预加载: https://gorm.io/zh_CN/docs/preload.html#%E9%A2%84%E5%8A%A0%E8%BD%BD%E7%A4%BA%E4%BE%8B
err = l.svcCtx.Db.Preload("Author").First(&story).Error
// 返回数据
if err != nil {
return nil, errors.New("获取用户失败: " + err.Error())
}
resp = &types.RetrieveStoryResp{
Id: req.ID,
Title: story.Title,
UserId: int64(story.Author.ID),
UserName: story.Author.FirstName + story.Author.LastName,
}
return
}
跨库join表
在表名前手工加上库名
type User struct {
}
type (User) TableName() string {
return "user.user"
}
type Discover struct {
}
func (Discover) TableName() string {
return "discover.discover"
}
参见
issue: #2959
子查询
tests/gen_test.go
// TestSubQuery IN子查询
// 关联issue: https://github.com/go-gorm/gen/issues/286
// 关联文档: https://gorm.io/zh_CN/gen/query.html#%E5%AD%90%E6%9F%A5%E8%AF%A2
func (s *GenSuite) TestSubQuery() {
ast := assert.New(s.T())
ctx := context.Background()
Author := s.q.Author
Story := s.q.Story
// SELECT * FROM `stories` WHERE `stories`.`author_id` IN (SELECT `author`.`id` FROM `author`) AND `stories`.`deleted_at` IS NULL
results, err := Story.WithContext(ctx).Where(
Story.Columns(Story.AuthorID).In(Author.WithContext(ctx).Select(Author.ID)),
).Debug().Find()
ast.Nil(err)
ast.Equal(len(results), 1)
}
自定义复杂查询
// TestRawSelect 写raw sql实现复杂的字段查询
// 关联的issue: https://github.com/go-gorm/gen/issues/1240
//
// github上已有用户要求Filed支持自定义字段并发起了pr: https://github.com/go-gorm/gen/issues/1212
// Field的文档: https://gorm.io/zh_CN/gen/query.html#%E6%96%B0%E5%BB%BA%E5%AD%97%E6%AE%B5
func (s *AuthorGenSuite) TestRawSelect() {
ast := assert.New(s.T())
ctx := context.Background()
type Result struct {
Id int
FirstName string
Year int
}
var result Result
table := s.q.Author
table.WithContext(ctx).Where(table.ID.Eq(1)).UnderlyingDB().
Select([]string{
table.ID.ColumnName().String(),
table.FirstName.ColumnName().String(),
fmt.Sprintf("YEAR(CAST(%s as DATE)) as year", table.BirthDate.ColumnName()),
}).Debug().Scan(&result)
// 执行sql: SELECT `id`,`first_name`,YEAR(CAST(birth_date as DATE)) as year FROM `author` WHERE `author`.`id` = 1
ast.Equal(2024, result.Year)
ast.Equal("蝉", result.FirstName)
}
函数
拼接字符串
select * from gitee_videopart where concat(page_num, '-', part)='P2-02_redis是什么';
from django.db import models
from django.db.models.functions import Concat
from django.db.models import Value
class VideoPart(models.Model):
"""所有分P"""
page_num = models.CharField(max_length=4, help_text="选集号码")
part = models.CharField(max_length=256, help_text='选集名称')
STATUS_CHOICES = [
(0, "未启用"),
(1, "启用"),
(2, "已完成"),
]
status = models.SmallIntegerField(choices=STATUS_CHOICES, default=0)
duration = models.DurationField(help_text="视频时长", null=True)
num = models.IntegerField(help_text="号码(从page_num解析)", null=True)
# https://docs.djangoproject.com/zh-hans/4.2/ref/models/database-functions/#concat
# annotate是实现了设置新字段的效果
VideoPart.objects.annotate(c=Concat("page_num", Value("-"), "part")).filter(c='P2-02_redis是什么')