Files

371 lines
9.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package completion
import (
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"
"go.uber.org/zap"
)
var ErrUnsupportedLanguage = errors.New("unsupported language")
var ErrTooManySessions = errors.New("too many active lsp sessions")
// RuntimeClient 是带生命周期管理能力的补全客户端。
type RuntimeClient interface {
Client
Close() error
}
// SessionRegistry 抽象跨实例会话归属协调能力(如 Redis
type SessionRegistry interface {
ClaimSession(ctx context.Context, language, sessionID string) (ownerID string, ownerEndpoint string, err error)
ReleaseSession(ctx context.Context, language, sessionID string) error
Close() error
}
// LanguageServerSpec 描述某种语言对应的 LSP 启动参数。
type LanguageServerSpec struct {
Language string // 语言名(如 go/typescript用于路由匹配。
LanguageID string // 传给 LSP 的 languageId。
Command string // LSP 可执行命令。
Args []string // LSP 启动参数。
}
type ClientFactory func(ctx context.Context, spec LanguageServerSpec, workspaceDir string) (RuntimeClient, error)
// ManagerConfig 控制会话池容量、TTL 与实例信息。
type ManagerConfig struct {
WorkspaceDir string // LSP 进程工作区目录。
MaxSessions int // 本实例会话上限。
SessionTTL time.Duration // 会话空闲超时。
CleanupInterval time.Duration // 会话清理周期。
InstanceID string // 当前实例 ID。
Registry SessionRegistry // 可选分布式会话注册中心。
SessionInitTimeout time.Duration // LSP 客户端初始化超时(独立于请求超时)。
}
// Manager 按 language/session 复用 LSP 会话,并负责清理与淘汰。
type Manager struct {
mu sync.Mutex
config ManagerConfig
specByLang map[string]LanguageServerSpec
sessions map[string]*managedSession
newClient ClientFactory
stopCh chan struct{}
stoppedOnce sync.Once
}
type managedSession struct {
key string
sessionID string
language string
service *Service
client RuntimeClient
lastUsed time.Time
createdAt time.Time
}
// ErrSessionOwnedByOtherInstance 表示会话已被其他实例持有。
type ErrSessionOwnedByOtherInstance struct {
OwnerID string
OwnerEndpoint string
}
func (e *ErrSessionOwnedByOtherInstance) Error() string {
if e.OwnerEndpoint != "" {
return fmt.Sprintf("session owned by another instance: %s (%s)", e.OwnerID, e.OwnerEndpoint)
}
return fmt.Sprintf("session owned by another instance: %s", e.OwnerID)
}
// NewManager 构建会话管理器并启动后台清理协程。
func NewManager(config ManagerConfig, specs []LanguageServerSpec, factory ClientFactory) *Manager {
if config.MaxSessions <= 0 {
config.MaxSessions = 256
}
if config.SessionTTL <= 0 {
config.SessionTTL = 20 * time.Minute
}
if config.CleanupInterval <= 0 {
config.CleanupInterval = 2 * time.Minute
}
if config.SessionInitTimeout <= 0 {
config.SessionInitTimeout = 60 * time.Second
}
if strings.TrimSpace(config.InstanceID) == "" {
config.InstanceID = "instance-local"
}
specByLang := make(map[string]LanguageServerSpec)
for _, spec := range specs {
key := normalizeLanguage(spec.Language)
if key == "" {
continue
}
specByLang[key] = spec
}
m := &Manager{
config: config,
specByLang: specByLang,
sessions: make(map[string]*managedSession),
newClient: factory,
stopCh: make(chan struct{}),
}
go m.cleanupLoop()
return m
}
// WarmUp 预热指定语言的 LSP 会话,使首次请求无需等待冷启动。
func (m *Manager) WarmUp(ctx context.Context, languages ...string) {
targets := languages
if len(targets) == 0 {
targets = make([]string, 0, len(m.specByLang))
for lang := range m.specByLang {
targets = append(targets, lang)
}
}
for _, lang := range targets {
lang = normalizeLanguage(lang)
spec, ok := m.specByLang[lang]
if !ok {
continue
}
sessionKey := buildSessionKey(lang, "default")
_, err := m.getOrCreateSession(ctx, sessionKey, "default", spec)
if err != nil {
zap.L().Warn("warm-up failed",
zap.String("language", lang),
zap.Error(err),
)
} else {
zap.L().Info("warm-up succeeded", zap.String("language", lang))
}
}
}
// Complete 处理补全请求,包含语言匹配、会话归属、会话复用/创建。
func (m *Manager) Complete(ctx context.Context, req Request) (Response, error) {
language := normalizeLanguage(req.Language)
if language == "" {
return Response{}, ErrInvalidRequest
}
spec, ok := m.specByLang[language]
if !ok {
return Response{}, fmt.Errorf("%w: %s", ErrUnsupportedLanguage, req.Language)
}
sessionKey := buildSessionKey(language, req.SessionID)
sessionID := normalizeSessionID(req.SessionID)
if m.config.Registry != nil {
// 分布式模式下先声明会话归属,避免多实例并发写同一会话。
ownerID, ownerEndpoint, err := m.config.Registry.ClaimSession(ctx, language, sessionID)
if err != nil {
return Response{}, err
}
if ownerID != m.config.InstanceID {
return Response{}, &ErrSessionOwnedByOtherInstance{
OwnerID: ownerID,
OwnerEndpoint: ownerEndpoint,
}
}
}
session, err := m.getOrCreateSession(ctx, sessionKey, sessionID, spec)
if err != nil {
return Response{}, err
}
resp, err := session.service.Complete(ctx, req)
if err != nil {
return Response{}, err
}
m.mu.Lock()
if current, ok := m.sessions[sessionKey]; ok {
current.lastUsed = time.Now()
}
m.mu.Unlock()
return resp, nil
}
// ActiveSessions 统计当前实例内各语言活跃会话数。
func (m *Manager) ActiveSessions() map[string]int {
m.mu.Lock()
defer m.mu.Unlock()
out := make(map[string]int)
for _, session := range m.sessions {
out[session.language]++
}
return out
}
// Close 停止后台任务并释放所有会话与注册中心资源。
func (m *Manager) Close() error {
m.stoppedOnce.Do(func() {
close(m.stopCh)
m.mu.Lock()
defer m.mu.Unlock()
for key, session := range m.sessions {
if m.config.Registry != nil {
_ = m.config.Registry.ReleaseSession(context.Background(), session.language, session.sessionID)
}
_ = session.client.Close()
delete(m.sessions, key)
}
if m.config.Registry != nil {
_ = m.config.Registry.Close()
}
})
return nil
}
// cleanupLoop 周期清理闲置会话。
func (m *Manager) cleanupLoop() {
ticker := time.NewTicker(m.config.CleanupInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
m.cleanupIdleSessions()
case <-m.stopCh:
return
}
}
}
// cleanupIdleSessions 关闭超过 TTL 未使用的会话。
func (m *Manager) cleanupIdleSessions() {
cutoff := time.Now().Add(-m.config.SessionTTL)
m.mu.Lock()
defer m.mu.Unlock()
for key, session := range m.sessions {
if session.lastUsed.After(cutoff) {
continue
}
if m.config.Registry != nil {
_ = m.config.Registry.ReleaseSession(context.Background(), session.language, session.sessionID)
}
_ = session.client.Close()
delete(m.sessions, key)
}
}
// getOrCreateSession 返回已有会话,或按需新建一个会话。
func (m *Manager) getOrCreateSession(
ctx context.Context,
sessionKey string,
sessionID string,
spec LanguageServerSpec,
) (*managedSession, error) {
m.mu.Lock()
if existing, ok := m.sessions[sessionKey]; ok {
existing.lastUsed = time.Now()
m.mu.Unlock()
return existing, nil
}
if len(m.sessions) >= m.config.MaxSessions {
if !m.evictLeastRecentlyUsedLocked() {
m.mu.Unlock()
return nil, ErrTooManySessions
}
}
m.mu.Unlock()
// 使用独立超时创建 LSP 客户端,避免被较短的请求超时截断。
initCtx, initCancel := context.WithTimeout(context.Background(), m.config.SessionInitTimeout)
defer initCancel()
client, err := m.newClient(initCtx, spec, m.config.WorkspaceDir)
if err != nil {
return nil, err
}
now := time.Now()
newSession := &managedSession{
key: sessionKey,
sessionID: sessionID,
language: normalizeLanguage(spec.Language),
service: NewService(client),
client: client,
lastUsed: now,
createdAt: now,
}
m.mu.Lock()
defer m.mu.Unlock()
if existing, ok := m.sessions[sessionKey]; ok {
// 并发竞争下可能已经被其他协程创建,直接复用并关闭新 client。
_ = client.Close()
existing.lastUsed = now
return existing, nil
}
m.sessions[sessionKey] = newSession
return newSession, nil
}
// evictLeastRecentlyUsedLocked 在达到上限时淘汰最久未使用会话。
func (m *Manager) evictLeastRecentlyUsedLocked() bool {
if len(m.sessions) == 0 {
return false
}
keys := make([]string, 0, len(m.sessions))
for key := range m.sessions {
keys = append(keys, key)
}
slices.SortFunc(keys, func(a, b string) int {
as := m.sessions[a]
bs := m.sessions[b]
if as.lastUsed.Before(bs.lastUsed) {
return -1
}
if as.lastUsed.After(bs.lastUsed) {
return 1
}
return strings.Compare(as.key, bs.key)
})
victimKey := keys[0]
victim := m.sessions[victimKey]
if m.config.Registry != nil {
_ = m.config.Registry.ReleaseSession(context.Background(), victim.language, victim.sessionID)
}
_ = victim.client.Close()
delete(m.sessions, victimKey)
return true
}
func buildSessionKey(language, sessionID string) string {
return language + ":" + normalizeSessionID(sessionID)
}
// normalizeSessionID 将空 session 归一为 default便于复用同一会话键。
func normalizeSessionID(sessionID string) string {
sid := strings.TrimSpace(sessionID)
if sid == "" {
return "default"
}
return sid
}
// normalizeLanguage 统一语言名大小写和空白。
func normalizeLanguage(language string) string {
return strings.ToLower(strings.TrimSpace(language))
}