- Add Nacos registry for service registration and deregistration. - Implement Redis registry for session management with heartbeat and session claiming. - Improve completion service with session handling and request validation. - Enhance WebSocket handling for completion requests with JSON-RPC support. - Add tests for new registry implementations and completion manager functionalities. - Refactor existing code for better readability and maintainability.
234 lines
6.4 KiB
Go
234 lines
6.4 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// RedisRegistryConfig carries the connection and timing settings passed to
// NewRedisRegistry.
type RedisRegistryConfig struct {
	// Addr is the Redis address, for example 127.0.0.1:6379.
	Addr string
	// Password is the Redis password; it may be empty.
	Password string
	// DB is the Redis database number.
	DB int
	// KeyPrefix is prepended to every key to isolate environments/services.
	KeyPrefix string
	// InstanceID is the unique ID of the current instance.
	InstanceID string
	// InstanceEndpoint is the externally reachable address of the current instance.
	InstanceEndpoint string
	// SessionTTL is the expiry applied to session keys.
	SessionTTL time.Duration
	// InstanceTTL is the expiry applied to instance metadata.
	InstanceTTL time.Duration
	// HeartbeatInterval is the period at which the instance heartbeat is refreshed.
	HeartbeatInterval time.Duration
}
|
|
|
|
// RedisRegistry 负责在 Redis 中维护实例心跳与会话归属。
|
|
type RedisRegistry struct {
|
|
client *redis.Client
|
|
|
|
keyPrefix string
|
|
instanceID string
|
|
instanceEndpoint string
|
|
sessionTTL time.Duration
|
|
instanceTTL time.Duration
|
|
heartbeatInterval time.Duration
|
|
|
|
stopCh chan struct{}
|
|
stopOnce sync.Once
|
|
}
|
|
|
|
// claimSessionScript atomically claims or renews a session and returns the
// current owner.
//
// KEYS[1] is the session hash key. ARGV[1] is the claiming owner ID, ARGV[2]
// is the timestamp stored in the 'updatedAt' field, and ARGV[3] is the TTL in
// milliseconds.
//
// When the hash has no 'owner' field, or the field already equals ARGV[1],
// the script writes owner/updatedAt, resets the TTL and returns ARGV[1].
// Otherwise the fields are left untouched, the TTL is still reset, and the
// existing owner is returned.
//
// NOTE(review): because the TTL is also refreshed for a non-owner caller, the
// script only hands a session to a new owner once the key has disappeared —
// confirm this is the intended behaviour when the existing owner is gone.
var claimSessionScript = redis.NewScript(`
local sessionKey = KEYS[1]
local owner = ARGV[1]
local now = ARGV[2]
local ttl = tonumber(ARGV[3])

local existing = redis.call('HGET', sessionKey, 'owner')
if (not existing) or existing == owner then
redis.call('HSET', sessionKey, 'owner', owner, 'updatedAt', now)
redis.call('PEXPIRE', sessionKey, ttl)
return owner
end

redis.call('PEXPIRE', sessionKey, ttl)
return existing
`)
|
|
|
|
// releaseSessionScript lets only the owner release its own session.
//
// KEYS[1] is the session hash key and ARGV[1] is the caller's owner ID. The
// key is deleted and 1 is returned only when the stored 'owner' field equals
// ARGV[1]; in every other case (different owner, or no such key) nothing is
// changed and 0 is returned.
var releaseSessionScript = redis.NewScript(`
local sessionKey = KEYS[1]
local owner = ARGV[1]
local existing = redis.call('HGET', sessionKey, 'owner')
if existing == owner then
redis.call('DEL', sessionKey)
return 1
end
return 0
`)
|
|
|
|
// NewRedisRegistry 初始化 Redis 客户端并启动实例心跳。
|
|
func NewRedisRegistry(ctx context.Context, cfg RedisRegistryConfig) (*RedisRegistry, error) {
|
|
if strings.TrimSpace(cfg.Addr) == "" {
|
|
return nil, errors.New("redis addr is required")
|
|
}
|
|
if strings.TrimSpace(cfg.KeyPrefix) == "" {
|
|
cfg.KeyPrefix = "lsp-gateway"
|
|
}
|
|
if cfg.SessionTTL <= 0 {
|
|
cfg.SessionTTL = 20 * time.Minute
|
|
}
|
|
if cfg.InstanceTTL <= 0 {
|
|
cfg.InstanceTTL = 30 * time.Second
|
|
}
|
|
if cfg.HeartbeatInterval <= 0 {
|
|
cfg.HeartbeatInterval = 10 * time.Second
|
|
}
|
|
if cfg.InstanceID == "" {
|
|
return nil, errors.New("instance id is required")
|
|
}
|
|
|
|
client := redis.NewClient(&redis.Options{
|
|
Addr: cfg.Addr,
|
|
Password: cfg.Password,
|
|
DB: cfg.DB,
|
|
})
|
|
|
|
if err := client.Ping(ctx).Err(); err != nil {
|
|
return nil, fmt.Errorf("redis ping failed: %w", err)
|
|
}
|
|
|
|
registry := &RedisRegistry{
|
|
client: client,
|
|
keyPrefix: cfg.KeyPrefix,
|
|
instanceID: cfg.InstanceID,
|
|
instanceEndpoint: cfg.InstanceEndpoint,
|
|
sessionTTL: cfg.SessionTTL,
|
|
instanceTTL: cfg.InstanceTTL,
|
|
heartbeatInterval: cfg.HeartbeatInterval,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
if err := registry.refreshInstance(ctx); err != nil {
|
|
_ = client.Close()
|
|
return nil, err
|
|
}
|
|
go registry.heartbeatLoop()
|
|
return registry, nil
|
|
}
|
|
|
|
// ClaimSession 尝试声明会话归属,并解析 owner 对应的实例地址。
|
|
func (r *RedisRegistry) ClaimSession(
|
|
ctx context.Context,
|
|
language string,
|
|
sessionID string,
|
|
) (ownerID string, ownerEndpoint string, err error) {
|
|
key := r.sessionKey(language, sessionID)
|
|
now := time.Now().UTC().Format(time.RFC3339Nano)
|
|
raw, err := claimSessionScript.Run(
|
|
ctx,
|
|
r.client,
|
|
[]string{key},
|
|
r.instanceID,
|
|
now,
|
|
r.sessionTTL.Milliseconds(),
|
|
).Result()
|
|
if err != nil {
|
|
return "", "", fmt.Errorf("claim session failed: %w", err)
|
|
}
|
|
|
|
owner := fmt.Sprint(raw)
|
|
if owner == "" {
|
|
return "", "", errors.New("claim session returned empty owner")
|
|
}
|
|
if owner == r.instanceID {
|
|
return owner, r.instanceEndpoint, nil
|
|
}
|
|
|
|
endpoint, err := r.resolveInstanceEndpoint(ctx, owner)
|
|
if err != nil {
|
|
return owner, "", err
|
|
}
|
|
return owner, endpoint, nil
|
|
}
|
|
|
|
// ReleaseSession 释放当前实例持有的会话键。
|
|
func (r *RedisRegistry) ReleaseSession(ctx context.Context, language, sessionID string) error {
|
|
key := r.sessionKey(language, sessionID)
|
|
if _, err := releaseSessionScript.Run(ctx, r.client, []string{key}, r.instanceID).Result(); err != nil {
|
|
return fmt.Errorf("release session failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close 停止心跳并关闭 Redis 连接。
|
|
func (r *RedisRegistry) Close() error {
|
|
r.stopOnce.Do(func() {
|
|
close(r.stopCh)
|
|
})
|
|
return r.client.Close()
|
|
}
|
|
|
|
// heartbeatLoop 周期性刷新实例元数据,维持实例在线状态。
|
|
func (r *RedisRegistry) heartbeatLoop() {
|
|
ticker := time.NewTicker(r.heartbeatInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
_ = r.refreshInstance(ctx)
|
|
cancel()
|
|
case <-r.stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// refreshInstance 更新实例 endpoint 和 TTL。
|
|
func (r *RedisRegistry) refreshInstance(ctx context.Context) error {
|
|
key := r.instanceKey(r.instanceID)
|
|
now := time.Now().UTC().Format(time.RFC3339Nano)
|
|
if err := r.client.HSet(ctx, key, map[string]any{
|
|
"endpoint": r.instanceEndpoint,
|
|
"updatedAt": now,
|
|
}).Err(); err != nil {
|
|
return fmt.Errorf("refresh instance metadata failed: %w", err)
|
|
}
|
|
if err := r.client.Expire(ctx, key, r.instanceTTL).Err(); err != nil {
|
|
return fmt.Errorf("refresh instance ttl failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// resolveInstanceEndpoint 根据实例 ID 读取其对外地址。
|
|
func (r *RedisRegistry) resolveInstanceEndpoint(ctx context.Context, ownerID string) (string, error) {
|
|
key := r.instanceKey(ownerID)
|
|
endpoint, err := r.client.HGet(ctx, key, "endpoint").Result()
|
|
if err == redis.Nil {
|
|
return "", nil
|
|
}
|
|
if err != nil {
|
|
return "", fmt.Errorf("resolve instance endpoint failed: %w", err)
|
|
}
|
|
return strings.TrimSpace(endpoint), nil
|
|
}
|
|
|
|
func (r *RedisRegistry) sessionKey(language, sessionID string) string {
|
|
return fmt.Sprintf("%s:sessions:%s:%s", r.keyPrefix, normalizePart(language), normalizePart(sessionID))
|
|
}
|
|
|
|
func (r *RedisRegistry) instanceKey(instanceID string) string {
|
|
return fmt.Sprintf("%s:instances:%s", r.keyPrefix, normalizePart(instanceID))
|
|
}
|
|
|
|
// normalizePart normalizes one Redis key segment: surrounding whitespace is
// stripped, and a segment that ends up empty becomes "default" so it cannot
// break the key structure.
func normalizePart(value string) string {
	if part := strings.TrimSpace(value); part != "" {
		return part
	}
	return "default"
}
|