From c3716d7a6c0f637c32ed6468d3726886f4ff3170 Mon Sep 17 00:00:00 2001 From: lirui Date: Mon, 9 Feb 2026 22:06:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=95=B0=E6=8D=AE=E8=A1=A5?= =?UTF-8?q?=E5=85=A8=E5=92=8C=E6=8F=90=E4=BA=A4=E8=84=9A=E6=9C=AC=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=BB=8E=20job=5Fbundle.json=20=E8=AF=BB?= =?UTF-8?q?=E5=8F=96=E6=95=B0=E6=8D=AE=E5=B9=B6=E6=89=A7=E8=A1=8C=20API=20?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=EF=BC=9B=E5=AE=9E=E7=8E=B0=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E6=8F=90=E4=BA=A4=E5=8A=9F=E8=83=BD=EF=BC=8C=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E6=88=90=E5=8A=9F=E4=B8=8E=E5=A4=B1=E8=B4=A5=E7=9A=84=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .claude/settings.local.json | 5 +- DEMAND-queryFiled.md | 137 ++++++++++++++++++++++++ scripts/node/enricher.mjs | 201 ++++++++++++++++++++++++++++++++++++ scripts/node/submitter.mjs | 177 +++++++++++++++++++++++++++++++ scripts/py/enricher.py | 186 +++++++++++++++++++++++++++++++++ scripts/py/submitter.py | 161 +++++++++++++++++++++++++++++ 6 files changed, 866 insertions(+), 1 deletion(-) create mode 100644 DEMAND-queryFiled.md create mode 100644 scripts/node/enricher.mjs create mode 100644 scripts/node/submitter.mjs create mode 100644 scripts/py/enricher.py create mode 100644 scripts/py/submitter.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 3c25678..3eed4d5 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -6,7 +6,10 @@ "Bash(npm run check:*)", "Bash(npx vitest:*)", "Bash(node -e:*)", - "Bash(npm run build:*)" + "Bash(npm run build:*)", + "Bash(npm run test:*)", + "Bash(python -m py_compile:*)", + "Bash(node --check:*)" ] }, "enableAllProjectMcpServers": true, diff --git a/DEMAND-queryFiled.md b/DEMAND-queryFiled.md new file mode 100644 index 0000000..e23b308 --- /dev/null +++ b/DEMAND-queryFiled.md @@ -0,0 +1,137 @@ + + +**Role:** 资深前端架构师 (Svelte 5 + 
TypeScript) + +**Project Pivot (重大架构调整):** +我们将 "Excel2JSON Mapper" 升级为一个 **"ETL 配置生成器 (ETL Blueprint Generator)"**。 +前端的任务是生成一个包含 **[源数据 + 处理逻辑]** 的 **Job Bundle (JSON 文件)**,用户将使用该文件配合 Python 脚本在后端执行实际的数据抓取和入库。 + +### Phase 2: ETL 配置生成器需求文档 + +#### 1. 项目概述 + +这是一个基于 Svelte 5 的单页应用。用户上传 Excel,配置字段映射规则(包含静态重命名和动态 API 获取规则),最后导出为一个标准化的 JSON 任务包 (`job_bundle.json`)。 + +#### 2. 用户界面与交互 (UI/UX) + +##### 2.1 顶部工具栏 + +* **导入/导出配置:** 支持保存当前所有的映射规则。 +* **导出任务包 (Export Job Bundle):** 这是核心操作。点击后下载 `job_bundle.json` 文件(包含数据+配置)。 +* **提交设置 (Submission Settings):** 一个模态框,配置最终数据推送到哪里。 +* `Target URL`: 最终数据接收接口 (e.g., `https://api.db.com/bulk-insert`). +* `Method`: POST / PUT. +* `Batch Size`: 批次大小 (默认 50). + + + +##### 2.2 主体区域 (左右分栏) + +* **左侧 (Source):** Excel 表格预览。 +* **右侧 (Preview):** 静态映射后的 JSON 预览(仅展示前 20 条以保证性能)。 + +##### 2.3 核心功能:列配置 (Column Configuration) + +在左侧表格区域,除了点击现有表头修改映射外,新增 **"添加计算列 (Add Computed Column)"** 功能。 + +**新增类型:`API_FETCH` (动态 API 字段)** +当用户选择此类型时,弹出一个详细配置面板: + +1. **Target Key:** 最终生成的 JSON 字段名 (例如 `user_balance`)。 +2. **Request URL (支持模板变量):** +* 允许使用 `{{ColumnName}}` 语法引用当前行的 Excel 数据。 +* *示例:* `https://api.example.com/users/{{用户ID}}/detail` +* *UI 交互:* 输入框旁应有“插入变量”按钮,点击列出所有可用 Excel 表头。 + + +3. **Request Method:** 下拉选择 `GET` (默认) 或 `POST`。 +4. **Headers:** Key-Value 编辑器 (用于传 `Authorization`, `Content-Type` 等)。 +5. **Request Body (仅 POST):** +* 多行文本域,支持 JSON 格式。 +* 同样支持 `{{ColumnName}}` 模板变量替换。 + + +6. **Response Extractor (取值路径):** +* 指定从接口返回的 JSON 中提取哪个字段。 +* 支持 `lodash.get` 风格的点号路径。 +* *示例:* 接口返回 `{ "data": { "balance": 100 } }`,用户填 `data.balance`。 + + + +#### 3. 核心输出:Job Bundle 数据结构 + +请严格按照以下 TypeScript 接口定义生成导出的 JSON 文件: + +```typescript +// 1. 静态映射规则 +interface StaticRule { + type: 'static'; + source: string; // Excel 原表头 + target: string; // JSON 目标 Key + dataType: 'string' | 'number' | 'boolean' | 'date'; + format?: string; // 日期格式化字符串 +} + +// 2. 
动态 API 获取规则 (本次新增核心) +interface ApiEnrichmentRule { + type: 'api_fetch'; + target_key: string; // JSON 目标 Key + url_template: string; // "https://api.com/{{id}}" + method: 'GET' | 'POST'; + headers?: Record<string, string>; + body_template?: string; // POST body 模板 + response_path: string; // "data.result.value" + fallback_value?: any; // 默认值 (null/0) +} + +// 3. 提交配置 +interface SubmissionConfig { + target_url: string; + method: 'POST' | 'PUT'; + batch_size: number; +} + +// 4. 最终导出的 Job Bundle 结构 +interface JobBundle { + meta: { + version: string; + generated_at: string; + }; + config: { + static_rules: StaticRule[]; + enrichment_rules: ApiEnrichmentRule[]; + submission: SubmissionConfig; + }; + source_data: Record<string, any>[]; // 经过静态映射后的基础数据列表 +} + +``` + +#### 4. 开发任务清单 + +1. **Store 设计:** 更新 Svelte Store 以存储 `enrichmentRules` 和 `submissionConfig`。 +2. **UI 组件:** +* 开发 `ApiConfigModal.svelte`: 用于录入 URL、Headers、Body 等复杂信息。 +* 实现变量插入辅助功能 (点击列名自动插入 `{{...}}`)。 + + +3. **导出逻辑:** 编写 `generateJobBundle` 函数。 +* **步骤 1:** 根据 `static_rules` 转换 Excel 原始数据,生成基础 JSON 数组。 +* **步骤 2:** 将基础数据、API 规则、提交配置组装成 `JobBundle` 格式。 +* **步骤 3:** 触发浏览器下载 `job_bundle.json`。 + + + +#### 5. 
特别说明 (给 AI 的提示) + +* **No Runtime Fetch:** 前端代码 **不需要** 执行 `fetch` 去调用用户配置的 API(避免 CORS)。前端只负责把 URL 字符串保存到 JSON 里。 +* **Template Validation:** 在 UI 上简单校验 URL 模板格式(检查是否包含 `{}`),但不做逻辑校验。 +* **Preview Limitations:** 右侧预览仅展示静态映射的结果。对于 API 字段,可以在预览中显示一个占位符(如 `[Pending API Fetch]`)。 + +--- + +### 后续步骤 + +Claude 完成这个前端代码后,你可以再发一条指令让它写对应的 Python 执行脚本: + +> "前端已经完成了。现在请帮我写一个 Python 脚本 (`executor.py`)。它读取上面定义的 `job_bundle.json`,使用 http请求 执行 `enrichment_rules` 里的请求(注意替换 URL 中的 {{变量}}),最后把结果推送到 `submission` 定义的接口。" \ No newline at end of file diff --git a/scripts/node/enricher.mjs b/scripts/node/enricher.mjs new file mode 100644 index 0000000..e88b8ad --- /dev/null +++ b/scripts/node/enricher.mjs @@ -0,0 +1,201 @@ +/** + * enricher.mjs — API 数据补全脚本 (Node.js) + * + * 读取前端生成的 job_bundle.json,执行 enrichment_rules 中的 API 请求, + * 将结果合并到 source_data 中,输出最终完整的 JSON 文件。 + * + * 用法: + * node enricher.mjs job_bundle.json + * node enricher.mjs job_bundle.json -o enriched_data.json + * node enricher.mjs job_bundle.json --concurrency 10 + */ + +import { readFileSync, writeFileSync } from "fs"; +import { basename, join, dirname } from "path"; + +// ── helpers ── + +function resolvePath(obj, path) { + const keys = path.split("."); + let current = obj; + for (const key of keys) { + if (current == null) return null; + if (Array.isArray(current)) { + const idx = Number(key); + if (Number.isNaN(idx) || idx < 0 || idx >= current.length) return null; + current = current[idx]; + } else if (typeof current === "object") { + current = current[key]; + } else { + return null; + } + } + return current ?? null; +} + +function renderTemplate(template, row) { + return template.replace(/\{\{(.+?)\}\}/g, (_, key) => { + const val = row[key]; + return val != null ? String(val) : ""; + }); +} + +// ── fetch one ── + +async function fetchOne(rule, row, rowIndex, semaphore) { + const targetKey = rule.target_key; + const fallback = rule.fallback_value ?? 
null; + const url = renderTemplate(rule.url_template, row); + const method = (rule.method || "GET").toUpperCase(); + + const headers = {}; + if (rule.headers) { + for (const [k, v] of Object.entries(rule.headers)) { + headers[k] = renderTemplate(v, row); + } + } + + /** @type {RequestInit} */ + const opts = { method, headers }; + + if (method === "POST" && rule.body_template) { + const body = renderTemplate(rule.body_template, row); + try { + JSON.parse(body); + headers["Content-Type"] = headers["Content-Type"] || "application/json"; + opts.body = body; + } catch { + opts.body = body; + } + } + + await semaphore.acquire(); + try { + const resp = await fetch(url, opts); + if (!resp.ok) { + console.log(` [WARN] Row ${rowIndex} | ${targetKey} | HTTP ${resp.status} <- ${url}`); + return { rowIndex, targetKey, value: fallback }; + } + const data = await resp.json(); + const value = resolvePath(data, rule.response_path); + return { rowIndex, targetKey, value: value ?? fallback }; + } catch (e) { + console.log(` [ERROR] Row ${rowIndex} | ${targetKey} | ${e.message}`); + return { rowIndex, targetKey, value: fallback }; + } finally { + semaphore.release(); + } +} + +// ── semaphore ── + +function createSemaphore(max) { + let current = 0; + /** @type {(() => void)[]} */ + const queue = []; + return { + acquire() { + if (current < max) { + current++; + return Promise.resolve(); + } + return new Promise((resolve) => queue.push(resolve)); + }, + release() { + current--; + if (queue.length > 0) { + current++; + queue.shift()(); + } + }, + }; +} + +// ── main ── + +function parseArgs() { + const args = process.argv.slice(2); + const opts = { bundle: "", output: "", concurrency: 5 }; + for (let i = 0; i < args.length; i++) { + if (args[i] === "-o" || args[i] === "--output") { + opts.output = args[++i]; + } else if (args[i] === "--concurrency") { + opts.concurrency = Number(args[++i]) || 5; + } else if (args[i] === "--help" || args[i] === "-h") { + console.log("Usage: node 
enricher.mjs [-o output.json] [--concurrency N]"); + process.exit(0); + } else if (!opts.bundle) { + opts.bundle = args[i]; + } + } + if (!opts.bundle) { + console.error("Error: Please provide a job_bundle.json path."); + console.error("Usage: node enricher.mjs "); + process.exit(1); + } + return opts; +} + +async function main() { + const opts = parseArgs(); + + const raw = readFileSync(opts.bundle, "utf-8"); + const bundle = JSON.parse(raw); + + const meta = bundle.meta || {}; + const config = bundle.config || {}; + const sourceData = bundle.source_data || []; + const rules = config.enrichment_rules || []; + + console.log(`=== Job Bundle v${meta.version || "?"} ===`); + console.log(`Generated: ${meta.generated_at || "?"}`); + console.log(`Rows: ${sourceData.length}`); + console.log(`Static rules: ${(config.static_rules || []).length}`); + console.log(`Enrichment rules: ${rules.length}`); + console.log(); + + if (rules.length === 0) { + console.log("No enrichment rules configured. Outputting source data as-is."); + } else { + const totalCalls = sourceData.length * rules.length; + console.log(`Enriching ${sourceData.length} rows x ${rules.length} rule(s) = ${totalCalls} API calls`); + console.log(`Concurrency: ${opts.concurrency}`); + console.log(); + + const semaphore = createSemaphore(opts.concurrency); + const tasks = []; + for (let rowIdx = 0; rowIdx < sourceData.length; rowIdx++) { + for (const rule of rules) { + tasks.push(fetchOne(rule, sourceData[rowIdx], rowIdx, semaphore)); + } + } + + const results = await Promise.all(tasks); + + let errorCount = 0; + for (const r of results) { + if (r.value === undefined) errorCount++; + sourceData[r.rowIndex][r.targetKey] = r.value; + } + + console.log(); + console.log(`Done. 
${totalCalls - errorCount}/${totalCalls} calls succeeded.`); + } + + const output = { + meta, + submission: config.submission || {}, + data: sourceData, + }; + + const outName = opts.output || opts.bundle.replace(/\.json$/i, "_enriched.json"); + writeFileSync(outName, JSON.stringify(output, null, 2), "utf-8"); + + console.log(`\nEnriched data saved to: ${outName}`); + console.log(`Next step: node submitter.mjs ${outName}`); +} + +main().catch((e) => { + console.error(e); + process.exit(1); +}); diff --git a/scripts/node/submitter.mjs b/scripts/node/submitter.mjs new file mode 100644 index 0000000..328db36 --- /dev/null +++ b/scripts/node/submitter.mjs @@ -0,0 +1,177 @@ +/** + * submitter.mjs — 数据提交脚本 (Node.js) + * + * 读取 enricher.mjs 输出的 JSON 文件,按批次提交到目标接口。 + * 记录提交成功和失败的记录到单独的日志文件。 + * + * 用法: + * node submitter.mjs enriched_data.json + * node submitter.mjs enriched_data.json --batch-size 100 + * node submitter.mjs enriched_data.json --url https://api.example.com/import + * node submitter.mjs enriched_data.json --dry-run + */ + +import { readFileSync, writeFileSync } from "fs"; + +// ── args ── + +function parseArgs() { + const args = process.argv.slice(2); + const opts = { input: "", url: "", method: "", batchSize: 0, dryRun: false }; + for (let i = 0; i < args.length; i++) { + if (args[i] === "--url") { + opts.url = args[++i]; + } else if (args[i] === "--method") { + opts.method = args[++i]; + } else if (args[i] === "--batch-size") { + opts.batchSize = Number(args[++i]) || 0; + } else if (args[i] === "--dry-run") { + opts.dryRun = true; + } else if (args[i] === "--help" || args[i] === "-h") { + console.log("Usage: node submitter.mjs [--url URL] [--method POST|PUT] [--batch-size N] [--dry-run]"); + process.exit(0); + } else if (!opts.input) { + opts.input = args[i]; + } + } + if (!opts.input) { + console.error("Error: Please provide an enriched JSON file path."); + console.error("Usage: node submitter.mjs "); + process.exit(1); + } + return opts; +} + +// ── 
submit ── + +async function submitBatch(url, method, batch, batchIndex, totalBatches) { + try { + const resp = await fetch(url, { + method, + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(batch), + }); + const body = await resp.text(); + const ok = resp.status < 400; + const tag = ok ? "OK" : "FAIL"; + console.log(` Batch ${batchIndex}/${totalBatches}: HTTP ${resp.status} ${tag} (${batch.length} records)`); + return { ok, status: resp.status, body, data: batch }; + } catch (e) { + const msg = e.message; + console.log(` Batch ${batchIndex}/${totalBatches}: ERROR - ${msg} (${batch.length} records)`); + return { ok: false, status: 0, body: msg, data: batch }; + } +} + +// ── main ── + +async function main() { + const opts = parseArgs(); + + const raw = readFileSync(opts.input, "utf-8"); + const payload = JSON.parse(raw); + + const data = payload.data || []; + const submission = payload.submission || {}; + + const targetUrl = opts.url || submission.target_url || ""; + const method = (opts.method || submission.method || "POST").toUpperCase(); + const batchSize = opts.batchSize || submission.batch_size || 50; + + if (!targetUrl) { + console.error("Error: No target URL configured. Use --url or set in job bundle."); + process.exit(1); + } + + console.log("=== Submitter ==="); + console.log(`Input: ${opts.input} (${data.length} records)`); + console.log(`Target: ${method} ${targetUrl}`); + console.log(`Batch size: ${batchSize}`); + console.log(); + + // 分批 + const batches = []; + for (let i = 0; i < data.length; i += batchSize) { + batches.push(data.slice(i, i + batchSize)); + } + const totalBatches = batches.length; + + if (opts.dryRun) { + console.log(`[DRY RUN] Would submit ${data.length} records in ${totalBatches} batch(es)`); + for (let i = 0; i < batches.length; i++) { + console.log(` Batch ${i + 1}/${totalBatches}: ${batches[i].length} records`); + } + console.log("\nDry run complete. 
No data was sent."); + return; + } + + /** @type {object[]} */ + const successRecords = []; + /** @type {object[]} */ + const failedRecords = []; + /** @type {object[]} */ + const failedDetails = []; + + console.log(`Submitting ${data.length} records in ${totalBatches} batch(es)...\n`); + + for (let i = 0; i < batches.length; i++) { + const result = await submitBatch(targetUrl, method, batches[i], i + 1, totalBatches); + if (result.ok) { + successRecords.push(...result.data); + } else { + failedRecords.push(...result.data); + failedDetails.push({ + batch_index: i + 1, + status: result.status, + response: result.body.slice(0, 500), + record_count: result.data.length, + }); + } + } + + // 结果统计 + console.log(); + console.log("=== Result ==="); + console.log(`Success: ${successRecords.length} records`); + console.log(`Failed: ${failedRecords.length} records`); + + const timestamp = new Date().toISOString().replace(/[-:T]/g, "").slice(0, 15); + const baseName = opts.input.replace(/\.json$/i, ""); + + if (successRecords.length > 0) { + const successFile = `${baseName}_success_${timestamp}.json`; + writeFileSync(successFile, JSON.stringify(successRecords, null, 2), "utf-8"); + console.log(`\nSuccess log: ${successFile}`); + } + + if (failedRecords.length > 0) { + const failedFile = `${baseName}_failed_${timestamp}.json`; + const report = { + summary: { + total_failed: failedRecords.length, + failed_batches: failedDetails.length, + target_url: targetUrl, + timestamp, + }, + batch_errors: failedDetails, + failed_records: failedRecords, + }; + writeFileSync(failedFile, JSON.stringify(report, null, 2), "utf-8"); + console.log(`Failed log: ${failedFile}`); + + const retryFile = `${baseName}_retry_${timestamp}.json`; + const retryPayload = { submission, data: failedRecords }; + writeFileSync(retryFile, JSON.stringify(retryPayload, null, 2), "utf-8"); + console.log(`\nTo retry failed records:`); + console.log(` node submitter.mjs ${retryFile}`); + } + + if (successRecords.length 
=== 0 && failedRecords.length === 0) { + console.log("\nNo records to submit."); + } +} + +main().catch((e) => { + console.error(e); + process.exit(1); +}); diff --git a/scripts/py/enricher.py b/scripts/py/enricher.py new file mode 100644 index 0000000..b6bc62e --- /dev/null +++ b/scripts/py/enricher.py @@ -0,0 +1,186 @@ +""" +enricher.py — API 数据补全脚本 + +读取前端生成的 job_bundle.json,执行 enrichment_rules 中的 API 请求, +将结果合并到 source_data 中,输出最终完整的 JSON 文件。 + +用法: + python enricher.py job_bundle.json + python enricher.py job_bundle.json -o enriched_data.json + python enricher.py job_bundle.json --concurrency 10 +""" + +import argparse +import asyncio +import json +import re +import sys +from pathlib import Path + +import aiohttp + + +def resolve_path(obj, path: str): + """按点号路径从 dict/list 中取值,类似 lodash.get""" + keys = path.split(".") + current = obj + for key in keys: + if isinstance(current, dict) and key in current: + current = current[key] + elif isinstance(current, list): + try: + current = current[int(key)] + except (ValueError, IndexError): + return None + else: + return None + return current + + +def render_template(template: str, row: dict) -> str: + """将 {{变量名}} 替换为行数据中的对应值""" + def replacer(match: re.Match) -> str: + key = match.group(1) + value = row.get(key, "") + return str(value) if value is not None else "" + + return re.sub(r"\{\{(.+?)\}\}", replacer, template) + + +async def fetch_one( + session: aiohttp.ClientSession, + rule: dict, + row: dict, + row_index: int, + semaphore: asyncio.Semaphore, +) -> tuple[int, str, object]: + """对单行数据执行一条 enrichment rule 的 API 请求""" + target_key = rule["target_key"] + fallback = rule.get("fallback_value") + + url = render_template(rule["url_template"], row) + + headers = {} + if rule.get("headers"): + for k, v in rule["headers"].items(): + headers[k] = render_template(v, row) + + method = rule.get("method", "GET").upper() + + body = None + if method == "POST" and rule.get("body_template"): + body = 
render_template(rule["body_template"], row) + + async with semaphore: + try: + kwargs: dict = {"headers": headers} + if body is not None: + try: + kwargs["json"] = json.loads(body) + except json.JSONDecodeError: + kwargs["data"] = body + + async with session.request(method, url, **kwargs) as resp: + if resp.status >= 400: + print(f" [WARN] Row {row_index} | {target_key} | HTTP {resp.status} <- {url}") + return row_index, target_key, fallback + + data = await resp.json(content_type=None) + value = resolve_path(data, rule["response_path"]) + if value is None: + value = fallback + return row_index, target_key, value + + except Exception as e: + print(f" [ERROR] Row {row_index} | {target_key} | {type(e).__name__}: {e}") + return row_index, target_key, fallback + + +async def run_enrichments( + source_data: list[dict], + rules: list[dict], + concurrency: int, +) -> list[dict]: + """对所有行执行所有 enrichment rules""" + if not rules: + print("No enrichment rules configured. Outputting source data as-is.") + return source_data + + semaphore = asyncio.Semaphore(concurrency) + tasks = [] + + total_calls = len(source_data) * len(rules) + print(f"Enriching {len(source_data)} rows x {len(rules)} rule(s) = {total_calls} API calls") + print(f"Concurrency: {concurrency}") + print() + + async with aiohttp.ClientSession() as session: + for row_idx, row in enumerate(source_data): + for rule in rules: + tasks.append(fetch_one(session, rule, row, row_idx, semaphore)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + error_count = 0 + for result in results: + if isinstance(result, Exception): + print(f" [ERROR] Unexpected: {result}") + error_count += 1 + continue + row_idx, target_key, value = result + source_data[row_idx][target_key] = value + + print() + print(f"Done. 
{total_calls - error_count}/{total_calls} calls succeeded.") + return source_data + + +async def main(): + parser = argparse.ArgumentParser(description="Enrich job_bundle.json with API data and output final JSON.") + parser.add_argument("bundle", help="Path to job_bundle.json") + parser.add_argument("-o", "--output", type=str, default=None, help="Output file path (default: _enriched.json)") + parser.add_argument("--concurrency", type=int, default=5, help="Max concurrent API requests (default: 5)") + args = parser.parse_args() + + bundle_path = Path(args.bundle) + if not bundle_path.exists(): + print(f"Error: File not found: {bundle_path}") + sys.exit(1) + + with open(bundle_path, "r", encoding="utf-8") as f: + bundle = json.load(f) + + meta = bundle.get("meta", {}) + config = bundle.get("config", {}) + source_data = bundle.get("source_data", []) + + print(f"=== Job Bundle v{meta.get('version', '?')} ===") + print(f"Generated: {meta.get('generated_at', '?')}") + print(f"Rows: {len(source_data)}") + print(f"Static rules: {len(config.get('static_rules', []))}") + print(f"Enrichment rules: {len(config.get('enrichment_rules', []))}") + print() + + enriched = await run_enrichments( + source_data, + config.get("enrichment_rules", []), + args.concurrency, + ) + + # 构建输出,保留 submission 配置 + output = { + "meta": meta, + "submission": config.get("submission", {}), + "data": enriched, + } + + output_path = Path(args.output) if args.output else bundle_path.with_name(bundle_path.stem + "_enriched.json") + with open(output_path, "w", encoding="utf-8") as f: + json.dump(output, f, ensure_ascii=False, indent=2) + + print(f"\nEnriched data saved to: {output_path}") + print(f"Next step: python submitter.py {output_path}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scripts/py/submitter.py b/scripts/py/submitter.py new file mode 100644 index 0000000..d95fe7d --- /dev/null +++ b/scripts/py/submitter.py @@ -0,0 +1,161 @@ +""" +submitter.py — 数据提交脚本 + +读取 
enricher.py 输出的 JSON 文件,按批次提交到目标接口。 +记录提交成功和失败的记录到单独的日志文件。 + +用法: + python submitter.py enriched_data.json + python submitter.py enriched_data.json --batch-size 100 + python submitter.py enriched_data.json --url https://api.example.com/import # 覆盖 URL + python submitter.py enriched_data.json --dry-run # 只输出不提交 +""" + +import argparse +import asyncio +import json +import sys +from datetime import datetime +from pathlib import Path + +import aiohttp + + +async def submit_batch( + session: aiohttp.ClientSession, + url: str, + method: str, + batch: list[dict], + batch_index: int, + total_batches: int, +) -> tuple[bool, int, str, list[dict]]: + """提交一个批次,返回 (success, status_code, response_text, batch_data)""" + try: + async with session.request(method, url, json=batch) as resp: + status = resp.status + body = await resp.text() + ok = status < 400 + tag = "OK" if ok else "FAIL" + print(f" Batch {batch_index}/{total_batches}: HTTP {status} {tag} ({len(batch)} records)") + return ok, status, body, batch + except Exception as e: + msg = f"{type(e).__name__}: {e}" + print(f" Batch {batch_index}/{total_batches}: ERROR - {msg} ({len(batch)} records)") + return False, 0, msg, batch + + +async def main(): + parser = argparse.ArgumentParser(description="Submit enriched data to target API in batches.") + parser.add_argument("input", help="Path to enriched JSON file (from enricher.py)") + parser.add_argument("--url", type=str, default=None, help="Override target URL from config") + parser.add_argument("--method", type=str, default=None, choices=["POST", "PUT"], help="Override HTTP method") + parser.add_argument("--batch-size", type=int, default=None, help="Override batch size") + parser.add_argument("--dry-run", action="store_true", help="Print what would be submitted without sending") + args = parser.parse_args() + + input_path = Path(args.input) + if not input_path.exists(): + print(f"Error: File not found: {input_path}") + sys.exit(1) + + with open(input_path, "r", 
encoding="utf-8") as f: + payload = json.load(f) + + data = payload.get("data", []) + submission = payload.get("submission", {}) + + target_url = args.url or submission.get("target_url", "") + method = args.method or submission.get("method", "POST") + batch_size = args.batch_size or submission.get("batch_size", 50) + + if not target_url: + print("Error: No target URL configured. Use --url or set in job bundle.") + sys.exit(1) + + print(f"=== Submitter ===") + print(f"Input: {input_path} ({len(data)} records)") + print(f"Target: {method} {target_url}") + print(f"Batch size: {batch_size}") + print() + + batches = [data[i : i + batch_size] for i in range(0, len(data), batch_size)] + total_batches = len(batches) + + if args.dry_run: + print(f"[DRY RUN] Would submit {len(data)} records in {total_batches} batch(es)") + for i, batch in enumerate(batches, 1): + print(f" Batch {i}/{total_batches}: {len(batch)} records") + print("\nDry run complete. No data was sent.") + return + + success_records: list[dict] = [] + failed_records: list[dict] = [] + failed_details: list[dict] = [] + + print(f"Submitting {len(data)} records in {total_batches} batch(es)...\n") + + async with aiohttp.ClientSession() as session: + for i, batch in enumerate(batches, 1): + ok, status, body, batch_data = await submit_batch( + session, target_url, method, batch, i, total_batches + ) + if ok: + success_records.extend(batch_data) + else: + failed_records.extend(batch_data) + failed_details.append({ + "batch_index": i, + "status": status, + "response": body[:500], + "record_count": len(batch_data), + }) + + # 结果统计 + print() + print(f"=== Result ===") + print(f"Success: {len(success_records)} records") + print(f"Failed: {len(failed_records)} records") + + # 写入日志文件 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + base_name = input_path.stem + + if success_records: + success_file = input_path.with_name(f"{base_name}_success_{timestamp}.json") + with open(success_file, "w", encoding="utf-8") as f: 
+ json.dump(success_records, f, ensure_ascii=False, indent=2) + print(f"\nSuccess log: {success_file}") + + if failed_records: + failed_file = input_path.with_name(f"{base_name}_failed_{timestamp}.json") + report = { + "summary": { + "total_failed": len(failed_records), + "failed_batches": len(failed_details), + "target_url": target_url, + "timestamp": timestamp, + }, + "batch_errors": failed_details, + "failed_records": failed_records, + } + with open(failed_file, "w", encoding="utf-8") as f: + json.dump(report, f, ensure_ascii=False, indent=2) + print(f"Failed log: {failed_file}") + print(f"\nTo retry failed records:") + print(f' python submitter.py {failed_file} --url "{target_url}"') + # 生成一个可直接重试的文件 + retry_file = input_path.with_name(f"{base_name}_retry_{timestamp}.json") + retry_payload = { + "submission": submission, + "data": failed_records, + } + with open(retry_file, "w", encoding="utf-8") as f: + json.dump(retry_payload, f, ensure_ascii=False, indent=2) + print(f" or: python submitter.py {retry_file}") + + if not success_records and not failed_records: + print("\nNo records to submit.") + + +if __name__ == "__main__": + asyncio.run(main())