鹅厂实习收获
我将业务进行了抽象,本文符合保密协议,不涉及真实业务场景。
本文只讨论技术,所有代码均根据网络资料改编。
分布式锁业务实战
在没有Redis的情况下使用数据库中去重表的唯一索引处理幂等性校验。后续引入Redis使用分布式锁处理,同时使用全局变量和sync.Once实现单例模式,保证新建Redis连接池的代码在多线程多请求环境下只初始化一次。后期使用Redsync做分布式锁,同时在业务耗时不稳定的情况下新建协程实现锁自动续期.
废话不多说,直接上代码。
import (
"fmt"
"sync"
"testing"
"time"
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
// 定义全局变量
var (
redisClient *goredislib.Client
once sync.Once
)
// getRedisClient 是一个函数,用于返回 Redis 客户端实例。
// 使用 sync.Once 确保 Redis 客户端只初始化一次。
func getRedisClient() *goredislib.Client {
once.Do(func() {
// 初始化 Redis 客户端
redisClient = goredislib.NewClient(&goredislib.Options{
Addr: "192.168.179.130:6379", // Redis 地址
Password: "", // 密码(如果有的话)
DB: 2, // 使用的数据库编号
})
fmt.Println("Redis 客户端已初始化")
})
return redisClient
}
func TestRedisPool(t *testing.T) {
redisClient := getRedisClient()
pool := goredis.NewPool(redisClient) // 或者,pool := redigo.NewPool(...)
// 创建一个redsync实例,用于获取互斥锁
rs := redsync.New(pool)
// 通过使用相同的名字获取一个新的互斥锁,想要获取同一个锁的所有实例都需要使用相同的名字
mutexname := "my-global-mutex"
// 默认是8秒的过期时间
mutex := rs.NewMutex(mutexname)
// 获取我们的互斥锁的锁,这个操作成功后,没有人能获取同一个锁(同一个互斥锁名字)直到我们解锁它
if err := mutex.TryLock(); err != nil {
panic(err)
}
//创建一个channel,用来通知续租goroutine任务已经完成
done := make(chan bool)
// 开启一个goroutine,周期性地续租锁
go func() {
ticker := time.NewTicker(4 * time.Second) // 按照需求调整
defer ticker.Stop()
for {
select {
case <-ticker.C:
ok, err := mutex.Extend()
if err != nil {
fmt.Println("Failed to extend lock:", err)
} else if !ok {
fmt.Println("Failed to extend lock: not successes")
}
case <-done:
return
}
}
}()
defer mutex.Unlock()
//通知goRoutine停止续租
defer close(done)
// 在这里执行需要锁的工作
time.Sleep(15 * time.Second)
}
乐观锁版本号
数据库里存在版本号,版本号对则更新,版本号不对则递归再更新。
循环重试和递归重试一样的。
var (
sequence int64
)
func TestOptimisticLock(t *testing.T) {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
maxRetries := 100 // 添加最大重试次数防止死循环
for retries := 0; retries < maxRetries; retries++ {
current := atomic.LoadInt64(&sequence)
expected := current
newVal := expected + 1
// 模拟延迟,增加冲突概率
time.Sleep(time.Millisecond * 5)
// 使用 CAS 原子操作模拟乐观锁
if atomic.CompareAndSwapInt64(&sequence, expected, newVal) {
fmt.Println("更新成功")
break
} else {
fmt.Println("更新失败,正在重新更新")
fmt.Printf("Goroutine 冲突失败: 当前值 %d != 期望值 %d\n", sequence, expected)
// time.Sleep(time.Millisecond * 10) // 避免频繁重试
}
}
}()
}
wg.Wait()
// 验证最终结果是否正确
if sequence != 10 {
t.Errorf("期望最终 sequence 为 10,实际为 %d", sequence)
}
}
函数式选项模式
go中的一种设计模式,和常见的设计模式一样,也是通过代码的设计,让配置更加方便。
业务背景:初始化一个类,有些是固定的参数,有些是可选的。这些可选的叫插件Plugins。
我们以一个服务器为例子,可以给服务器添加多个插件。
如果每种都要写构造函数,那要写很多构造函数。所以进行了改造。
import (
"fmt"
"testing"
)
// 插件结构体
type Plugin struct {
Name string
Config map[string]interface{}
}
// Server 结构体(对外暴露)
type Server struct {
Host string
Port int
Plugins []Plugin
}
// 内部配置结构体(对外隐藏)
type serverConfig struct {
host string
port int
plugins []Plugin
}
// IOption 接口:用于配置 Server
type IOption interface {
apply(*serverConfig)
}
// funcOption 类型:实现 IOption 接口
type funcOption func(*serverConfig)
func (fo funcOption) apply(cfg *serverConfig) {
fo(cfg)
}
// NewServer 构造函数
func NewServer(opts ...IOption) *Server {
cfg := &serverConfig{
host: "localhost", // 默认 Host
port: 8080, // 默认 Port
plugins: make([]Plugin, 0),
}
for _, opt := range opts {
opt.apply(cfg)
}
return &Server{
Host: cfg.host,
Port: cfg.port,
Plugins: cfg.plugins,
}
}
// WithHost 设置 Host 字段
func WithHost(host string) IOption {
return funcOption(
func(cfg *serverConfig) {
cfg.host = host
})
}
// WithPort 设置 Port 字段
func WithPort(port int) IOption {
return funcOption(
func(cfg *serverConfig) {
cfg.port = port
})
}
// WithAuthPlugin 添加认证插件
func WithAuthPlugin(realm string) IOption {
return funcOption(
func(cfg *serverConfig) {
cfg.plugins = append(cfg.plugins, Plugin{
Name: "auth",
Config: map[string]interface{}{"realm": realm},
})
})
}
// WithRateLimitPlugin 添加限流插件
func WithRateLimitPlugin(limit int) IOption {
return funcOption(
func(cfg *serverConfig) {
cfg.plugins = append(cfg.plugins, Plugin{
Name: "rate_limit",
Config: map[string]interface{}{"limit": limit},
})
})
}
// WithLoggerPlugin 添加日志插件
func WithLoggerPlugin(level string) IOption {
return funcOption(
func(cfg *serverConfig) {
cfg.plugins = append(cfg.plugins, Plugin{
Name: "logger",
Config: map[string]interface{}{"level": level},
})
})
}
func TestPlugins(t *testing.T) {
server := NewServer(
WithHost("example.com"),
WithPort(8000),
WithAuthPlugin("admin"),
WithRateLimitPlugin(100),
WithLoggerPlugin("debug"),
)
fmt.Printf("Host: %s\n", server.Host)
fmt.Printf("Port: %d\n", server.Port)
fmt.Println("Plugins:")
for _, plugin := range server.Plugins {
fmt.Printf("- %s: %v\n", plugin.Name, plugin.Config)
}
}