gorm.io/sharding:改造,当查询条件中不包含分表键时,从自定义方法中获取对应的表进行查询
项目背景
这篇文章是一种特殊的情形——当查询条件中不包含分表键时,从自定义方法中获取对应的表进行查询。实际项目中并不建议这种用法。
当然,这里只是提供一种思路。这篇文章也是这个系列中的第三篇文章。前两篇文章《基于gorm.io/sharding分表中间件使用案例》、《gorm.io/sharding使用中遗留问题探究:Error 1062 (23000): Duplicate entry ‘2‘ for key ‘orders_2025.PRIMARY》有兴趣的看官可以移步过去看看。
解决方案
我们重点关注回调函数注册的逻辑,这段回调函数的注册逻辑中,我们重点关注查询的回调函数注册,实际上这几个逻辑是一致的。
s.Callback().Query().Before("*").Register("gorm:sharding", s.switchConn)这行代码在查询(Query)操作的所有内置回调之前注册了一个名为gorm:sharding
的自定义回调,该回调执行s.switchConn
方法。这意味着在每次执行查询操作之前,都会先调用s.switchConn
方法来根据分片规则选择或切换数据库连接。
func (s *Sharding) registerCallbacks(db *gorm.DB) {s.Callback().Create().Before("*").Register("gorm:sharding", s.switchConn)s.Callback().Query().Before("*").Register("gorm:sharding", s.switchConn)s.Callback().Update().Before("*").Register("gorm:sharding", s.switchConn)s.Callback().Delete().Before("*").Register("gorm:sharding", s.switchConn)s.Callback().Row().Before("*").Register("gorm:sharding", s.switchConn)s.Callback().Raw().Before("*").Register("gorm:sharding", s.switchConn)
}
所以,我们的目标就是修改这个自定义回调方法switchConn。
func (s *Sharding) switchConn(db *gorm.DB) {// Support ignore sharding in some case, like:// When DoubleWrite is enabled, we need to query database schema// information by table name during the migration.if _, ok := db.Get(ShardingIgnoreStoreKey); !ok {s.mutex.Lock()if db.Statement.ConnPool != nil {s.ConnPool = &ConnPool{ConnPool: db.Statement.ConnPool, sharding: s}db.Statement.ConnPool = s.ConnPool}s.mutex.Unlock()}// 获取查询语句的上下文信息ctx := db.Statement.Context// 检查查询中是否包含分表键(这里需要根据实际情况实现具体的检查逻辑)containsShardKey := true// 从上下文中获取查询条件中是否包含分表键if ctx.Value("no_sharding") != nil {containsShardKey = false}// 如果不包含分表键,则使用自定义函数获取表名if !containsShardKey && s._config.TableResolver != nil {tableName := s._config.TableResolver(ctx)if tableName != "" {// 设置查询的表名db.Statement.Table = tableName}}
}
因此,我们还需要在config中增加一个传自定义的获取表明的方法。
// Config specifies the configuration for sharding.
type Config struct {// When DoubleWrite enabled, data will double write to both main table and sharding table.DoubleWrite bool// ShardingKey specifies the table column you want to used for sharding the table rows.// For example, for a product order table, you may want to split the rows by `user_id`.ShardingKey string// NumberOfShards specifies how many tables you want to sharding.NumberOfShards uint// 当查询中需要指定表时,使用该函数来解析表名TableResolver func(context.Context) string// tableFormat specifies the sharding table suffix format.tableFormat string// ShardingAlgorithm specifies a function to generate the sharding// table's suffix by the column value.// For example, this function implements a mod sharding algorithm.//// func(value any) (suffix string, err error) {// if uid, ok := value.(int64);ok {// return fmt.Sprintf("_%02d", user_id % 64), nil// }// return "", errors.New("invalid user_id")// }ShardingAlgorithm func(columnValue any) (suffix string, err error)// ShardingSuffixs specifies a function to generate all table's suffix.// Used to support Migrator and generate PrimaryKey.// For example, this function get a mod all sharding suffixs.//// func () (suffixs []string) {// numberOfShards := 5// for i := 0; i < numberOfShards; i++ {// suffixs = append(suffixs, fmt.Sprintf("_%02d", i%numberOfShards))// }// return// }ShardingSuffixs func() (suffixs []string)// ShardingAlgorithmByPrimaryKey specifies a function to generate the sharding// table's suffix by the primary key. Used when no sharding key specified.// For example, this function use the Snowflake library to generate the suffix.//// func(id int64) (suffix string) {// return fmt.Sprintf("_%02d", snowflake.ParseInt64(id).Node())// }ShardingAlgorithmByPrimaryKey func(id int64) (suffix string)// PrimaryKeyGenerator specifies the primary key generate algorithm.// Used only when insert and the record does not contains an id field.// Options are PKSnowflake, PKPGSequence and PKCustom.// When use PKCustom, you should also specify PrimaryKeyGeneratorFn.PrimaryKeyGenerator int// PrimaryKeyGeneratorFn specifies a function to generate the primary key.// When use auto-increment like generator, the tableIdx argument could ignored.// For example, this function use the Snowflake library to generate the primary key.// If you don't want to auto-fill the `id` or use a primary key that isn't called `id`, just return 0.//// func(tableIdx int64) int64 {// return nodes[tableIdx].Generate().Int64()// }PrimaryKeyGeneratorFn func(tableIdx int64) int64
}
改造的工作就此完成,当然,如果实际要使用这种方式,上述还有诸多需要优化的地方,因为,只是用于说明解决方案的思路,所以,上述改造并不是最优解。
接下来,我们看一下使用案例。
测试案例
直接上代码。
说明:关于order的定义,建表的逻辑,测试数据插入等在这个系列中的另外两篇文章中有定义。
package testimport ("context""fmt""testing""time""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/sharding"
)func customTableResolver(ctx context.Context) string {if ctx.Value("year") != nil {return fmt.Sprintf("orders_%d", ctx.Value("year"))}return ""
}// Test_Gorm_Sharding 用于测试 Gorm Sharding 插件
func Test_Gorm_Sharding2(t *testing.T) {// 连接到 MySQL 数据库dsn := "user:password@tcp(ip:port)/sharding_db2?charset=utf8mb4&parseTime=True&loc=Local"db, err := gorm.Open(mysql.New(mysql.Config{DSN: dsn,}), &gorm.Config{})if err != nil {panic("failed to connect database")}globalDB = db// 配置 Gorm Sharding 中间件,使用自定义的分片算法middleware := sharding.Register(sharding.Config{ShardingKey: "order_id",TableResolver: customTableResolver, // 使用自定义的表解析器ShardingAlgorithm: customShardingAlgorithm, // 使用自定义的分片算法PrimaryKeyGenerator: sharding.PKMySQLSequence,}, "orders") // 逻辑表名为 "orders"db.Use(middleware)// 没有分表键的查询ctx := context.Background()ctx = context.WithValue(ctx, "year", 2024)ctx = context.WithValue(ctx, "no_sharding", true)err = db.WithContext(ctx).Model(&Order{}).Where("product_id=?", 102).Find(&orders).Errorif err != nil {fmt.Println("Error querying orders:", err)}fmt.Printf("no sharding key Selected orders: %#v\n", orders)
}