From 9312af047e69ea40382a492464ddfebf990313f9 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Mon, 15 Dec 2025 17:34:16 +0800 Subject: [PATCH 1/2] [giga] add executor interfaces for VM --- giga/executor/types/common.go | 5 +++++ giga/executor/types/interfaces.go | 21 ++++++++++++++++++++ giga/executor/vm/evmc/vm.go | 32 +++++++++++++++++++++++++++++++ giga/executor/vm/geth/vm.go | 28 +++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 ++ 6 files changed, 89 insertions(+) create mode 100644 giga/executor/types/common.go create mode 100644 giga/executor/types/interfaces.go create mode 100644 giga/executor/vm/evmc/vm.go create mode 100644 giga/executor/vm/geth/vm.go diff --git a/giga/executor/types/common.go b/giga/executor/types/common.go new file mode 100644 index 0000000000..5240e68ebe --- /dev/null +++ b/giga/executor/types/common.go @@ -0,0 +1,5 @@ +package types + +type Hash [32]byte + +type Address [20]byte diff --git a/giga/executor/types/interfaces.go b/giga/executor/types/interfaces.go new file mode 100644 index 0000000000..f071fcb640 --- /dev/null +++ b/giga/executor/types/interfaces.go @@ -0,0 +1,21 @@ +package types + +// VM defines the entrypoints of transaction execution. +type VM interface { + Create(sender Address, code []byte, gas uint64, value Hash) (ret []byte, contractAddr Address, gasLeft uint64, err error) + Call(sender Address, to Address, input []byte, gas uint64, value Hash) (ret []byte, gasLeft uint64, err error) +} + +// Storage defines access to state. It is needed mainly for the +// preprocessing done before the VM entrypoints are called +// (e.g. nonce checking/setting, fee charging, value transfer, etc.) +type Storage interface { + GetCode(addr Address) ([]byte, error) + GetState(addr Address, key Hash) (Hash, error) + SetState(addr Address, key Hash, value Hash) error + GetBalance(addr Address) (Hash, error) + SetBalance(addr Address, value Hash) error + GetNonce(addr Address) (uint64, error) + SetNonce(addr Address, nonce uint64) error + // TODO: accesslist setting +} diff --git a/giga/executor/vm/evmc/vm.go b/giga/executor/vm/evmc/vm.go new file mode 100644 index 0000000000..3f733965ba --- /dev/null +++ b/giga/executor/vm/evmc/vm.go @@ -0,0 +1,32 @@ +package evmc + +import ( + "math" + + "github.com/ethereum/evmc/v12/bindings/go/evmc" + "github.com/sei-protocol/sei-chain/giga/executor/types" +) + +type VMImpl struct { + hostContext evmc.HostContext +} + +func NewVM(hostContext evmc.HostContext) types.VM { + return &VMImpl{hostContext: hostContext} +} + +func (v *VMImpl) Create(sender types.Address, code []byte, gas uint64, value types.Hash) (ret []byte, contractAddr types.Address, gasLeft uint64, err error) { + if gas > math.MaxInt64 { + panic("gas overflow") + } + ret, left, _, addr, err := v.hostContext.Call(evmc.Create, evmc.Address{}, evmc.Address(sender), evmc.Hash(value), code, int64(gas), 0, false, evmc.Hash{}, evmc.Address{}) + return ret, types.Address(addr), uint64(left), err //nolint:gosec +} + +func (v *VMImpl) Call(sender types.Address, to types.Address, input []byte, gas uint64, value types.Hash) (ret []byte, gasLeft uint64, err error) { + if gas > math.MaxInt64 { + panic("gas overflow") + } + ret, left, _, _, err := v.hostContext.Call(evmc.Call, evmc.Address(to), evmc.Address(sender), evmc.Hash(value), input, int64(gas), 0, false, evmc.Hash{}, evmc.Address(to)) + return ret, uint64(left), err //nolint:gosec +} diff --git a/giga/executor/vm/geth/vm.go b/giga/executor/vm/geth/vm.go new file mode 100644 index 
0000000000..ab5e10d946 --- /dev/null +++ b/giga/executor/vm/geth/vm.go @@ -0,0 +1,28 @@ +package geth + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/holiman/uint256" + "github.com/sei-protocol/sei-chain/giga/executor/types" +) + +var _ types.VM = &VMImpl{} + +type VMImpl struct { + evm *vm.EVM +} + +func NewVM(evm *vm.EVM) types.VM { + return &VMImpl{evm: evm} +} + +func (v *VMImpl) Create(sender types.Address, code []byte, gas uint64, value types.Hash) (ret []byte, contractAddr types.Address, gasLeft uint64, err error) { + ret, addr, gasLeft, err := v.evm.Create(common.Address(sender), code, gas, new(uint256.Int).SetBytes(value[:])) + return ret, types.Address(addr), gasLeft, err +} + +func (v *VMImpl) Call(sender types.Address, to types.Address, input []byte, gas uint64, value types.Hash) (ret []byte, gasLeft uint64, err error) { + ret, gasLeft, err = v.evm.Call(common.Address(sender), common.Address(to), input, gas, new(uint256.Int).SetBytes(value[:])) + return ret, gasLeft, err +} diff --git a/go.mod b/go.mod index aaa82bdc19..5995dcdc6b 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/cosmos/go-bip39 v1.0.0 github.com/cosmos/iavl v0.21.0-alpha.1.0.20230904092046-df3db2d96583 github.com/cosmos/ibc-go/v3 v3.0.0 + github.com/ethereum/evmc/v12 v12.1.0 github.com/ethereum/go-ethereum v1.16.1 github.com/go-playground/validator/v10 v10.11.1 github.com/gogo/protobuf v1.3.3 diff --git a/go.sum b/go.sum index ca617c702a..114269617d 100644 --- a/go.sum +++ b/go.sum @@ -971,6 +971,8 @@ github.com/esimonov/ifshort v1.0.4 h1:6SID4yGWfRae/M7hkVDVVyppy8q/v9OuxNdmjLQStB github.com/esimonov/ifshort v1.0.4/go.mod h1:Pe8zjlRrJ80+q2CxHLfEOfTwxCZ4O+MuhcHcfgNWTk0= github.com/ethereum/c-kzg-4844 v1.0.0 h1:0X1LBXxaEtYD9xsyj9B9ctQEZIpnvVDeoBx8aHEwTNA= github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= +github.com/ethereum/evmc/v12 v12.1.0 h1:fUIzJNnXa9VPYx253lDS7L9iBZtP+tlpTdZst5e6Pks= +github.com/ethereum/evmc/v12 v12.1.0/go.mod h1:80jmft01io35nSmrX70bKFR/lncwFuqE90iLLSMyMAE= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= github.com/ettle/strcase v0.1.1 h1:htFueZyVeE1XNnMEfbqp5r67qAN/4r6ya1ysq8Q+Zcw= From 1c987ddd406381830d678eeb8dec870e92b04777 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Tue, 16 Dec 2025 12:20:40 +0800 Subject: [PATCH 2/2] Abstract scheduler into interface --- app/abci.go | 10 +- sei-cosmos/baseapp/abci.go | 43 +++- sei-cosmos/tasks/scheduler.go | 400 +++++++++++++++++------------ sei-cosmos/tasks/scheduler_test.go | 129 ++++++---- 4 files changed, 369 insertions(+), 213 deletions(-) diff --git a/app/abci.go b/app/abci.go index ce41f979b2..39eb0cf298 100644 --- a/app/abci.go +++ b/app/abci.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "time" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/tasks" "github.com/cosmos/cosmos-sdk/telemetry" sdk "github.com/cosmos/cosmos-sdk/types" @@ -179,14 +180,17 @@ func (app *App) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) ( } // avoid overhead for empty batches - scheduler := tasks.NewScheduler(app.ConcurrencyWorkers(), app.TracingInfo, app.DeliverTx) - txRes, err := scheduler.ProcessAll(ctx, req.TxEntries) + scheduler := tasks.NewScheduler(app.ConcurrencyWorkers(), app.TracingInfo, app.Executor) + txRes, stats, err := scheduler.ProcessAll(ctx.Context(), 
baseapp.TxEntriesToTasks(ctx, req.TxEntries), baseapp.GetMultiVersionStore(ctx)) if err != nil { ctx.Logger().Error("error while processing scheduler", "err", err) panic(err) } + logStats := []interface{}{"height", ctx.BlockHeight()} + logStats = append(logStats, stats...) + ctx.Logger().Info("occ scheduler", logStats...) for _, tx := range txRes { - responses = append(responses, &sdk.DeliverTxResult{Response: tx}) + responses = append(responses, &sdk.DeliverTxResult{Response: *tx}) } return sdk.DeliverTxBatchResponse{Results: responses} diff --git a/sei-cosmos/baseapp/abci.go b/sei-cosmos/baseapp/abci.go index df6dc9e93f..19b62dc409 100644 --- a/sei-cosmos/baseapp/abci.go +++ b/sei-cosmos/baseapp/abci.go @@ -15,6 +15,7 @@ import ( "github.com/armon/go-metrics" "github.com/cosmos/cosmos-sdk/codec" snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" + "github.com/cosmos/cosmos-sdk/store/multiversion" "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/tasks" "github.com/cosmos/cosmos-sdk/telemetry" @@ -176,14 +177,17 @@ func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchReques } // avoid overhead for empty batches - scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.TracingInfo, app.DeliverTx) - txRes, err := scheduler.ProcessAll(ctx, req.TxEntries) + scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.TracingInfo, app.Executor) + txRes, stats, err := scheduler.ProcessAll(ctx.Context(), TxEntriesToTasks(ctx, req.TxEntries), GetMultiVersionStore(ctx)) if err != nil { ctx.Logger().Error("error while processing scheduler", "err", err) panic(err) } + logStats := []interface{}{"height", ctx.BlockHeight()} + logStats = append(logStats, stats...) + ctx.Logger().Info("occ scheduler", logStats...) 
for _, tx := range txRes { - responses = append(responses, &sdk.DeliverTxResult{Response: tx}) + responses = append(responses, &sdk.DeliverTxResult{Response: *tx}) } return sdk.DeliverTxBatchResponse{Results: responses} @@ -1159,3 +1163,36 @@ func (app *BaseApp) GetTxPriorityHint(_ context.Context, req *abci.RequestGetTxP Priority: priority, }, nil } + +func (app *BaseApp) Executor(task tasks.SchedulerTask[*abci.ResponseDeliverTx]) *abci.ResponseDeliverTx { + typedTask := task.(*tasks.DeliverTxTask) + resp := app.DeliverTx(typedTask.Ctx, typedTask.Request, typedTask.SdkTx, typedTask.Checksum) + return &resp +} + +func TxEntriesToTasks(ctx sdk.Context, entries []*sdk.DeliverTxEntry) []tasks.SchedulerTask[*abci.ResponseDeliverTx] { + allTasks := make([]tasks.SchedulerTask[*abci.ResponseDeliverTx], 0, len(entries)) + for _, r := range entries { + task := &tasks.DeliverTxTask{ + Ctx: ctx, + Request: r.Request, + SdkTx: r.SdkTx, + Checksum: r.Checksum, + } + task.SetStatus("pending") + task.SetAbsoluteIndex(r.AbsoluteIndex) + task.SetDependencies(map[int]struct{}{}) + task.SetTxTracer(r.TxTracer) + allTasks = append(allTasks, task) + } + return allTasks +} + +func GetMultiVersionStore(ctx sdk.Context) map[string]multiversion.MultiVersionStore { + mvs := make(map[string]multiversion.MultiVersionStore) + keys := ctx.MultiStore().StoreKeys() + for _, sk := range keys { + mvs[sk.String()] = multiversion.NewMultiVersionStore(ctx.MultiStore().GetKVStore(sk)) + } + return mvs +} diff --git a/sei-cosmos/tasks/scheduler.go b/sei-cosmos/tasks/scheduler.go index 0cf92afc25..604b281fb0 100644 --- a/sei-cosmos/tasks/scheduler.go +++ b/sei-cosmos/tasks/scheduler.go @@ -40,73 +40,205 @@ const ( maximumIterations = 10 ) -type deliverTxTask struct { +var _ SchedulerTask[*types.ResponseDeliverTx] = (*DeliverTxTask)(nil) + +type DeliverTxTask struct { Ctx sdk.Context - AbortCh chan occ.Abort + abortCh chan occ.Abort mx sync.RWMutex - Status status - Dependencies map[int]struct{} + status status + dependencies map[int]struct{} Abort *occ.Abort - Incarnation int + incarnation int Request types.RequestDeliverTxV2 SdkTx sdk.Tx Checksum [32]byte - AbsoluteIndex int - Response *types.ResponseDeliverTx - VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore - TxTracer sdk.TxTracer + absoluteIndex int + response *types.ResponseDeliverTx + versionStores map[string]*multiversion.VersionIndexedStore + txTracer sdk.TxTracer } // AppendDependencies appends the given indexes to the task's dependencies -func (dt *deliverTxTask) AppendDependencies(deps []int) { +func (dt *DeliverTxTask) AppendDependencies(deps []int) { dt.mx.Lock() defer dt.mx.Unlock() for _, taskIdx := range deps { - dt.Dependencies[taskIdx] = struct{}{} + dt.dependencies[taskIdx] = struct{}{} } } -func (dt *deliverTxTask) IsStatus(s status) bool { +func (dt *DeliverTxTask) IsStatus(s status) bool { dt.mx.RLock() defer dt.mx.RUnlock() - return dt.Status == s + return dt.status == s } -func (dt *deliverTxTask) SetStatus(s status) { +func (dt *DeliverTxTask) SetStatus(s status) { dt.mx.Lock() defer dt.mx.Unlock() - dt.Status = s + dt.status = s +} + +func (dt *DeliverTxTask) Status() status { + return dt.status } -func (dt *deliverTxTask) Reset() { +func (dt *DeliverTxTask) Reset() { dt.SetStatus(statusPending) - dt.Response = nil + dt.response = nil dt.Abort = nil - dt.AbortCh = nil - dt.VersionStores = nil + dt.abortCh = nil + dt.versionStores = nil - if dt.TxTracer != nil { - dt.TxTracer.Reset() + if dt.txTracer != nil { + dt.txTracer.Reset() } } 
-func (dt *deliverTxTask) Increment() { - dt.Incarnation++ +func (dt *DeliverTxTask) Increment() { + dt.incarnation++ } -// Scheduler processes tasks concurrently -type Scheduler interface { - ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) +func (dt *DeliverTxTask) Prepare(mvs map[string]multiversion.MultiVersionStore) { + ctx := dt.Ctx.WithTxIndex(dt.absoluteIndex) + + // initialize the context + abortCh := make(chan occ.Abort, len(mvs)) + + // if there are no stores, don't try to wrap, because there's nothing to wrap + if len(mvs) > 0 { + // non-blocking + cms := ctx.MultiStore().CacheMultiStore() + + // init version stores by store key + vs := make(map[string]*multiversion.VersionIndexedStore) + for storeKey, mvs := range mvs { + vs[storeKey] = mvs.VersionedIndexedStore(dt.absoluteIndex, dt.incarnation, abortCh) + } + + // save off version store so we can ask it things later + dt.versionStores = vs + ms := cms.SetKVStores(func(k store.StoreKey, kvs sdk.KVStore) store.CacheWrap { + return vs[k.String()] + }) + + ctx = ctx.WithMultiStore(ms) + } + + if dt.txTracer != nil { + ctx = dt.txTracer.InjectInContext(ctx) + } + + dt.abortCh = abortCh + dt.Ctx = ctx +} + +func (dt *DeliverTxTask) AbortCh() chan occ.Abort { + return dt.abortCh +} + +func (dt *DeliverTxTask) SetAbort(abort *occ.Abort) { + dt.Abort = abort +} + +func (dt *DeliverTxTask) VersionStores() map[string]*multiversion.VersionIndexedStore { + return dt.versionStores +} + +func (dt *DeliverTxTask) SetResponse(response *types.ResponseDeliverTx) { + dt.response = response +} + +func (dt *DeliverTxTask) TxTracer() sdk.TxTracer { + return dt.txTracer +} + +func (dt *DeliverTxTask) SetTxTracer(txTracer sdk.TxTracer) { + dt.txTracer = txTracer +} + +func (dt *DeliverTxTask) Tx() []byte { + return dt.Request.Tx +} + +func (dt *DeliverTxTask) AbsoluteIndex() int { + return dt.absoluteIndex +} + +func (dt *DeliverTxTask) SetAbsoluteIndex(absoluteIndex int) { + dt.absoluteIndex = absoluteIndex +} + +func (dt *DeliverTxTask) Incarnation() int { + return dt.incarnation +} + +func (dt *DeliverTxTask) Response() *types.ResponseDeliverTx { + return dt.response +} + +func (dt *DeliverTxTask) Dependencies() map[int]struct{} { + return dt.dependencies } -type scheduler struct { - deliverTx func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) +func (dt *DeliverTxTask) SetDependencies(dependencies map[int]struct{}) { + dt.dependencies = dependencies +} + +func (dt *DeliverTxTask) SetTracerCtx(ctx context.Context) { + dt.Ctx = dt.Ctx.WithContext(ctx) +} + +func (dt *DeliverTxTask) TracerCtx() context.Context { + return dt.Ctx.Context() +} + +type Stats []interface{} + +// Scheduler processes tasks concurrently +type Scheduler[Response any] interface { + ProcessAll( + ctx context.Context, + tasks []SchedulerTask[Response], + mvs map[string]multiversion.MultiVersionStore, + ) ([]Response, Stats, error) +} + +type SchedulerTask[Response any] interface { + Tx() []byte + AbsoluteIndex() int + Incarnation() int + Response() Response + IsStatus(status) bool + Status() status + TracerCtx() context.Context + SetTracerCtx(context.Context) + Reset() + Increment() + Prepare(map[string]multiversion.MultiVersionStore) + AbortCh() chan occ.Abort + SetAbort(*occ.Abort) + AppendDependencies(deps []int) + Dependencies() map[int]struct{} + SetStatus(s status) + VersionStores() map[string]*multiversion.VersionIndexedStore + SetResponse(Response) + + TxTracer() 
sdk.TxTracer // to be deprecated +} + +var _ Scheduler[any] = (*scheduler[any])(nil) + +type scheduler[Response any] struct { + // deliverTx func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) + executor func(SchedulerTask[Response]) Response workers int - multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore + multiVersionStores map[string]multiversion.MultiVersionStore tracingInfo *tracing.Info - allTasksMap map[int]*deliverTxTask - allTasks []*deliverTxTask + allTasksMap map[int]SchedulerTask[Response] + allTasks []SchedulerTask[Response] executeCh chan func() validateCh chan func() metrics *schedulerMetrics @@ -115,20 +247,20 @@ type scheduler struct { } // NewScheduler creates a new scheduler -func NewScheduler(workers int, tracingInfo *tracing.Info, deliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx)) Scheduler { - return &scheduler{ +func NewScheduler[Response any](workers int, tracingInfo *tracing.Info, executor func(SchedulerTask[Response]) Response) Scheduler[Response] { + return &scheduler[Response]{ workers: workers, - deliverTx: deliverTxFunc, + executor: executor, tracingInfo: tracingInfo, metrics: &schedulerMetrics{}, } } -func (s *scheduler) invalidateTask(task *deliverTxTask) { +func (s *scheduler[Response]) invalidateTask(task SchedulerTask[Response]) { for _, mv := range s.multiVersionStores { - mv.InvalidateWriteset(task.AbsoluteIndex, task.Incarnation) - mv.ClearReadset(task.AbsoluteIndex) - mv.ClearIterateset(task.AbsoluteIndex) + mv.InvalidateWriteset(task.AbsoluteIndex(), task.Incarnation()) + mv.ClearReadset(task.AbsoluteIndex()) + mv.ClearIterateset(task.AbsoluteIndex()) } } @@ -147,7 +279,7 @@ func start(ctx context.Context, ch chan func(), workers int) { } } -func (s *scheduler) DoValidate(work func()) { +func (s *scheduler[Response]) DoValidate(work func()) { if s.synchronous { work() return @@ -155,7 +287,7 @@ func (s *scheduler) DoValidate(work func()) { s.validateCh <- work } -func (s *scheduler) DoExecute(work func()) { +func (s *scheduler[Response]) DoExecute(work func()) { if s.synchronous { work() return @@ -163,12 +295,12 @@ func (s *scheduler) DoExecute(work func()) { s.executeCh <- work } -func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { +func (s *scheduler[Response]) findConflicts(task SchedulerTask[Response]) (bool, []int) { var conflicts []int uniq := make(map[int]struct{}) valid := true for _, mv := range s.multiVersionStores { - ok, mvConflicts := mv.ValidateTransactionState(task.AbsoluteIndex) + ok, mvConflicts := mv.ValidateTransactionState(task.AbsoluteIndex()) for _, c := range mvConflicts { if _, ok := uniq[c]; !ok { conflicts = append(conflicts, c) @@ -182,51 +314,31 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { return valid, conflicts } -func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) { - tasksMap := make(map[int]*deliverTxTask) - allTasks := make([]*deliverTxTask, 0, len(reqs)) - for _, r := range reqs { - task := &deliverTxTask{ - Request: r.Request, - SdkTx: r.SdkTx, - Checksum: r.Checksum, - AbsoluteIndex: r.AbsoluteIndex, - Status: statusPending, - Dependencies: map[int]struct{}{}, - TxTracer: r.TxTracer, - } - - tasksMap[r.AbsoluteIndex] = task - allTasks = append(allTasks, task) - } - return allTasks, tasksMap -} - -func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { 
- res := make([]types.ResponseDeliverTx, 0, len(tasks)) +func (s *scheduler[Response]) collectResponses(tasks []SchedulerTask[Response]) []Response { + res := make([]Response, 0, len(tasks)) for _, t := range tasks { - res = append(res, *t.Response) + res = append(res, t.Response()) - if t.TxTracer != nil { - t.TxTracer.Commit() + if t.TxTracer() != nil { + t.TxTracer().Commit() } } return res } -func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { +func (s *scheduler[Response]) tryInitMultiVersionStore(ctx sdk.Context) { if s.multiVersionStores != nil { return } - mvs := make(map[sdk.StoreKey]multiversion.MultiVersionStore) + mvs := make(map[string]multiversion.MultiVersionStore) keys := ctx.MultiStore().StoreKeys() for _, sk := range keys { - mvs[sk] = multiversion.NewMultiVersionStore(ctx.MultiStore().GetKVStore(sk)) + mvs[sk.String()] = multiversion.NewMultiVersionStore(ctx.MultiStore().GetKVStore(sk)) } s.multiVersionStores = mvs } -func dependenciesValidated(tasksMap map[int]*deliverTxTask, deps map[int]struct{}) bool { +func dependenciesValidated[Response any](tasksMap map[int]SchedulerTask[Response], deps map[int]struct{}) bool { for i := range deps { // because idx contains absoluteIndices, we need to fetch from map task := tasksMap[i] @@ -237,17 +349,7 @@ func dependenciesValidated(tasksMap map[int]*deliverTxTask, deps map[int]struct{ return true } -func filterTasks(tasks []*deliverTxTask, filter func(*deliverTxTask) bool) []*deliverTxTask { - var res []*deliverTxTask - for _, t := range tasks { - if filter(t) { - res = append(res, t) - } - } - return res -} - -func allValidated(tasks []*deliverTxTask) bool { +func allValidated[Response any](tasks []SchedulerTask[Response]) bool { for _, t := range tasks { if !t.IsStatus(statusValidated) { return false @@ -264,19 +366,25 @@ type schedulerMetrics struct { retries int } -func (s *scheduler) emitMetrics() { +func (s *scheduler[Response]) emitMetrics() { telemetry.IncrCounter(float32(s.metrics.retries), "scheduler", "retries") telemetry.IncrCounter(float32(s.metrics.maxIncarnation), "scheduler", "incarnations") } -func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) { +func (s *scheduler[Response]) ProcessAll( + ctx context.Context, + tasks []SchedulerTask[Response], + mvs map[string]multiversion.MultiVersionStore, +) ([]Response, Stats, error) { startTime := time.Now() var iterations int - // initialize mutli-version stores if they haven't been initialized yet - s.tryInitMultiVersionStore(ctx) + // use the multi-version stores provided by the caller + s.multiVersionStores = mvs s.allTasks = tasks - s.allTasksMap = tasksMap + s.allTasksMap = make(map[int]SchedulerTask[Response], len(tasks)) + for _, t := range tasks { + s.allTasksMap[t.AbsoluteIndex()] = t + } s.executeCh = make(chan func(), len(tasks)) s.validateCh = make(chan func(), len(tasks)) defer s.emitMetrics() @@ -287,7 +395,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t workers = len(tasks) } - workerCtx, cancel := context.WithCancel(ctx.Context()) + workerCtx, cancel := context.WithCancel(ctx) defer cancel() // execution tasks are limited by workers @@ -311,7 +419,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t // execute sets statuses of tasks to either executed or aborted if err := s.executeAll(ctx, toExecute); err != nil { - return nil, err + return nil, nil, err } // validate returns any that should be re-executed @@ -319,7 +427,7 @@ func (s *scheduler) ProcessAll(ctx 
sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t var err error toExecute, err = s.validateAll(ctx, tasks) if err != nil { - return nil, err + return nil, nil, err } // these are retries which apply to metrics s.metrics.retries += len(toExecute) @@ -331,13 +439,18 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t } s.metrics.maxIncarnation = s.maxIncarnation - ctx.Logger().Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers) - - return s.collectResponses(tasks), nil + return s.collectResponses(tasks), Stats{ + "txs", len(tasks), + "latency_ms", time.Since(startTime).Milliseconds(), + "retries", s.metrics.retries, + "maxIncarnation", s.maxIncarnation, + "iterations", iterations, + "sync", s.synchronous, + "workers", s.workers}, nil } -func (s *scheduler) shouldRerun(task *deliverTxTask) bool { - switch task.Status { +func (s *scheduler[Response]) shouldRerun(task SchedulerTask[Response]) bool { + switch task.Status() { case statusAborted, statusPending: return true @@ -352,7 +465,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { task.AppendDependencies(conflicts) // if the conflicts are now validated, then rerun this task - if dependenciesValidated(s.allTasksMap, task.Dependencies) { + if dependenciesValidated(s.allTasksMap, task.Dependencies()) { return true } else { // otherwise, wait for completion @@ -369,12 +482,12 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { case statusWaiting: // if conflicts are done, then this task is ready to run again - return dependenciesValidated(s.allTasksMap, task.Dependencies) + return dependenciesValidated(s.allTasksMap, task.Dependencies()) } - panic("unexpected status: " + task.Status) + panic("unexpected status: " + task.Status()) } -func (s *scheduler) validateTask(ctx sdk.Context, task *deliverTxTask) bool { +func (s *scheduler[Response]) validateTask(ctx context.Context, task SchedulerTask[Response]) bool { _, span := s.traceSpan(ctx, "SchedulerValidate", task) defer span.End() @@ -384,21 +497,21 @@ func (s *scheduler) validateTask(ctx sdk.Context, task *deliverTxTask) bool { return true } -func (s *scheduler) findFirstNonValidated() (int, bool) { +func (s *scheduler[Response]) findFirstNonValidated() (int, bool) { for i, t := range s.allTasks { - if t.Status != statusValidated { + if t.Status() != statusValidated { return i, true } } return 0, false } -func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) { +func (s *scheduler[Response]) validateAll(ctx context.Context, tasks []SchedulerTask[Response]) ([]SchedulerTask[Response], error) { ctx, span := s.traceSpan(ctx, "SchedulerValidateAll", nil) defer span.End() var mx sync.Mutex - var res []*deliverTxTask + var res []SchedulerTask[Response] startIdx, anyLeft := s.findFirstNonValidated() @@ -418,8 +531,8 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del t.Reset() t.Increment() // update max incarnation for scheduler - if t.Incarnation > s.maxIncarnation { - s.maxIncarnation = t.Incarnation + if t.Incarnation() > s.maxIncarnation { + s.maxIncarnation = t.Incarnation() } res = append(res, t) } @@ -431,7 +544,7 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del } // ExecuteAll executes all tasks concurrently -func (s *scheduler) 
executeAll(ctx sdk.Context, tasks []*deliverTxTask) error { +func (s *scheduler[Response]) executeAll(ctx context.Context, tasks []SchedulerTask[Response]) error { if len(tasks) == 0 { return nil } @@ -456,68 +569,29 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error { return nil } -func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task *deliverTxTask) { +func (s *scheduler[Response]) prepareAndRunTask(wg *sync.WaitGroup, ctx context.Context, task SchedulerTask[Response]) { eCtx, eSpan := s.traceSpan(ctx, "SchedulerExecute", task) defer eSpan.End() - task.Ctx = eCtx + task.SetTracerCtx(eCtx) s.executeTask(task) wg.Done() } -func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) (sdk.Context, trace.Span) { - spanCtx, span := s.tracingInfo.StartWithContext(name, ctx.TraceSpanContext()) +func (s *scheduler[Response]) traceSpan(ctx context.Context, name string, task SchedulerTask[Response]) (context.Context, trace.Span) { + spanCtx, span := s.tracingInfo.StartWithContext(name, ctx) if task != nil { - span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(task.Request.Tx)))) - span.SetAttributes(attribute.Int("absoluteIndex", task.AbsoluteIndex)) - span.SetAttributes(attribute.Int("txIncarnation", task.Incarnation)) + span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(task.Tx())))) + span.SetAttributes(attribute.Int("absoluteIndex", task.AbsoluteIndex())) + span.SetAttributes(attribute.Int("txIncarnation", task.Incarnation())) } - ctx = ctx.WithTraceSpanContext(spanCtx) - return ctx, span -} - -// prepareTask initializes the context and version stores for a task -func (s *scheduler) prepareTask(task *deliverTxTask) { - ctx := task.Ctx.WithTxIndex(task.AbsoluteIndex) - - _, span := s.traceSpan(ctx, "SchedulerPrepare", task) - defer span.End() - - // initialize the context - abortCh := make(chan occ.Abort, len(s.multiVersionStores)) - - // if there are no stores, don't try to wrap, because there's nothing to wrap - if len(s.multiVersionStores) > 0 { - // non-blocking - cms := ctx.MultiStore().CacheMultiStore() - - // init version stores by store key - vs := make(map[store.StoreKey]*multiversion.VersionIndexedStore) - for storeKey, mvs := range s.multiVersionStores { - vs[storeKey] = mvs.VersionedIndexedStore(task.AbsoluteIndex, task.Incarnation, abortCh) - } - - // save off version store so we can ask it things later - task.VersionStores = vs - ms := cms.SetKVStores(func(k store.StoreKey, kvs sdk.KVStore) store.CacheWrap { - return vs[k] - }) - - ctx = ctx.WithMultiStore(ms) - } - - if task.TxTracer != nil { - ctx = task.TxTracer.InjectInContext(ctx) - } - - task.AbortCh = abortCh - task.Ctx = ctx + return spanCtx, span } -func (s *scheduler) executeTask(task *deliverTxTask) { - dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerExecuteTask", task) +func (s *scheduler[Response]) executeTask(task SchedulerTask[Response]) { + dCtx, dSpan := s.traceSpan(task.TracerCtx(), "SchedulerExecuteTask", task) defer dSpan.End() - task.Ctx = dCtx + task.SetTracerCtx(dCtx) // in the synchronous case, we only want to re-execute tasks that need re-executing if s.synchronous { @@ -535,29 +609,29 @@ func (s *scheduler) executeTask(task *deliverTxTask) { } } - s.prepareTask(task) + task.Prepare(s.multiVersionStores) - resp := s.deliverTx(task.Ctx, task.Request, task.SdkTx, task.Checksum) + resp := s.executor(task) // close the abort channel - close(task.AbortCh) - abort, ok := <-task.AbortCh 
+ close(task.AbortCh()) + abort, ok := <-task.AbortCh() if ok { // if there is an abort item that means we need to wait on the dependent tx task.SetStatus(statusAborted) - task.Abort = &abort + task.SetAbort(&abort) task.AppendDependencies([]int{abort.DependentTxIdx}) // write from version store to multiversion stores - for _, v := range task.VersionStores { + for _, v := range task.VersionStores() { v.WriteEstimatesToMultiVersionStore() } return } task.SetStatus(statusExecuted) - task.Response = &resp + task.SetResponse(resp) // write from version store to multiversion stores - for _, v := range task.VersionStores { + for _, v := range task.VersionStores() { v.WriteToMultiVersionStore() } } diff --git a/sei-cosmos/tasks/scheduler_test.go b/sei-cosmos/tasks/scheduler_test.go index 2819c84299..69b33390e4 100644 --- a/sei-cosmos/tasks/scheduler_test.go +++ b/sei-cosmos/tasks/scheduler_test.go @@ -21,12 +21,13 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachekv" "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/multiversion" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/occ" "github.com/cosmos/cosmos-sdk/utils/tracing" ) -type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) +type mockDeliverTxFunc func(task SchedulerTask[*types.ResponseDeliverTx]) *types.ResponseDeliverTx var testStoreKey = sdk.NewKVStoreKey("mock") var itemKey = []byte("key") @@ -89,7 +90,7 @@ func TestProcessAll(t *testing.T) { deliverTxFunc mockDeliverTxFunc addStores bool expectedErr error - assertions func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) + assertions func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) }{ { name: "Test zero txs does not hang", @@ -97,10 +98,10 @@ func TestProcessAll(t *testing.T) { runs: 10, addStores: true, requests: requestList(0), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + deliverTxFunc: func(task SchedulerTask[*types.ResponseDeliverTx]) *types.ResponseDeliverTx { panic("should not deliver") }, - assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + assertions: func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) { require.Len(t, res, 0) }, expectedErr: nil, @@ -118,31 +119,33 @@ func TestProcessAll(t *testing.T) { kv.Set([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i))) } }, - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&res) + deliverTxFunc: func(t SchedulerTask[*types.ResponseDeliverTx]) (res *types.ResponseDeliverTx) { + defer abortRecoveryFunc(res) + task := t.(*DeliverTxTask) + ctx := task.Ctx kv := ctx.MultiStore().GetKVStore(testStoreKey) if ctx.TxIndex()%2 == 0 { // For even-indexed transactions, write to the store - kv.Set(req.Tx, req.Tx) - return types.ResponseDeliverTx{ + kv.Set(task.Tx(), task.Tx()) + return &types.ResponseDeliverTx{ Info: "write", } } else { // For odd-indexed transactions, iterate over the store // just write so we have more writes going on - kv.Set(req.Tx, req.Tx) + kv.Set(task.Tx(), task.Tx()) iterator := kv.Iterator(nil, nil) defer iterator.Close() for ; iterator.Valid(); iterator.Next() { // Do nothing, just iterate } - return types.ResponseDeliverTx{ + return 
&types.ResponseDeliverTx{ Info: "iterate", } } }, - assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + assertions: func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) { for idx, response := range res { if idx%2 == 0 { require.Equal(t, "write", response.Info) @@ -159,21 +162,21 @@ func TestProcessAll(t *testing.T) { runs: 10, addStores: true, requests: requestList(1000), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&res) + deliverTxFunc: func(t SchedulerTask[*types.ResponseDeliverTx]) (res *types.ResponseDeliverTx) { + defer abortRecoveryFunc(res) + task := t.(*DeliverTxTask) + ctx := task.Ctx // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey) - - // write to the store with this tx's index - kv.Set(req.Tx, req.Tx) - val := string(kv.Get(req.Tx)) + kv.Set(task.Tx(), task.Tx()) + val := string(kv.Get(task.Tx())) // return what was read from the store (final attempt should be index-1) - return types.ResponseDeliverTx{ + return &types.ResponseDeliverTx{ Info: val, } }, - assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + assertions: func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) { for idx, response := range res { require.Equal(t, fmt.Sprintf("%d", idx), response.Info) } @@ -191,21 +194,23 @@ func TestProcessAll(t *testing.T) { runs: 5, addStores: true, requests: requestList(1000), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&res) + deliverTxFunc: func(t SchedulerTask[*types.ResponseDeliverTx]) (res *types.ResponseDeliverTx) { + defer abortRecoveryFunc(res) + task := t.(*DeliverTxTask) + ctx := task.Ctx // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey) val := string(kv.Get(itemKey)) // write to the store with this tx's index - kv.Set(itemKey, req.Tx) + kv.Set(itemKey, task.Tx()) // return what was read from the store (final attempt should be index-1) - return types.ResponseDeliverTx{ + return &types.ResponseDeliverTx{ Info: val, } }, - assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + assertions: func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) { for idx, response := range res { if idx == 0 { require.Equal(t, "", response.Info) @@ -227,10 +232,12 @@ func TestProcessAll(t *testing.T) { runs: 1, addStores: true, requests: requestList(2000), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&res) + deliverTxFunc: func(t SchedulerTask[*types.ResponseDeliverTx]) (res *types.ResponseDeliverTx) { + task := t.(*DeliverTxTask) + ctx := task.Ctx + defer abortRecoveryFunc(res) if ctx.TxIndex()%10 != 0 { - return types.ResponseDeliverTx{ + return &types.ResponseDeliverTx{ Info: "none", } } @@ -239,14 +246,14 @@ func TestProcessAll(t *testing.T) { val := string(kv.Get(itemKey)) // write to the store with this tx's index - kv.Set(itemKey, req.Tx) + kv.Set(itemKey, task.Tx()) // return what was read from the store (final attempt should be index-1) - return types.ResponseDeliverTx{ + return &types.ResponseDeliverTx{ Info: val, } }, - assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {}, 
+ assertions: func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) {}, expectedErr: nil, }, { @@ -255,13 +262,15 @@ func TestProcessAll(t *testing.T) { runs: 10, addStores: false, requests: requestList(10), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&res) - return types.ResponseDeliverTx{ + deliverTxFunc: func(t SchedulerTask[*types.ResponseDeliverTx]) (res *types.ResponseDeliverTx) { + defer abortRecoveryFunc(res) + task := t.(*DeliverTxTask) + ctx := task.Ctx + return &types.ResponseDeliverTx{ Info: fmt.Sprintf("%d", ctx.TxIndex()), } }, - assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + assertions: func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) { for idx, response := range res { require.Equal(t, fmt.Sprintf("%d", idx), response.Info) } @@ -274,8 +283,10 @@ func TestProcessAll(t *testing.T) { runs: 1, addStores: true, requests: requestList(1000), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&res) + deliverTxFunc: func(t SchedulerTask[*types.ResponseDeliverTx]) (res *types.ResponseDeliverTx) { + defer abortRecoveryFunc(res) + task := t.(*DeliverTxTask) + ctx := task.Ctx wait := rand.Intn(10) time.Sleep(time.Duration(wait) * time.Millisecond) // all txs read and write to the same key to maximize conflicts @@ -287,11 +298,11 @@ func TestProcessAll(t *testing.T) { kv.Set(itemKey, []byte(newVal)) // return what was read from the store (final attempt should be index-1) - return types.ResponseDeliverTx{ + return &types.ResponseDeliverTx{ Info: newVal, } }, - assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + assertions: func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) { expected := "" for idx, response := range res { expected = expected + fmt.Sprintf("%d", idx) @@ -309,8 +320,10 @@ func TestProcessAll(t *testing.T) { runs: 1, addStores: true, requests: addTxTracerToTxEntries(requestList(250)), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&res) + deliverTxFunc: func(t SchedulerTask[*types.ResponseDeliverTx]) (res *types.ResponseDeliverTx) { + defer abortRecoveryFunc(res) + task := t.(*DeliverTxTask) + ctx := task.Ctx wait := rand.Intn(10) time.Sleep(time.Duration(wait) * time.Millisecond) // all txs read and write to the same key to maximize conflicts @@ -326,11 +339,11 @@ func TestProcessAll(t *testing.T) { } // return what was read from the store (final attempt should be index-1) - return types.ResponseDeliverTx{ + return &types.ResponseDeliverTx{ Info: newVal, } }, - assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + assertions: func(t *testing.T, ctx sdk.Context, res []*types.ResponseDeliverTx) { expected := "" for idx, response := range res { expected = expected + fmt.Sprintf("%d", idx) @@ -360,8 +373,9 @@ func TestProcessAll(t *testing.T) { tt.before(ctx) } - res, err := s.ProcessAll(ctx, tt.requests) - require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIterations) + tasks := TxEntriesToTasks(ctx, tt.requests) + res, stats, err := s.ProcessAll(ctx.Context(), tasks, GetMultiVersionStore(ctx)) + require.LessOrEqual(t, stats[7], maximumIterations) require.Len(t, res, len(tt.requests)) if 
!errors.Is(err, tt.expectedErr) { @@ -382,6 +396,33 @@ func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry) []*sdk.DeliverTxEnt return txEntries } +func TxEntriesToTasks(ctx sdk.Context, entries []*sdk.DeliverTxEntry) []SchedulerTask[*types.ResponseDeliverTx] { + allTasks := make([]SchedulerTask[*types.ResponseDeliverTx], 0, len(entries)) + for _, r := range entries { + task := &DeliverTxTask{ + Ctx: ctx, + Request: r.Request, + SdkTx: r.SdkTx, + Checksum: r.Checksum, + } + task.SetStatus(statusPending) + task.SetAbsoluteIndex(r.AbsoluteIndex) + task.SetDependencies(map[int]struct{}{}) + task.SetTxTracer(r.TxTracer) + allTasks = append(allTasks, task) + } + return allTasks +} + +func GetMultiVersionStore(ctx sdk.Context) map[string]multiversion.MultiVersionStore { + mvs := make(map[string]multiversion.MultiVersionStore) + keys := ctx.MultiStore().StoreKeys() + for _, sk := range keys { + mvs[sk.String()] = multiversion.NewMultiVersionStore(ctx.MultiStore().GetKVStore(sk)) + } + return mvs +} + var _ sdk.TxTracer = &testTxTracer{} func newTestTxTracer(txIndex int) *testTxTracer {
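For illustration only (not part of either patch): a minimal sketch of how the types.VM and types.Storage interfaces introduced in PATCH 1/2 could be combined for the preprocessing the Storage doc comment describes, i.e. nonce checking/setting before the VM entrypoint runs. The applyCall helper, its package name, and its error handling are assumptions, not the project's actual transaction flow.

package executor

import (
    "errors"

    "github.com/sei-protocol/sei-chain/giga/executor/types"
)

// applyCall is a hypothetical helper: it checks and bumps the sender nonce via
// Storage, then hands off to the VM Call entrypoint.
func applyCall(vm types.VM, st types.Storage, sender, to types.Address, nonce uint64, input []byte, gas uint64, value types.Hash) ([]byte, uint64, error) {
    current, err := st.GetNonce(sender)
    if err != nil {
        return nil, 0, err
    }
    if current != nonce {
        return nil, 0, errors.New("invalid nonce")
    }
    if err := st.SetNonce(sender, current+1); err != nil {
        return nil, 0, err
    }
    // fee charging and value transfer would also be handled here via Storage balances
    return vm.Call(sender, to, input, gas, value)
}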
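Also for illustration only (not part of either patch): ProcessAll in PATCH 2/2 returns its metrics as a flat key/value Stats slice that callers splice into a structured logger, and the updated test reads one entry positionally as stats[7]. A minimal sketch of a by-key lookup instead, assuming a hypothetical statValue helper and a locally re-declared Stats type so the example is self-contained:

package main

import "fmt"

// Stats mirrors the key/value slice shape returned by the scheduler's ProcessAll.
type Stats []interface{}

// statValue walks the alternating key/value pairs and returns the value for key.
func statValue(stats Stats, key string) (interface{}, bool) {
    for i := 0; i+1 < len(stats); i += 2 {
        if k, ok := stats[i].(string); ok && k == key {
            return stats[i+1], true
        }
    }
    return nil, false
}

func main() {
    stats := Stats{"txs", 100, "retries", 3, "maxIncarnation", 2}
    if v, ok := statValue(stats, "maxIncarnation"); ok {
        fmt.Println("maxIncarnation:", v)
    }
}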