Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Please check the [releases](https://github.com/mochidev/CodableDatastore/release
dependencies: [
.package(
url: "https://github.com/mochidev/CodableDatastore.git",
.upToNextMinor(from: "0.3.4")
.upToNextMinor(from: "0.3.8")
),
],
...
Expand Down
5 changes: 4 additions & 1 deletion Sources/CodableDatastore/Datastore/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ public struct Configuration: Sendable {
) {
self.pageSize = pageSize
}


static let minimumPageSize = 4*1024
static let defaultPageSize = 4*1024
static let maximumPageSize = 1024*1024*1024
}
22 changes: 11 additions & 11 deletions Sources/CodableDatastore/Persistence/DatastoreInterfaceError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,27 @@ public enum DatastoreInterfaceError: LocalizedError {
public var errorDescription: String? {
switch self {
case .multipleRegistrations:
return "The datastore has already been registered with another persistence. Make sure to only register a datastore with a single persistence."
"The datastore has already been registered with another persistence. Make sure to only register a datastore with a single persistence."
case .alreadyRegistered:
return "The datastore has already been registered with this persistence. Make sure to not call register multiple times per persistence."
"The datastore has already been registered with this persistence. Make sure to not call register multiple times per persistence."
case .datastoreNotFound:
return "The datastore was not found and has likely not been registered with this persistence."
"The datastore was not found and has likely not been registered with this persistence."
case .duplicateWriters:
return "An existing datastore that can write to the persistence has already been registered for this key. Only one writer is suppored per key."
"An existing datastore that can write to the persistence has already been registered for this key. Only one writer is suppored per key."
case .instanceNotFound:
return "The requested instance could not be found with the specified identifier."
"The requested instance could not be found with the specified identifier."
case .instanceAlreadyExists:
return "The requested insertion cursor conflicts with an already existing identifier."
"The requested insertion cursor conflicts with an already existing identifier."
case .datastoreKeyNotFound:
return "The datastore being manipulated does not yet exist in the persistence."
"The datastore being manipulated does not yet exist in the persistence."
case .indexNotFound:
return "The index being manipulated does not yet exist in the datastore."
"The index being manipulated does not yet exist in the datastore."
case .transactionInactive:
return "The transaction was accessed outside of its activity window. Please make sure the transaction wasn't escaped."
"The transaction was accessed outside of its activity window. Please make sure the transaction wasn't escaped."
case .unknownCursor:
return "The cursor does not match the one provided by the persistence."
"The cursor does not match the one provided by the persistence."
case .staleCursor:
return "The cursor no longer refers to fresh data. Please make sure to use them as soon as possible and not interspaced with other writes."
"The cursor no longer refers to fresh data. Please make sure to use them as soon as possible and not interspaced with other writes."
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extension DiskPersistence.Datastore {
actor Index: Identifiable {
let datastore: DiskPersistence<AccessMode>.Datastore

let id: ID
let id: PersistenceDatastoreIndexID

var _manifest: DatastoreIndexManifest?
var manifestTask: Task<DatastoreIndexManifest, Error>?
Expand All @@ -30,7 +30,7 @@ extension DiskPersistence.Datastore {

init(
datastore: DiskPersistence<AccessMode>.Datastore,
id: ID,
id: PersistenceDatastoreIndexID,
manifest: DatastoreIndexManifest? = nil
) {
self.datastore = datastore
Expand All @@ -50,7 +50,7 @@ extension DiskPersistence.Datastore {
// MARK: Hashable

extension DiskPersistence.Datastore.Index: Hashable {
static func == (lhs: DiskPersistence<AccessMode>.Datastore.Index, rhs: DiskPersistence<AccessMode>.Datastore.Index) -> Bool {
static func == (lhs: DiskPersistence.Datastore.Index, rhs: DiskPersistence.Datastore.Index) -> Bool {
lhs === rhs
}

Expand All @@ -61,8 +61,10 @@ extension DiskPersistence.Datastore.Index: Hashable {

// MARK: - Helper Types

extension DiskPersistence.Datastore.Index {
enum ID: Hashable {
typealias PersistenceDatastoreIndexID = DiskPersistence<ReadOnly>.Datastore.IndexID

extension DiskPersistence<ReadOnly>.Datastore {
enum IndexID: Hashable {
case primary(manifest: DatastoreIndexManifestIdentifier)
case direct(index: DatastoreIndexIdentifier, manifest: DatastoreIndexManifestIdentifier)
case secondary(index: DatastoreIndexIdentifier, manifest: DatastoreIndexManifestIdentifier)
Expand Down Expand Up @@ -867,13 +869,13 @@ extension DiskPersistence.Datastore.Index {
func manifest(
inserting entry: DatastorePageEntry,
at insertionCursor: DiskPersistence.InsertionCursor,
targetPageSize: Int = 4*1024
targetPageSize: Int = Configuration.defaultPageSize
) async throws -> (
manifest: DatastoreIndexManifest,
createdPages: Set<DiskPersistence.Datastore.Page>,
removedPages: Set<DiskPersistence.Datastore.Page>
) {
let actualPageSize = max(targetPageSize, 4*1024) - DiskPersistence.Datastore.Page.headerSize
let actualPageSize = min(max(targetPageSize, Configuration.minimumPageSize), Configuration.maximumPageSize) - DiskPersistence.Datastore.Page.headerSize

guard
insertionCursor.datastore === datastore,
Expand Down Expand Up @@ -1092,7 +1094,7 @@ extension DiskPersistence.Datastore.Index {
func manifest(
replacing entry: DatastorePageEntry,
at instanceCursor: DiskPersistence.InstanceCursor,
targetPageSize: Int = 4*1024
targetPageSize: Int = Configuration.defaultPageSize
) async throws -> (
manifest: DatastoreIndexManifest,
createdPages: Set<DiskPersistence.Datastore.Page>,
Expand All @@ -1107,7 +1109,7 @@ extension DiskPersistence.Datastore.Index {
firstInstanceBlock.pageIndex != lastInstanceBlock.pageIndex || firstInstanceBlock.blockIndex <= lastInstanceBlock.blockIndex
else { throw DatastoreInterfaceError.staleCursor }

let actualPageSize = max(targetPageSize, 4*1024) - DiskPersistence.Datastore.Page.headerSize
let actualPageSize = min(max(targetPageSize, Configuration.minimumPageSize), Configuration.maximumPageSize) - DiskPersistence.Datastore.Page.headerSize

var manifest = try await manifest

Expand Down Expand Up @@ -1289,7 +1291,7 @@ extension DiskPersistence.Datastore.Index {

func manifest(
deletingEntryAt instanceCursor: DiskPersistence.InstanceCursor,
targetPageSize: Int = 4*1024
targetPageSize: Int = Configuration.defaultPageSize
) async throws -> (
manifest: DatastoreIndexManifest,
createdPages: Set<DiskPersistence.Datastore.Page>,
Expand All @@ -1304,7 +1306,7 @@ extension DiskPersistence.Datastore.Index {
firstInstanceBlock.pageIndex != lastInstanceBlock.pageIndex || firstInstanceBlock.blockIndex <= lastInstanceBlock.blockIndex
else { throw DatastoreInterfaceError.staleCursor }

let actualPageSize = max(targetPageSize, 4*1024) - DiskPersistence.Datastore.Page.headerSize
let actualPageSize = min(max(targetPageSize, Configuration.minimumPageSize), Configuration.maximumPageSize) - DiskPersistence.Datastore.Page.headerSize

var manifest = try await manifest

Expand Down Expand Up @@ -1496,3 +1498,77 @@ extension DiskPersistence.Datastore.Index {
return (manifest: manifest, removedPages: removedPages)
}
}

// MARK: - Snapshotting

extension DiskPersistence.Datastore.Index {
@discardableResult
func copy(
into newDatastore: DiskPersistence<ReadWrite>.Datastore,
rootObjectManifest: inout DatastoreRootManifest,
targetPageSize: Int
) async throws -> DiskPersistence<ReadWrite>.Datastore.Index {
let actualPageSize = min(max(targetPageSize, Configuration.minimumPageSize), Configuration.maximumPageSize) - DiskPersistence.Datastore.Page.headerSize

var newIndexManifest = DatastoreIndexManifest(
id: self.id.manifestID,
orderedPages: []
)

let stream = AsyncThrowingBackpressureStream { continuation in
try await self.forwardScanEntries(after: self.firstInsertionCursor) { entry in
try await continuation.yield(entry)
return true
}
}

var currentPageBlocks: [DatastorePageEntryBlock] = []
var remainingSpace = actualPageSize

var count = 0

for try await entry in stream {
count += 1
let blocks = entry.blocks(remainingPageSpace: remainingSpace, maxPageSpace: actualPageSize)

for block in blocks {
let encodedSize = block.encodedSize
if encodedSize > remainingSpace {
let newPage = DiskPersistence<ReadWrite>.Datastore.Page(
datastore: newDatastore,
id: .init(index: id, page: .init()),
blocks: currentPageBlocks
)
newIndexManifest.orderedPages.append(.added(newPage.id.page))
try await newPage.persistIfNeeded()

currentPageBlocks.removeAll(keepingCapacity: true)
remainingSpace = actualPageSize
}

remainingSpace -= encodedSize
currentPageBlocks.append(block)
}
}

if !currentPageBlocks.isEmpty {
let newPage = DiskPersistence<ReadWrite>.Datastore.Page(
datastore: newDatastore,
id: .init(index: id, page: .init()),
blocks: currentPageBlocks
)
newIndexManifest.orderedPages.append(.added(newPage.id.page))
try await newPage.persistIfNeeded()
}

rootObjectManifest.addedIndexes.insert(.init(id))
rootObjectManifest.addedIndexManifests.insert(.init(id))
let newIndex = DiskPersistence<ReadWrite>.Datastore.Index(
datastore: newDatastore,
id: id,
manifest: newIndexManifest
)
try await newIndex.persistIfNeeded()
return newIndex
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ extension DiskPersistence.Datastore {
actor Page: Identifiable {
let datastore: DiskPersistence<AccessMode>.Datastore

let id: ID
let id: PersistenceDatastorePageID

var blocksReaderTask: Task<MultiplexedAsyncSequence<AnyReadableSequence<DatastorePageEntryBlock>>, Error>?

var isPersisted: Bool

init(
datastore: DiskPersistence<AccessMode>.Datastore,
id: ID,
id: PersistenceDatastorePageID,
blocks: [DatastorePageEntryBlock]? = nil
) {
self.datastore = datastore
Expand Down Expand Up @@ -59,9 +59,11 @@ extension DiskPersistence.Datastore.Page: Hashable {

// MARK: - Helper Types

extension DiskPersistence.Datastore.Page {
struct ID: Hashable {
let index: DiskPersistence.Datastore.Index.ID
typealias PersistenceDatastorePageID = DiskPersistence<ReadOnly>.Datastore.PageID

extension DiskPersistence<ReadOnly>.Datastore {
struct PageID: Hashable {
let index: PersistenceDatastoreIndexID
let page: DatastorePageIdentifier

var withoutManifest: Self {
Expand Down Expand Up @@ -105,7 +107,7 @@ extension DiskPersistence.Datastore.Page {
try await iterator.check(Self.header)

/// Pages larger than 1 GB are unsupported.
let transformation = try await iterator.collect(max: 1024*1024*1024) { sequence in
let transformation = try await iterator.collect(max: Configuration.maximumPageSize) { sequence in
sequence.iteratorMap { iterator in
guard let block = try await iterator.next(DatastorePageEntryBlock.self)
else { throw DiskPersistenceError.invalidPageFormat }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,22 @@ extension DatastoreRootManifest {
case primary
case direct(index: DatastoreIndexIdentifier)
case secondary(index: DatastoreIndexIdentifier)

init(_ id: PersistenceDatastoreIndexID) {
switch id {
case .primary: self = .primary
case .direct(let index, _): self = .direct(index: index)
case .secondary(let index, _): self = .secondary(index: index)
}
}
}

enum IndexManifestID: Codable, Hashable {
case primary(manifest: DatastoreIndexManifestIdentifier)
case direct(index: DatastoreIndexIdentifier, manifest: DatastoreIndexManifestIdentifier)
case secondary(index: DatastoreIndexIdentifier, manifest: DatastoreIndexManifestIdentifier)

init<AccessMode>(_ id: DiskPersistence<AccessMode>.Datastore.Index.ID) {
init(_ id: PersistenceDatastoreIndexID) {
switch id {
case .primary(let manifest):
self = .primary(manifest: manifest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,80 @@ extension DiskPersistence.Datastore {

var hasObservers: Bool { !observers.isEmpty }
}

// MARK: - Snapshotting

extension DiskPersistence.Datastore {
@discardableResult
func copy(
rootIdentifier: DatastoreRootIdentifier?,
datastoreKey: DatastoreKey,
into newSnapshot: Snapshot<ReadWrite>,
iteration: inout SnapshotIteration,
targetPageSize: Int
) async throws -> DiskPersistence<ReadWrite>.Datastore {
let newDatastore = DiskPersistence<ReadWrite>.Datastore(id: id, snapshot: newSnapshot)

/// Copy the datastore over.
iteration.dataStores[datastoreKey] = SnapshotIteration.DatastoreInfo(key: datastoreKey, id: id, root: rootIdentifier)

/// Record the addition of a new datastore since it lives in a new location on disk.
iteration.addedDatastores.insert(id)

/// Stop here if there are no roots for the datastore — there is nothing else to migrate.
guard let datastoreRootIdentifier = rootIdentifier
else { return newDatastore }

/// Record the addition of a new datastore since it lives in a new location on disk.
iteration.addedDatastoreRoots.insert(DatastoreRootReference(
datastoreID: id,
datastoreRootID: datastoreRootIdentifier
))

let rootObject = rootObject(for: datastoreRootIdentifier)
let rootObjectManifest = try await rootObject.manifest
var newRootObjectManifest = DatastoreRootManifest(
id: rootObjectManifest.id,
modificationDate: rootObjectManifest.modificationDate,
descriptor: rootObjectManifest.descriptor,
primaryIndexManifest: rootObjectManifest.primaryIndexManifest,
directIndexManifests: rootObjectManifest.directIndexManifests,
secondaryIndexManifests: rootObjectManifest.secondaryIndexManifests,
addedIndexes: [],
removedIndexes: [],
addedIndexManifests: [],
removedIndexManifests: []
)

try await rootObject.primaryIndex.copy(
into: newDatastore,
rootObjectManifest: &newRootObjectManifest,
targetPageSize: targetPageSize
)

for index in try await rootObject.directIndexes.values {
try await index.copy(
into: newDatastore,
rootObjectManifest: &newRootObjectManifest,
targetPageSize: targetPageSize
)
}

for index in try await rootObject.secondaryIndexes.values {
try await index.copy(
into: newDatastore,
rootObjectManifest: &newRootObjectManifest,
targetPageSize: targetPageSize
)
}

let newRootObject = DiskPersistence<ReadWrite>.Datastore.RootObject(
datastore: newDatastore,
id: datastoreRootIdentifier,
rootObject: newRootObjectManifest
)
try await newRootObject.persistIfNeeded()

return newDatastore
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ struct DatedIdentifierComponents {
"\(hour)-\(minute)"
}

var date: Date? {
DateComponents(
calendar: Calendar(identifier: .gregorian),
timeZone: TimeZone(secondsFromGMT: 0),
year: Int(year),
month: Int(month),
day: Int(day),
hour: Int(hour),
minute: Int(minute),
second: Int(second),
nanosecond: Int(millisecond).map { $0*1_000_000 }
)
.date
}

static let size = 40
}

Expand Down
Loading
Loading