Files
meowrain 3284ce07c7 feat: enhance API and session management with Nacos and Redis integration
- Add Nacos registry for service registration and deregistration.
- Implement Redis registry for session management with heartbeat and session claiming.
- Improve completion service with session handling and request validation.
- Enhance WebSocket handling for completion requests with JSON-RPC support.
- Add tests for new registry implementations and completion manager functionalities.
- Refactor existing code for better readability and maintainability.
2026-02-15 17:46:34 +08:00

187 lines
5.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package cluster
import (
"errors"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"sync"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
// NacosRegistryConfig holds the settings NewNacosRegistry uses to create the
// Nacos naming client and to register the current instance.
type NacosRegistryConfig struct {
ServerAddr string // Nacos server address, e.g. 10.0.0.10:8848; the port falls back to 8848 when omitted. Required.
Namespace string // Nacos namespace; may be empty, which (per the original author's note) means public.
Group string // Service group; NewNacosRegistry substitutes DEFAULT_GROUP when blank.
ServiceName string // Service name to register under, e.g. lsp-gateway. Required.
ClusterName string // Optional cluster name.
Username string // Nacos username; may be empty.
Password string // Nacos password; may be empty.
IP string // IP registered for this instance; a reachable internal-network address is recommended. Required.
Port uint64 // Port registered for this instance (the service's listening port). Required, must be non-zero.
Metadata map[string]string // Instance metadata, passed to the registration call as-is.
Ephemeral bool // Whether the instance is ephemeral. NOTE(review): the original comment said "default true", but NewNacosRegistry passes this value through unchanged, so the effective default is the zero value false — confirm callers set it explicitly.
Weight float64 // Instance weight; values <= 0 are replaced with 1.
TimeoutMs uint64 // SDK request timeout in milliseconds; 0 is replaced with 5000.
}
// NacosRegistry registers the current instance with Nacos (done in
// NewNacosRegistry) and deregisters it again when Close is called.
type NacosRegistry struct {
client naming_client.INamingClient // Naming client used for register/deregister calls; closed by Close.
serviceName string // Trimmed service name the instance was registered under.
groupName string // Trimmed group name used for registration.
clusterName string // Trimmed cluster name; may be empty.
ip string // Trimmed IP the instance was registered with.
port uint64 // Port the instance was registered with.
ephemeral bool // Ephemeral flag used for registration and repeated on deregistration.
closeOnce sync.Once // Ensures the deregister-and-close sequence in Close runs only once.
}
// NewNacosRegistry validates cfg, creates a Nacos naming client for the
// configured server and registers the current instance with it.
//
// ServerAddr, ServiceName, IP and Port are required. A blank Group falls back
// to "DEFAULT_GROUP", a non-positive Weight to 1 and a zero TimeoutMs to 5000.
// String settings are whitespace-trimmed before use.
//
// If the registration call fails or reports false, the naming client is closed
// before the error is returned, so no client is left behind on failure.
func NewNacosRegistry(cfg NacosRegistryConfig) (*NacosRegistry, error) {
	nacosHost, nacosPort, err := parseServerAddr(cfg.ServerAddr)
	if err != nil {
		return nil, err
	}

	// Required fields, checked in the same order as their error messages.
	serviceName := strings.TrimSpace(cfg.ServiceName)
	instanceIP := strings.TrimSpace(cfg.IP)
	switch {
	case serviceName == "":
		return nil, errors.New("nacos service name is required")
	case instanceIP == "":
		return nil, errors.New("nacos register ip is required")
	case cfg.Port == 0:
		return nil, errors.New("nacos register port is required")
	}

	// Optional fields with defaults.
	groupName := strings.TrimSpace(cfg.Group)
	if groupName == "" {
		groupName = "DEFAULT_GROUP"
	}
	weight := cfg.Weight
	if weight <= 0 {
		weight = 1
	}
	timeoutMs := cfg.TimeoutMs
	if timeoutMs == 0 {
		timeoutMs = 5000
	}

	namingClient, err := clients.NewNamingClient(vo.NacosClientParam{
		ClientConfig: constant.NewClientConfig(
			constant.WithNamespaceId(strings.TrimSpace(cfg.Namespace)),
			constant.WithTimeoutMs(timeoutMs),
			constant.WithUsername(strings.TrimSpace(cfg.Username)),
			constant.WithPassword(strings.TrimSpace(cfg.Password)),
		),
		ServerConfigs: []constant.ServerConfig{
			{IpAddr: nacosHost, Port: nacosPort},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("create nacos naming client failed: %w", err)
	}

	registry := &NacosRegistry{
		client:      namingClient,
		serviceName: serviceName,
		groupName:   groupName,
		clusterName: strings.TrimSpace(cfg.ClusterName),
		ip:          instanceIP,
		port:        cfg.Port,
		ephemeral:   cfg.Ephemeral,
	}

	registered, err := registry.client.RegisterInstance(vo.RegisterInstanceParam{
		ServiceName: registry.serviceName,
		GroupName:   registry.groupName,
		ClusterName: registry.clusterName,
		Ip:          registry.ip,
		Port:        registry.port,
		Weight:      weight,
		Metadata:    cfg.Metadata,
		Enable:      true,
		Healthy:     true,
		Ephemeral:   registry.ephemeral,
	})
	if err != nil || !registered {
		// Registration did not succeed: release the client before reporting why.
		registry.client.CloseClient()
		if err != nil {
			return nil, fmt.Errorf("register nacos instance failed: %w", err)
		}
		return nil, errors.New("register nacos instance returned false")
	}
	return registry, nil
}
// Close deregisters the instance from Nacos and then closes the naming client.
//
// The work runs at most once per registry. The first call returns the
// deregistration error, if any (the client is closed regardless); every later
// call does nothing and returns nil.
func (r *NacosRegistry) Close() error {
	var result error
	r.closeOnce.Do(func() {
		param := vo.DeregisterInstanceParam{
			ServiceName: r.serviceName,
			GroupName:   r.groupName,
			Cluster:     r.clusterName,
			Ip:          r.ip,
			Port:        r.port,
			Ephemeral:   r.ephemeral,
		}
		deregistered, err := r.client.DeregisterInstance(param)
		switch {
		case err != nil:
			result = fmt.Errorf("deregister nacos instance failed: %w", err)
		case !deregistered:
			result = errors.New("deregister nacos instance returned false")
		}
		// Close the client even when deregistration failed.
		r.client.CloseClient()
	})
	return result
}
// parseServerAddr splits a Nacos server address into host and port.
//
// Accepted forms are "host", "host:port", "[ipv6]:port" and "[ipv6]". When no
// port is given, the default 8848 is used. Surrounding whitespace is ignored
// and the brackets around an IPv6 literal are stripped from the returned host.
//
// It returns an error when the address or the host is empty, when the address
// cannot be split into host and port, or when the port is not a decimal number
// in the range 1-65535.
func parseServerAddr(addr string) (string, uint64, error) {
	const defaultPort uint64 = 8848

	trimmed := strings.TrimSpace(addr)
	if trimmed == "" {
		return "", 0, errors.New("nacos server addr is required")
	}

	host, port := trimmed, defaultPort

	// A bracketed IPv6 literal without a port ("[::1]") contains ':' but is
	// rejected by net.SplitHostPort; treat it like any other port-less host.
	bracketedIPv6 := false
	if len(trimmed) > 2 && trimmed[0] == '[' && trimmed[len(trimmed)-1] == ']' {
		inner := trimmed[1 : len(trimmed)-1]
		bracketedIPv6 = strings.Contains(inner, ":") && !strings.ContainsAny(inner, "[]")
	}

	switch {
	case bracketedIPv6:
		host = trimmed[1 : len(trimmed)-1]
	case strings.Contains(trimmed, ":"):
		parsedHost, parsedPort, err := net.SplitHostPort(trimmed)
		if err != nil {
			return "", 0, fmt.Errorf("invalid nacos server addr %q: %w", addr, err)
		}
		// Parse as 16-bit so values above 65535 are rejected as out of range.
		rawPort, err := strconv.ParseUint(parsedPort, 10, 16)
		if err != nil {
			return "", 0, fmt.Errorf("invalid nacos server port %q: %w", parsedPort, err)
		}
		if rawPort == 0 {
			return "", 0, fmt.Errorf("invalid nacos server port %q: must be between 1 and 65535", parsedPort)
		}
		host, port = parsedHost, rawPort
	}

	host = strings.TrimSpace(host)
	if host == "" {
		return "", 0, errors.New("nacos server host is required")
	}
	return host, port, nil
}
// ResolveRegisterIP picks the address to register for this instance.
//
// A non-blank explicitIP wins and is returned trimmed. Otherwise instanceURL
// is parsed as a URL and its hostname (without port or IPv6 brackets) is
// returned. The result is "" when the URL cannot be parsed or has no host.
func ResolveRegisterIP(explicitIP, instanceURL string) string {
	explicit := strings.TrimSpace(explicitIP)
	if explicit != "" {
		return explicit
	}

	parsed, err := url.Parse(strings.TrimSpace(instanceURL))
	if err != nil {
		return ""
	}
	return strings.TrimSpace(parsed.Hostname())
}