package engine import ( "context" "sync" "time" "git.kingecg.top/kingecg/gomog/internal/database" "git.kingecg.top/kingecg/gomog/pkg/errors" "git.kingecg.top/kingecg/gomog/pkg/types" ) // MemoryStore 内存数据存储 type MemoryStore struct { mu sync.RWMutex collections map[string]*Collection adapter database.DatabaseAdapter } // Collection 内存集合 type Collection struct { name string documents map[string]types.Document // id -> Document mu sync.RWMutex } // NewMemoryStore 创建内存存储 func NewMemoryStore(adapter database.DatabaseAdapter) *MemoryStore { return &MemoryStore{ collections: make(map[string]*Collection), adapter: adapter, } } // CreateTestCollectionForTesting 为测试创建集合(仅用于测试) func CreateTestCollectionForTesting(store *MemoryStore, name string, documents map[string]types.Document) { store.collections[name] = &Collection{ name: name, documents: documents, } } // LoadCollection 从数据库加载集合到内存 func (ms *MemoryStore) LoadCollection(ctx context.Context, name string) error { // 检查集合是否存在 exists, err := ms.adapter.CollectionExists(ctx, name) if err != nil { return err } if !exists { // 创建集合 if err := ms.adapter.CreateCollection(ctx, name); err != nil { return err } } // 从数据库加载所有文档 docs, err := ms.adapter.FindAll(ctx, name) if err != nil { return err } ms.mu.Lock() defer ms.mu.Unlock() coll := &Collection{ name: name, documents: make(map[string]types.Document), } for _, doc := range docs { coll.documents[doc.ID] = doc } ms.collections[name] = coll return nil } // GetCollection 获取集合 func (ms *MemoryStore) GetCollection(name string) (*Collection, error) { ms.mu.RLock() defer ms.mu.RUnlock() coll, exists := ms.collections[name] if !exists { return nil, errors.ErrCollectionNotFnd } return coll, nil } // Insert 插入文档到内存 func (ms *MemoryStore) Insert(collection string, doc types.Document) error { coll, err := ms.GetCollection(collection) if err != nil { return err } coll.mu.Lock() defer coll.mu.Unlock() coll.documents[doc.ID] = doc return nil } // Find 查询文档 func (ms *MemoryStore) Find(collection string, filter types.Filter) ([]types.Document, error) { coll, err := ms.GetCollection(collection) if err != nil { return nil, err } coll.mu.RLock() defer coll.mu.RUnlock() var results []types.Document for _, doc := range coll.documents { if MatchFilter(doc.Data, filter) { results = append(results, doc) } } return results, nil } // Update 更新文档(支持 upsert 和 arrayFilters) func (ms *MemoryStore) Update(collection string, filter types.Filter, update types.Update, upsert bool, arrayFilters []types.Filter) (int, int, []string, error) { coll, err := ms.GetCollection(collection) if err != nil { return 0, 0, nil, err } coll.mu.Lock() defer coll.mu.Unlock() matched := 0 modified := 0 var upsertedIDs []string for id, doc := range coll.documents { if MatchFilter(doc.Data, filter) { matched++ // 应用更新 newData := applyUpdateWithFilters(doc.Data, update, false, arrayFilters) coll.documents[id] = types.Document{ ID: doc.ID, Data: newData, CreatedAt: doc.CreatedAt, UpdatedAt: time.Now(), } modified++ } } // 处理 upsert:如果没有匹配的文档且设置了 upsert if matched == 0 && upsert { // 创建新文档 newID := generateID() newDoc := make(map[string]interface{}) // 应用更新($setOnInsert 会生效) newData := applyUpdateWithFilters(newDoc, update, true, arrayFilters) coll.documents[newID] = types.Document{ ID: newID, Data: newData, CreatedAt: time.Now(), UpdatedAt: time.Now(), } matched = 1 modified = 1 upsertedIDs = append(upsertedIDs, newID) } return matched, modified, upsertedIDs, nil } // Delete 删除文档 func (ms *MemoryStore) Delete(collection string, filter types.Filter) (int, error) { coll, err := ms.GetCollection(collection) if err != nil { return 0, err } coll.mu.Lock() defer coll.mu.Unlock() deleted := 0 for id, doc := range coll.documents { if MatchFilter(doc.Data, filter) { delete(coll.documents, id) deleted++ } } return deleted, nil } // SyncToDB 同步集合到数据库 func (ms *MemoryStore) SyncToDB(ctx context.Context, collection string) error { coll, err := ms.GetCollection(collection) if err != nil { return err } coll.mu.RLock() defer coll.mu.RUnlock() // 转换为文档数组 docs := make([]types.Document, 0, len(coll.documents)) for _, doc := range coll.documents { docs = append(docs, doc) } // 批量插入/更新到数据库 // 注意:这里简化处理,实际应该区分新增和更新 return ms.adapter.InsertMany(ctx, collection, docs) } // GetAllDocuments 获取集合的所有文档(用于聚合) func (ms *MemoryStore) GetAllDocuments(collection string) ([]types.Document, error) { coll, err := ms.GetCollection(collection) if err != nil { return nil, err } coll.mu.RLock() defer coll.mu.RUnlock() docs := make([]types.Document, 0, len(coll.documents)) for _, doc := range coll.documents { docs = append(docs, doc) } return docs, nil }