Skip to content

Commit be46266

Browse files
committed
fix: remove sync_status, fix export command
1 parent 5b0a086 commit be46266

20 files changed

+861
-418
lines changed

cspell.config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ words:
66
- guiiai
77
- Logg
88
- onnotice
9+
- pgvector
910
- tiktoken
1011
- unocss
1112
- xsai

packages/cli/src/commands/embed.ts

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
import * as input from '@inquirer/prompts'
22
import { useLogger } from '@tg-search/common'
33
import { EmbeddingService } from '@tg-search/core'
4-
import {
5-
getMessageCount,
6-
getMessagesWithoutEmbedding,
7-
getPartitionTables,
8-
updateMessageEmbedding,
9-
} from '@tg-search/db'
4+
import { findMessagesByChatId, updateMessageEmbeddings } from '@tg-search/db'
105

116
import { TelegramCommand } from '../command'
127

@@ -29,20 +24,24 @@ export class EmbedCommand extends TelegramCommand {
2924
}
3025

3126
async execute(_args: string[], options: EmbedOptions): Promise<void> {
27+
// Get chat ID (required)
28+
const chatId = options.chatId || await input.input({
29+
message: '请输入会话 ID:',
30+
default: '',
31+
})
32+
33+
if (!chatId) {
34+
throw new Error('会话 ID 是必需的')
35+
}
36+
3237
// Get batch size
3338
const batchSize = options.batchSize || await input.input({
3439
message: '请输入每批处理的消息数量:',
3540
default: '1000',
3641
})
3742

38-
// Get chat ID
39-
const chatId = options.chatId || await input.input({
40-
message: '请输入会话 ID(留空处理所有会话):',
41-
default: '',
42-
})
43-
4443
// Get concurrency
45-
const _concurrency = options.concurrency || await input.input({
44+
const concurrency = options.concurrency || await input.input({
4645
message: '请输入并发数:',
4746
default: '4',
4847
})
@@ -51,48 +50,44 @@ export class EmbedCommand extends TelegramCommand {
5150
const embedding = new EmbeddingService()
5251

5352
try {
54-
// Get message count
55-
const tables = await getPartitionTables()
56-
let totalMessages = 0
57-
let totalProcessed = 0
58-
let failedEmbeddings = 0
59-
60-
for (const table of tables) {
61-
const count = await getMessageCount(Number(table.tableName))
62-
totalMessages += count
63-
}
53+
// Get all messages for the chat
54+
const messages = await findMessagesByChatId(Number(chatId))
55+
const messagesToEmbed = messages.filter(m => !m.embedding && m.content)
56+
const totalMessages = messagesToEmbed.length
6457

6558
logger.log(`共有 ${totalMessages} 条消息需要处理`)
6659

6760
// Process messages in batches
68-
while (true) {
69-
const messages = await getMessagesWithoutEmbedding(
70-
chatId ? Number(chatId) : undefined,
71-
Number(batchSize),
72-
)
73-
74-
if (messages.length === 0) {
75-
break
76-
}
61+
let totalProcessed = 0
62+
let failedEmbeddings = 0
7763

78-
logger.debug(`获取到 ${messages.length} 条消息`)
64+
// Split messages into batches
65+
for (let i = 0; i < messagesToEmbed.length; i += Number(batchSize)) {
66+
const batch = messagesToEmbed.slice(i, i + Number(batchSize))
67+
logger.debug(`处理第 ${i + 1}${i + batch.length} 条消息`)
7968

8069
// Generate embeddings in parallel
81-
const contents = messages.map(m => m.content || '')
70+
const contents = batch.map(m => m.content!)
8271
const embeddings = await embedding.generateEmbeddings(contents)
8372

84-
// Update embeddings
85-
for (let i = 0; i < messages.length; i++) {
86-
const message = messages[i]
87-
try {
88-
await updateMessageEmbedding(message.id, message.chatId, embeddings[i])
89-
totalProcessed++
90-
}
91-
catch (error) {
92-
logger.withError(error).warn(`更新消息 ${message.id} 的向量嵌入失败`)
93-
failedEmbeddings++
73+
// Prepare updates
74+
const updates = batch.map((message, index) => ({
75+
id: message.id,
76+
embedding: embeddings[index],
77+
}))
78+
79+
try {
80+
// Update embeddings in batches with concurrency control
81+
for (let j = 0; j < updates.length; j += Number(concurrency)) {
82+
const concurrentBatch = updates.slice(j, j + Number(concurrency))
83+
await updateMessageEmbeddings(Number(chatId), concurrentBatch)
84+
totalProcessed += concurrentBatch.length
9485
}
9586
}
87+
catch (error) {
88+
logger.withError(error).warn(`更新消息向量嵌入失败`)
89+
failedEmbeddings += batch.length
90+
}
9691

9792
logger.log(`已处理 ${totalProcessed}/${totalMessages} 条消息,${failedEmbeddings} 条失败`)
9893
}

packages/cli/src/commands/export.ts

Lines changed: 147 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
import type { Dialog, TelegramMessageType } from '@tg-search/core'
2+
import type { MessageCreateInput, NewChat } from '@tg-search/db'
23

34
import * as fs from 'node:fs/promises'
45
import * as path from 'node:path'
56
import * as input from '@inquirer/prompts'
6-
import { useLogger } from '@tg-search/common'
7+
import { getConfig, useLogger } from '@tg-search/common'
8+
import { createMessageBatch, updateChat } from '@tg-search/db'
79

810
import { TelegramCommand } from '../command'
911

1012
const logger = useLogger()
1113

1214
interface ExportOptions {
1315
chatId?: number
14-
format?: 'json' | 'html'
16+
format?: 'database' | 'html' | 'json'
1517
path?: string
1618
messageTypes?: TelegramMessageType[]
1719
startTime?: Date
@@ -20,6 +22,69 @@ interface ExportOptions {
2022
batchSize?: number
2123
}
2224

25+
/**
26+
* Process a batch of messages for database export
27+
*/
28+
async function processDatabaseBatch(
29+
messages: MessageCreateInput[],
30+
startIndex: number,
31+
): Promise<{ shouldStop: boolean, failedCount: number }> {
32+
try {
33+
// Create messages in batch
34+
const result = await createMessageBatch(messages)
35+
const firstMessage = messages[0]
36+
const lastMessage = messages[messages.length - 1]
37+
38+
logger.debug(
39+
`已保存 ${startIndex + 1} - ${startIndex + messages.length} 条消息 `
40+
+ `(${new Date(firstMessage.createdAt).toLocaleString()} - ${new Date(lastMessage.createdAt).toLocaleString()})`,
41+
)
42+
43+
// If any message already exists, stop the export
44+
if (!result.success || result.duplicateCount > 0) {
45+
logger.debug('检测到已存在的消息,导出完成')
46+
return { shouldStop: true, failedCount: 0 }
47+
}
48+
49+
return { shouldStop: false, failedCount: 0 }
50+
}
51+
catch (error) {
52+
logger.withError(error).error(`保存批次消息失败 (${startIndex + 1} - ${startIndex + messages.length})`)
53+
return { shouldStop: false, failedCount: messages.length }
54+
}
55+
}
56+
57+
/**
58+
* Save messages to JSON file
59+
*/
60+
async function saveToJsonFile(messages: any[], chatId: number, exportPath: string): Promise<boolean> {
61+
try {
62+
const fileName = `${chatId}_${new Date().toISOString().split('T')[0]}`
63+
const filePath = path.join(exportPath, `${fileName}.json`)
64+
await fs.writeFile(filePath, JSON.stringify(messages, null, 2))
65+
logger.debug(`已保存 JSON 文件: ${filePath}`)
66+
logger.log(`已导出到文件: ${filePath}`)
67+
return true
68+
}
69+
catch (error) {
70+
logger.withError(error).error('保存 JSON 文件失败')
71+
return false
72+
}
73+
}
74+
75+
/**
76+
* Update chat metadata
77+
*/
78+
async function updateChatMetadata(chat: Dialog): Promise<void> {
79+
const chatInput: NewChat = {
80+
id: chat.id,
81+
type: chat.type,
82+
title: chat.name,
83+
lastSyncTime: new Date(),
84+
}
85+
await updateChat(chatInput)
86+
}
87+
2388
/**
2489
* Export command to export messages from Telegram
2590
*/
@@ -55,16 +120,22 @@ export class ExportCommand extends TelegramCommand {
55120
const format = options.format || await input.select({
56121
message: '请选择导出格式:',
57122
choices: [
58-
{ name: 'HTML', value: 'html' },
59-
{ name: 'JSON', value: 'json' },
123+
{ name: 'Database', value: 'database' },
60124
],
61125
})
62126

63-
// Get export path
64-
const exportPath = options.path || await input.input({
65-
message: '请输入导出路径:',
66-
default: './export',
67-
})
127+
// Get export path if not exporting to database
128+
let exportPath = options.path
129+
if (format !== 'database') {
130+
exportPath = exportPath || await input.input({
131+
message: '请输入导出路径:',
132+
default: './export',
133+
})
134+
135+
// Create export directory
136+
await fs.mkdir(exportPath, { recursive: true })
137+
logger.debug(`已创建导出目录: ${exportPath}`)
138+
}
68139

69140
// Get message types
70141
const messageTypes = options.messageTypes || await input.checkbox({
@@ -109,21 +180,14 @@ export class ExportCommand extends TelegramCommand {
109180
default: '0',
110181
})
111182

112-
// Get batch size
113-
const batchSize = options.batchSize || await input.input({
114-
message: '每多少条消息提醒一次继续?',
115-
default: '3000',
116-
})
117-
118-
// Create export directory
119-
await fs.mkdir(exportPath, { recursive: true })
120-
logger.debug(`已创建导出目录: ${exportPath}`)
183+
// Get batch size from config
184+
const batchSize = options.batchSize || getConfig().messageBatchSize
121185

122186
// Export messages
123187
logger.log('正在导出消息...')
124188
let count = 0
125189
let failedCount = 0
126-
const skippedCount = 0
190+
let shouldStop = false
127191
const messages = []
128192

129193
// Export messages
@@ -138,17 +202,72 @@ export class ExportCommand extends TelegramCommand {
138202
count++
139203
if (count % Number(batchSize) === 0) {
140204
logger.debug(`已处理 ${count} 条消息`)
205+
206+
// Save current batch to database if format is database
207+
if (format === 'database') {
208+
const batch = messages.slice(count - Number(batchSize), count)
209+
const messageInputs: MessageCreateInput[] = batch.map(message => ({
210+
id: message.id,
211+
chatId: message.chatId,
212+
type: message.type,
213+
content: message.content,
214+
fromId: message.fromId,
215+
replyToId: message.replyToId,
216+
forwardFromChatId: message.forwardFromChatId,
217+
forwardFromMessageId: message.forwardFromMessageId,
218+
views: message.views,
219+
forwards: message.forwards,
220+
createdAt: message.createdAt,
221+
}))
222+
223+
const result = await processDatabaseBatch(messageInputs, count - Number(batchSize))
224+
shouldStop = result.shouldStop
225+
failedCount += result.failedCount
226+
227+
if (shouldStop) {
228+
break
229+
}
230+
}
141231
}
142232
}
143233

144-
// Save to file
145-
const fileName = `${chatId}_${new Date().toISOString().split('T')[0]}`
146-
const filePath = path.join(exportPath, `${fileName}.${format}`)
147-
234+
// Save to file or database
148235
try {
149236
if (format === 'json') {
150-
await fs.writeFile(filePath, JSON.stringify(messages, null, 2))
151-
logger.debug(`已保存 JSON 文件: ${filePath}`)
237+
const success = await saveToJsonFile(messages, chatId, exportPath!)
238+
if (!success) {
239+
failedCount = count
240+
}
241+
}
242+
else if (format === 'database' && !shouldStop) {
243+
// Update chat metadata
244+
await updateChatMetadata(selectedChat)
245+
246+
// Save remaining messages
247+
const remainingCount = count % Number(batchSize)
248+
if (remainingCount > 0) {
249+
const batch = messages.slice(-remainingCount)
250+
const messageInputs: MessageCreateInput[] = batch.map(message => ({
251+
id: message.id,
252+
chatId: message.chatId,
253+
type: message.type,
254+
content: message.content,
255+
fromId: message.fromId,
256+
replyToId: message.replyToId,
257+
forwardFromChatId: message.forwardFromChatId,
258+
forwardFromMessageId: message.forwardFromMessageId,
259+
views: message.views,
260+
forwards: message.forwards,
261+
createdAt: message.createdAt,
262+
}))
263+
264+
const result = await processDatabaseBatch(messageInputs, count - remainingCount)
265+
failedCount += result.failedCount
266+
}
267+
268+
if (failedCount === 0) {
269+
logger.log('已导出到数据库')
270+
}
152271
}
153272
else {
154273
// TODO: 实现 HTML 导出
@@ -158,27 +277,13 @@ export class ExportCommand extends TelegramCommand {
158277
}
159278
catch (error) {
160279
failedCount = count
161-
logger.withError(error).error(`保存文件失败: ${filePath}`)
162-
}
163-
164-
if (failedCount === 0) {
165-
logger.log(`已导出到文件: ${filePath}`)
280+
logger.withError(error).error(`保存失败`)
166281
}
167282

168283
const summary = failedCount > 0
169-
? `导出完成,共导出 ${count} 条消息,${failedCount} 条消息失败${skippedCount} 条消息已存在`
170-
: `导出完成,共导出 ${count} 条消息${skippedCount} 条消息已存在`
284+
? `导出完成,共导出 ${count} 条消息,${failedCount} 条消息失败`
285+
: `导出完成,共导出 ${count} 条消息`
171286
logger.log(summary)
172-
173-
// Ask if continue
174-
const shouldContinue = await input.confirm({
175-
message: '是否继续导出?',
176-
default: false,
177-
})
178-
179-
if (shouldContinue) {
180-
await this.execute(_args, options)
181-
}
182287
}
183288
}
184289

0 commit comments

Comments
 (0)