- Add Nacos registry for service registration and deregistration. - Implement Redis registry for session management with heartbeat and session claiming. - Improve completion service with session handling and request validation. - Enhance WebSocket handling for completion requests with JSON-RPC support. - Add tests for new registry implementations and completion manager functionalities. - Refactor existing code for better readability and maintainability.
187 lines
5.2 KiB
Go
187 lines
5.2 KiB
Go
package cluster
|
||
|
||
import (
|
||
"errors"
|
||
"fmt"
|
||
"net"
|
||
"net/url"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
|
||
"github.com/nacos-group/nacos-sdk-go/v2/clients"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/vo"
|
||
)
|
||
|
||
// NacosRegistryConfig carries the settings NewNacosRegistry uses to connect
// to a Nacos server and register the current instance.
type NacosRegistryConfig struct {
	// ServerAddr is the Nacos server address, e.g. "10.0.0.10:8848".
	// When the port is omitted, 8848 is used.
	ServerAddr string
	// Namespace is the Nacos namespace; it may be empty to mean public.
	Namespace string
	// Group is the service group. Empty falls back to "DEFAULT_GROUP".
	Group string
	// ServiceName is the name to register under, e.g. "lsp-gateway". Required.
	ServiceName string
	// ClusterName is an optional cluster name.
	ClusterName string
	// Username is the Nacos user name; may be empty.
	Username string
	// Password is the Nacos password; may be empty.
	Password string
	// IP is the address the instance is registered with. Required; a
	// reachable internal address is recommended.
	IP string
	// Port is the port the instance is registered with (the service's
	// listening port). Required, must be non-zero.
	Port uint64
	// Metadata is the instance metadata sent along with the registration.
	Metadata map[string]string
	// Ephemeral selects an ephemeral (temporary) instance. NewNacosRegistry
	// passes this value through unchanged, so the zero value (false)
	// registers a persistent instance; set it to true explicitly when an
	// ephemeral instance is wanted.
	Ephemeral bool
	// Weight is the instance weight. Values <= 0 fall back to 1.
	Weight float64
	// TimeoutMs is the SDK request timeout in milliseconds. Zero falls back
	// to 5000.
	TimeoutMs uint64
}
|
||
|
||
// NacosRegistry 负责将当前实例注册到 Nacos,并在退出时反注册。
|
||
type NacosRegistry struct {
|
||
client naming_client.INamingClient
|
||
serviceName string
|
||
groupName string
|
||
clusterName string
|
||
ip string
|
||
port uint64
|
||
ephemeral bool
|
||
|
||
closeOnce sync.Once
|
||
}
|
||
|
||
func NewNacosRegistry(cfg NacosRegistryConfig) (*NacosRegistry, error) {
|
||
serverHost, serverPort, err := parseServerAddr(cfg.ServerAddr)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if strings.TrimSpace(cfg.ServiceName) == "" {
|
||
return nil, errors.New("nacos service name is required")
|
||
}
|
||
if strings.TrimSpace(cfg.IP) == "" {
|
||
return nil, errors.New("nacos register ip is required")
|
||
}
|
||
if cfg.Port == 0 {
|
||
return nil, errors.New("nacos register port is required")
|
||
}
|
||
if strings.TrimSpace(cfg.Group) == "" {
|
||
cfg.Group = "DEFAULT_GROUP"
|
||
}
|
||
if cfg.Weight <= 0 {
|
||
cfg.Weight = 1
|
||
}
|
||
if cfg.TimeoutMs == 0 {
|
||
cfg.TimeoutMs = 5000
|
||
}
|
||
|
||
clientConfig := constant.NewClientConfig(
|
||
constant.WithNamespaceId(strings.TrimSpace(cfg.Namespace)),
|
||
constant.WithTimeoutMs(cfg.TimeoutMs),
|
||
constant.WithUsername(strings.TrimSpace(cfg.Username)),
|
||
constant.WithPassword(strings.TrimSpace(cfg.Password)),
|
||
)
|
||
|
||
namingClient, err := clients.NewNamingClient(vo.NacosClientParam{
|
||
ClientConfig: clientConfig,
|
||
ServerConfigs: []constant.ServerConfig{
|
||
{
|
||
IpAddr: serverHost,
|
||
Port: serverPort,
|
||
},
|
||
},
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("create nacos naming client failed: %w", err)
|
||
}
|
||
|
||
registry := &NacosRegistry{
|
||
client: namingClient,
|
||
serviceName: strings.TrimSpace(cfg.ServiceName),
|
||
groupName: strings.TrimSpace(cfg.Group),
|
||
clusterName: strings.TrimSpace(cfg.ClusterName),
|
||
ip: strings.TrimSpace(cfg.IP),
|
||
port: cfg.Port,
|
||
ephemeral: cfg.Ephemeral,
|
||
}
|
||
|
||
ok, err := registry.client.RegisterInstance(vo.RegisterInstanceParam{
|
||
Ip: registry.ip,
|
||
Port: registry.port,
|
||
Weight: cfg.Weight,
|
||
Enable: true,
|
||
Healthy: true,
|
||
Metadata: cfg.Metadata,
|
||
ClusterName: registry.clusterName,
|
||
ServiceName: registry.serviceName,
|
||
GroupName: registry.groupName,
|
||
Ephemeral: registry.ephemeral,
|
||
})
|
||
if err != nil {
|
||
registry.client.CloseClient()
|
||
return nil, fmt.Errorf("register nacos instance failed: %w", err)
|
||
}
|
||
if !ok {
|
||
registry.client.CloseClient()
|
||
return nil, errors.New("register nacos instance returned false")
|
||
}
|
||
|
||
return registry, nil
|
||
}
|
||
|
||
func (r *NacosRegistry) Close() error {
|
||
var closeErr error
|
||
r.closeOnce.Do(func() {
|
||
ok, err := r.client.DeregisterInstance(vo.DeregisterInstanceParam{
|
||
Ip: r.ip,
|
||
Port: r.port,
|
||
Cluster: r.clusterName,
|
||
ServiceName: r.serviceName,
|
||
GroupName: r.groupName,
|
||
Ephemeral: r.ephemeral,
|
||
})
|
||
if err != nil {
|
||
closeErr = fmt.Errorf("deregister nacos instance failed: %w", err)
|
||
} else if !ok {
|
||
closeErr = errors.New("deregister nacos instance returned false")
|
||
}
|
||
r.client.CloseClient()
|
||
})
|
||
return closeErr
|
||
}
|
||
|
||
// parseServerAddr splits a Nacos server address of the form "host" or
// "host:port" into its host and port. Surrounding whitespace is ignored and
// the port defaults to 8848 when the address contains no colon.
//
// It returns an error when the address is empty, cannot be split into host
// and port, has a port that is not a number in the range 1-65535, or has an
// empty host.
func parseServerAddr(addr string) (string, uint64, error) {
	const defaultPort = 8848

	trimmed := strings.TrimSpace(addr)
	if trimmed == "" {
		return "", 0, errors.New("nacos server addr is required")
	}

	host, port := trimmed, uint64(defaultPort)
	if strings.Contains(trimmed, ":") {
		parsedHost, parsedPort, err := net.SplitHostPort(trimmed)
		if err != nil {
			return "", 0, fmt.Errorf("invalid nacos server addr %q: %w", addr, err)
		}
		// bitSize 16 makes ParseUint reject anything above 65535, which could
		// never be dialled anyway.
		rawPort, err := strconv.ParseUint(parsedPort, 10, 16)
		if err != nil {
			return "", 0, fmt.Errorf("invalid nacos server port %q: %w", parsedPort, err)
		}
		if rawPort == 0 {
			return "", 0, fmt.Errorf("invalid nacos server port %q: must be between 1 and 65535", parsedPort)
		}
		host, port = parsedHost, rawPort
	}

	host = strings.TrimSpace(host)
	if host == "" {
		return "", 0, errors.New("nacos server host is required")
	}
	return host, port, nil
}
|
||
|
||
// ResolveRegisterIP chooses the address to register with Nacos.
//
// A non-blank explicitIP wins and is returned trimmed. Otherwise the host
// part of instanceURL is returned. instanceURL may be a full URL
// ("http://10.0.0.5:8080/path") or a scheme-less address ("10.0.0.5:8080",
// "gateway.internal"). The result is "" when no host can be determined.
func ResolveRegisterIP(explicitIP, instanceURL string) string {
	if ip := strings.TrimSpace(explicitIP); ip != "" {
		return ip
	}

	raw := strings.TrimSpace(instanceURL)
	if raw == "" {
		return ""
	}

	if u, err := url.Parse(raw); err == nil {
		if host := strings.TrimSpace(u.Hostname()); host != "" {
			return host
		}
	}

	// Without a scheme, url.Parse either fails ("10.0.0.5:8080") or treats the
	// host as a scheme or path ("localhost:8080", "gateway.internal"). Retry
	// as a network-path reference so the leading part is read as the
	// authority.
	if !strings.Contains(raw, "://") {
		if u, err := url.Parse("//" + raw); err == nil {
			return strings.TrimSpace(u.Hostname())
		}
	}
	return ""
}
|