fix: 修复若干问题,添加java lsp
This commit is contained in:
@@ -22,10 +22,17 @@ type SessionStatsProvider interface {
|
||||
ActiveSessions() map[string]int
|
||||
}
|
||||
|
||||
// LSPStatusProvider 暴露按语言的 LSP 探测状态。
|
||||
type LSPStatusProvider interface {
|
||||
LspServiceStatus() map[string]any
|
||||
}
|
||||
|
||||
// RouteOptions 控制 HTTP/WS 接口的超时与请求体上限。
|
||||
type RouteOptions struct {
|
||||
RequestTimeout time.Duration // 单次补全调用超时时间。
|
||||
MaxBodyBytes int64 // 请求体最大字节数(HTTP/WS 共用)。
|
||||
// LSPStatusProvider 可选,用于输出语言服务在线状态。
|
||||
LSPStatusProvider LSPStatusProvider
|
||||
}
|
||||
|
||||
// RegisterRoutes 注册健康检查、HTTP 补全接口和 WebSocket 补全接口。
|
||||
@@ -41,6 +48,7 @@ func RegisterRoutes(router *gin.Engine, service CompletionService, options ...Ro
|
||||
if options[0].MaxBodyBytes > 0 {
|
||||
opts.MaxBodyBytes = options[0].MaxBodyBytes
|
||||
}
|
||||
opts.LSPStatusProvider = options[0].LSPStatusProvider
|
||||
}
|
||||
|
||||
router.GET("/health", func(c *gin.Context) {
|
||||
@@ -62,6 +70,17 @@ func RegisterRoutes(router *gin.Engine, service CompletionService, options ...Ro
|
||||
})
|
||||
})
|
||||
|
||||
router.GET("/health/lsp-status", func(c *gin.Context) {
|
||||
languages := map[string]any{}
|
||||
if opts.LSPStatusProvider != nil {
|
||||
languages = opts.LSPStatusProvider.LspServiceStatus()
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"status": "ok",
|
||||
"languages": languages,
|
||||
})
|
||||
})
|
||||
|
||||
registerWSRoutes(router, service, opts)
|
||||
|
||||
handleCompletion := func(c *gin.Context) {
|
||||
|
||||
@@ -28,6 +28,14 @@ func (f *fakeCompletionService) Complete(_ context.Context, _ completion.Request
|
||||
return f.resp, nil
|
||||
}
|
||||
|
||||
type fakeLSPStatusProvider struct {
|
||||
status map[string]any
|
||||
}
|
||||
|
||||
func (f *fakeLSPStatusProvider) LspServiceStatus() map[string]any {
|
||||
return f.status
|
||||
}
|
||||
|
||||
// 验证 HTTP 补全接口的成功路径。
|
||||
func TestRegisterRoutesCompletionSuccess(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
@@ -174,3 +182,41 @@ func TestRegisterRoutesCompletionWebSocketSuccess(t *testing.T) {
|
||||
t.Fatalf("unexpected items: %+v", resp.Items)
|
||||
}
|
||||
}
|
||||
|
||||
// 验证 /health/lsp-status 会返回语言探测状态快照。
|
||||
func TestRegisterRoutesLspStatus(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
r := gin.New()
|
||||
RegisterRoutes(r, &fakeCompletionService{}, RouteOptions{
|
||||
LSPStatusProvider: &fakeLSPStatusProvider{
|
||||
status: map[string]any{
|
||||
"go": map[string]any{
|
||||
"online": true,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/health/lsp-status", nil)
|
||||
w := httptest.NewRecorder()
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected status 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var got map[string]any
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil {
|
||||
t.Fatalf("failed to decode response: %v", err)
|
||||
}
|
||||
if got["status"] != "ok" {
|
||||
t.Fatalf("expected status ok, got %#v", got["status"])
|
||||
}
|
||||
languages, ok := got["languages"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("languages field type mismatch: %#v", got["languages"])
|
||||
}
|
||||
if _, ok := languages["go"]; !ok {
|
||||
t.Fatalf("expected go language status, got %#v", languages)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var ErrUnsupportedLanguage = errors.New("unsupported language")
|
||||
@@ -38,12 +40,13 @@ type ClientFactory func(ctx context.Context, spec LanguageServerSpec, workspaceD
|
||||
|
||||
// ManagerConfig 控制会话池容量、TTL 与实例信息。
|
||||
type ManagerConfig struct {
|
||||
WorkspaceDir string // LSP 进程工作区目录。
|
||||
MaxSessions int // 本实例会话上限。
|
||||
SessionTTL time.Duration // 会话空闲超时。
|
||||
CleanupInterval time.Duration // 会话清理周期。
|
||||
InstanceID string // 当前实例 ID。
|
||||
Registry SessionRegistry // 可选分布式会话注册中心。
|
||||
WorkspaceDir string // LSP 进程工作区目录。
|
||||
MaxSessions int // 本实例会话上限。
|
||||
SessionTTL time.Duration // 会话空闲超时。
|
||||
CleanupInterval time.Duration // 会话清理周期。
|
||||
InstanceID string // 当前实例 ID。
|
||||
Registry SessionRegistry // 可选分布式会话注册中心。
|
||||
SessionInitTimeout time.Duration // LSP 客户端初始化超时(独立于请求超时)。
|
||||
}
|
||||
|
||||
// Manager 按 language/session 复用 LSP 会话,并负责清理与淘汰。
|
||||
@@ -92,6 +95,9 @@ func NewManager(config ManagerConfig, specs []LanguageServerSpec, factory Client
|
||||
if config.CleanupInterval <= 0 {
|
||||
config.CleanupInterval = 2 * time.Minute
|
||||
}
|
||||
if config.SessionInitTimeout <= 0 {
|
||||
config.SessionInitTimeout = 60 * time.Second
|
||||
}
|
||||
if strings.TrimSpace(config.InstanceID) == "" {
|
||||
config.InstanceID = "instance-local"
|
||||
}
|
||||
@@ -116,6 +122,34 @@ func NewManager(config ManagerConfig, specs []LanguageServerSpec, factory Client
|
||||
return m
|
||||
}
|
||||
|
||||
// WarmUp 预热指定语言的 LSP 会话,使首次请求无需等待冷启动。
|
||||
func (m *Manager) WarmUp(ctx context.Context, languages ...string) {
|
||||
targets := languages
|
||||
if len(targets) == 0 {
|
||||
targets = make([]string, 0, len(m.specByLang))
|
||||
for lang := range m.specByLang {
|
||||
targets = append(targets, lang)
|
||||
}
|
||||
}
|
||||
for _, lang := range targets {
|
||||
lang = normalizeLanguage(lang)
|
||||
spec, ok := m.specByLang[lang]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
sessionKey := buildSessionKey(lang, "default")
|
||||
_, err := m.getOrCreateSession(ctx, sessionKey, "default", spec)
|
||||
if err != nil {
|
||||
zap.L().Warn("warm-up failed",
|
||||
zap.String("language", lang),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
zap.L().Info("warm-up succeeded", zap.String("language", lang))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Complete 处理补全请求,包含语言匹配、会话归属、会话复用/创建。
|
||||
func (m *Manager) Complete(ctx context.Context, req Request) (Response, error) {
|
||||
language := normalizeLanguage(req.Language)
|
||||
@@ -253,7 +287,11 @@ func (m *Manager) getOrCreateSession(
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
client, err := m.newClient(ctx, spec, m.config.WorkspaceDir)
|
||||
// 使用独立超时创建 LSP 客户端,避免被较短的请求超时截断。
|
||||
initCtx, initCancel := context.WithTimeout(context.Background(), m.config.SessionInitTimeout)
|
||||
defer initCancel()
|
||||
|
||||
client, err := m.newClient(initCtx, spec, m.config.WorkspaceDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -107,7 +107,7 @@ func NewClient(parent context.Context, cfg Config) (*Client, error) {
|
||||
cfg.ClientName = "monica-lsp-gateway"
|
||||
}
|
||||
|
||||
cmd := exec.Command(cfg.Command, cfg.Args...)
|
||||
cmd := exec.Command(filepath.FromSlash(cfg.Command), cfg.Args...)
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create stdin pipe: %w", err)
|
||||
@@ -138,7 +138,7 @@ func NewClient(parent context.Context, cfg Config) (*Client, error) {
|
||||
// 独立协程持续读取 stdout 并分发响应。
|
||||
go client.readLoop(stdout)
|
||||
|
||||
initCtx, cancel := context.WithTimeout(parent, 10*time.Second)
|
||||
initCtx, cancel := context.WithTimeout(parent, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := client.initialize(initCtx, cfg.RootPath); err != nil {
|
||||
@@ -277,7 +277,20 @@ func (c *Client) Completion(ctx context.Context, uri string, line, character int
|
||||
|
||||
var windowsDrivePattern = regexp.MustCompile(`^[A-Za-z]:`)
|
||||
|
||||
// normalizeURI 将相对 file URI 重写为工作区绝对路径 URI。
|
||||
// languageExtensions 定义 languageId 对应的规范文件扩展名。
|
||||
var languageExtensions = map[string]string{
|
||||
"java": ".java",
|
||||
"go": ".go",
|
||||
"javascript": ".js",
|
||||
"typescript": ".ts",
|
||||
"python": ".py",
|
||||
"c": ".c",
|
||||
"cpp": ".cpp",
|
||||
"rust": ".rs",
|
||||
}
|
||||
|
||||
// normalizeURI 将相对 file URI 重写为工作区绝对路径 URI,
|
||||
// 并将不匹配 languageId 的扩展名(如 .txt)替换为语言对应扩展名。
|
||||
func (c *Client) normalizeURI(rawURI string) (string, error) {
|
||||
if rawURI == "" {
|
||||
return "", errors.New("empty uri")
|
||||
@@ -305,6 +318,14 @@ func (c *Client) normalizeURI(rawURI string) (string, error) {
|
||||
localPath = filepath.Join(c.workspaceDir, filepath.FromSlash(rel))
|
||||
}
|
||||
|
||||
// 若文件扩展名与 languageId 不匹配(如 .txt),替换为语言对应扩展名。
|
||||
if expectedExt, ok := languageExtensions[strings.ToLower(c.languageID)]; ok {
|
||||
currentExt := strings.ToLower(filepath.Ext(localPath))
|
||||
if currentExt != expectedExt {
|
||||
localPath = strings.TrimSuffix(localPath, filepath.Ext(localPath)) + expectedExt
|
||||
}
|
||||
}
|
||||
|
||||
return pathToURI(localPath)
|
||||
}
|
||||
|
||||
|
||||
180
backend/internal/monitor/lsp_status_monitor.go
Normal file
180
backend/internal/monitor/lsp_status_monitor.go
Normal file
@@ -0,0 +1,180 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"monica-go-completion-backend/internal/completion"
|
||||
)
|
||||
|
||||
// Config 控制探测间隔与下线阈值。
|
||||
type Config struct {
|
||||
ProbeInterval time.Duration
|
||||
ProbeTimeout time.Duration
|
||||
FailureThreshold int
|
||||
}
|
||||
|
||||
// LanguageStatus 表示单语言探测状态。
|
||||
type LanguageStatus struct {
|
||||
Language string `json:"language"`
|
||||
Online bool `json:"online"`
|
||||
LastCheckedAt time.Time `json:"lastCheckedAt"`
|
||||
LastSuccessAt time.Time `json:"lastSuccessAt,omitempty"`
|
||||
ConsecutiveFailures int `json:"consecutiveFailures"`
|
||||
LastError string `json:"lastError,omitempty"`
|
||||
}
|
||||
|
||||
// LSPStatusMonitor 周期性探测各语言 LSP 是否可用。
|
||||
type LSPStatusMonitor struct {
|
||||
specs []completion.LanguageServerSpec
|
||||
workspaceDir string
|
||||
factory completion.ClientFactory
|
||||
cfg Config
|
||||
|
||||
mu sync.RWMutex
|
||||
statuses map[string]LanguageStatus
|
||||
|
||||
stopCh chan struct{}
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
||||
// NewLSPStatusMonitor 创建并启动探测协程。
|
||||
func NewLSPStatusMonitor(
|
||||
specs []completion.LanguageServerSpec,
|
||||
workspaceDir string,
|
||||
factory completion.ClientFactory,
|
||||
cfg Config,
|
||||
) *LSPStatusMonitor {
|
||||
if cfg.ProbeInterval <= 0 {
|
||||
cfg.ProbeInterval = 8 * time.Second
|
||||
}
|
||||
if cfg.ProbeTimeout <= 0 {
|
||||
cfg.ProbeTimeout = 5 * time.Second
|
||||
}
|
||||
if cfg.FailureThreshold <= 0 {
|
||||
cfg.FailureThreshold = 2
|
||||
}
|
||||
|
||||
normalized := make([]completion.LanguageServerSpec, 0, len(specs))
|
||||
statuses := make(map[string]LanguageStatus)
|
||||
for _, spec := range specs {
|
||||
lang := normalizeLanguage(spec.Language)
|
||||
if lang == "" {
|
||||
continue
|
||||
}
|
||||
spec.Language = lang
|
||||
normalized = append(normalized, spec)
|
||||
statuses[lang] = LanguageStatus{
|
||||
Language: lang,
|
||||
Online: false,
|
||||
}
|
||||
}
|
||||
|
||||
m := &LSPStatusMonitor{
|
||||
specs: normalized,
|
||||
workspaceDir: workspaceDir,
|
||||
factory: factory,
|
||||
cfg: cfg,
|
||||
statuses: statuses,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
go m.loop()
|
||||
return m
|
||||
}
|
||||
|
||||
// LspServiceStatus 返回按语言聚合的状态快照。
|
||||
func (m *LSPStatusMonitor) LspServiceStatus() map[string]any {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
out := make(map[string]any, len(m.statuses))
|
||||
for language, status := range m.statuses {
|
||||
out[language] = status
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Close 停止后台探测循环。
|
||||
func (m *LSPStatusMonitor) Close() {
|
||||
m.stopOnce.Do(func() {
|
||||
close(m.stopCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *LSPStatusMonitor) loop() {
|
||||
// 启动后先做一轮探测,避免首次读取全是未知状态。
|
||||
m.probeAll()
|
||||
|
||||
ticker := time.NewTicker(m.cfg.ProbeInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
m.probeAll()
|
||||
case <-m.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *LSPStatusMonitor) probeAll() {
|
||||
for _, spec := range m.specs {
|
||||
m.probeOne(spec)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *LSPStatusMonitor) probeOne(spec completion.LanguageServerSpec) {
|
||||
language := normalizeLanguage(spec.Language)
|
||||
if language == "" {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.cfg.ProbeTimeout)
|
||||
defer cancel()
|
||||
|
||||
client, err := m.factory(ctx, spec, m.workspaceDir)
|
||||
if err != nil {
|
||||
m.markFailure(language, err)
|
||||
return
|
||||
}
|
||||
_ = client.Close()
|
||||
m.markSuccess(language)
|
||||
}
|
||||
|
||||
func (m *LSPStatusMonitor) markSuccess(language string) {
|
||||
now := time.Now()
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
st := m.statuses[language]
|
||||
st.LastCheckedAt = now
|
||||
st.LastSuccessAt = now
|
||||
st.ConsecutiveFailures = 0
|
||||
st.LastError = ""
|
||||
st.Online = true
|
||||
m.statuses[language] = st
|
||||
}
|
||||
|
||||
func (m *LSPStatusMonitor) markFailure(language string, err error) {
|
||||
now := time.Now()
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
st := m.statuses[language]
|
||||
st.LastCheckedAt = now
|
||||
st.ConsecutiveFailures++
|
||||
if err != nil {
|
||||
st.LastError = err.Error()
|
||||
}
|
||||
if st.ConsecutiveFailures >= m.cfg.FailureThreshold {
|
||||
st.Online = false
|
||||
}
|
||||
m.statuses[language] = st
|
||||
}
|
||||
|
||||
func normalizeLanguage(language string) string {
|
||||
return strings.ToLower(strings.TrimSpace(language))
|
||||
}
|
||||
109
backend/internal/monitor/lsp_status_monitor_test.go
Normal file
109
backend/internal/monitor/lsp_status_monitor_test.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"monica-go-completion-backend/internal/completion"
|
||||
)
|
||||
|
||||
type fakeRuntimeClient struct{}
|
||||
|
||||
func (f *fakeRuntimeClient) DidOpen(_ context.Context, _ string, _ string, _ int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeRuntimeClient) DidChange(_ context.Context, _ string, _ string, _ int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeRuntimeClient) Completion(
|
||||
_ context.Context,
|
||||
_ string,
|
||||
_ int,
|
||||
_ int,
|
||||
) (completion.Response, error) {
|
||||
return completion.Response{}, nil
|
||||
}
|
||||
|
||||
func (f *fakeRuntimeClient) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func waitUntil(t *testing.T, timeout time.Duration, cond func() bool) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
if cond() {
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("condition not met within %s", timeout)
|
||||
}
|
||||
|
||||
func TestMonitorMarksOnlineAfterSuccessfulProbe(t *testing.T) {
|
||||
specs := []completion.LanguageServerSpec{
|
||||
{Language: "go", LanguageID: "go", Command: "gopls"},
|
||||
}
|
||||
factory := func(
|
||||
_ context.Context,
|
||||
_ completion.LanguageServerSpec,
|
||||
_ string,
|
||||
) (completion.RuntimeClient, error) {
|
||||
return &fakeRuntimeClient{}, nil
|
||||
}
|
||||
|
||||
m := NewLSPStatusMonitor(specs, ".", factory, Config{
|
||||
ProbeInterval: 20 * time.Millisecond,
|
||||
ProbeTimeout: 100 * time.Millisecond,
|
||||
FailureThreshold: 2,
|
||||
})
|
||||
defer m.Close()
|
||||
|
||||
waitUntil(t, 500*time.Millisecond, func() bool {
|
||||
raw := m.LspServiceStatus()["go"]
|
||||
status, ok := raw.(LanguageStatus)
|
||||
return ok && status.Online
|
||||
})
|
||||
}
|
||||
|
||||
func TestMonitorMarksOfflineAfterConsecutiveFailures(t *testing.T) {
|
||||
specs := []completion.LanguageServerSpec{
|
||||
{Language: "go", LanguageID: "go", Command: "gopls"},
|
||||
}
|
||||
var calls atomic.Int32
|
||||
factory := func(
|
||||
_ context.Context,
|
||||
_ completion.LanguageServerSpec,
|
||||
_ string,
|
||||
) (completion.RuntimeClient, error) {
|
||||
n := calls.Add(1)
|
||||
if n == 1 {
|
||||
return &fakeRuntimeClient{}, nil
|
||||
}
|
||||
return nil, errors.New("probe failed")
|
||||
}
|
||||
|
||||
m := NewLSPStatusMonitor(specs, ".", factory, Config{
|
||||
ProbeInterval: 20 * time.Millisecond,
|
||||
ProbeTimeout: 100 * time.Millisecond,
|
||||
FailureThreshold: 2,
|
||||
})
|
||||
defer m.Close()
|
||||
|
||||
waitUntil(t, 500*time.Millisecond, func() bool {
|
||||
raw := m.LspServiceStatus()["go"]
|
||||
status, ok := raw.(LanguageStatus)
|
||||
return ok && status.LastSuccessAt.UnixNano() > 0
|
||||
})
|
||||
|
||||
waitUntil(t, 800*time.Millisecond, func() bool {
|
||||
raw := m.LspServiceStatus()["go"]
|
||||
status, ok := raw.(LanguageStatus)
|
||||
return ok && !status.Online && status.ConsecutiveFailures >= 2
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user