Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module Ouroboros.Consensus.Util.ResourceRegistry (
, withRegistry
, registryThread
-- * Allocating and releasing regular resources
, ResourceKey
, allocate
, allocateEither
, release
Expand Down Expand Up @@ -342,6 +343,7 @@ data RegistryStatus =
--
-- Resource keys are tied to a particular registry.
data ResourceKey m = ResourceKey !(ResourceRegistry m) !ResourceId
deriving (Generic, NoUnexpectedThunks)

-- | Resource ID
--
Expand Down Expand Up @@ -841,7 +843,7 @@ ensureKnownThread :: forall m. IOLike m
ensureKnownThread rr context = do
isKnown <- checkIsKnown
unless isKnown $
throwM $ ResourceRegistryUsedFromUnknownThread {
throwM $ ResourceRegistryUsedFromUntrackedThread {
resourceRegistryCreatedIn = registryContext rr
, resourceRegistryUsedIn = context
}
Expand All @@ -854,13 +856,14 @@ ensureKnownThread rr context = do
KnownThreads ts <- registryThreads <$> readTVar (registryState rr)
return $ contextThreadId context `Set.member` ts

-- | Registry used from unknown threads
-- | Registry used from untracked threads
--
-- If this exception is raised, it indicates a bug in the caller.
data ResourceRegistryThreadException =
-- | If the registry is used from an unknown thread, we cannot do proper
-- reference counting
forall m. IOLike m => ResourceRegistryUsedFromUnknownThread {
-- | If the registry is used from an untracked thread, we cannot do proper
-- reference counting. The following threads are /tracked/: the thread
-- that spawned the registry and all threads spawned by the registry.
forall m. IOLike m => ResourceRegistryUsedFromUntrackedThread {
-- | Information about the context in which the registry was created
resourceRegistryCreatedIn :: !(Context m)

Expand Down
45 changes: 14 additions & 31 deletions ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/ImmDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ import Ouroboros.Network.Point (WithOrigin (..))
import Ouroboros.Consensus.Block (GetHeader (..), IsEBB (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
allocateEither, unsafeRelease)
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)

import Ouroboros.Storage.ChainDB.API (ChainDB)
import Ouroboros.Storage.ChainDB.API hiding (ChainDB (..),
Expand Down Expand Up @@ -417,14 +416,10 @@ appendBlock db@ImmDB{..} b = withDB db $ \imm -> case isEBB (getHeader b) of
Streaming
-------------------------------------------------------------------------------}

-- | Wrapper around 'ImmDB.stream' that 'allocate's the iterator in the
-- 'ResourceRegistry' so that 'ImmDB.iteratorClose' is registered as the
-- clean-up action. Translates the requested 'BlockComponent' into the
-- 'ImmDB.BlockComponent' the ImmutableDB understands.
--
-- When the returned iterator is closed, it will be 'release'd from the
-- 'ResourceRegistry'.
registeredStream
-- | Wrapper around 'ImmDB.stream' that translates the requested
-- 'BlockComponent' into the 'ImmDB.BlockComponent' the ImmutableDB
-- understands.
openIterator
:: forall m blk b. (IOLike m, HasHeader blk)
=> ImmDB m blk
-> ResourceRegistry m
Expand All @@ -433,20 +428,8 @@ registeredStream
-> Maybe (SlotNo, HeaderHash blk)
-> m (Either (ImmDB.WrongBoundError (HeaderHash blk))
(ImmDB.Iterator (HeaderHash blk) m b))
registeredStream db registry blockComponent start end = do
errOrKeyAndIt <- allocateEither registry
(\_key -> withDB db $ \imm ->
ImmDB.stream imm blockComponent' start end)
(iteratorClose db)
return $ case errOrKeyAndIt of
Left e -> Left e
-- The iterator will be used by a thread that is unknown to the registry
-- (which, after all, is entirely internal to the chain DB). This means
-- that the registry cannot guarantee that the iterator will be live for
-- the duration of that thread, and indeed, it may not be: the chain DB
-- might be closed before that thread terminates. We will deal with this
-- in the chain DB itself (throw ClosedDBError exception).
Right (key, it) -> Right it { ImmDB.iteratorClose = unsafeRelease key }
openIterator db registry blockComponent start end =
withDB db $ \imm -> ImmDB.stream imm registry blockComponent' start end
where
blockComponent' = translateToRawDB (parse db) (addHdrEnv db) blockComponent

Expand Down Expand Up @@ -489,29 +472,29 @@ stream db registry blockComponent from to = runExceptT $ do
case from of
StreamFromExclusive pt@BlockPoint { atSlot = slot, withHash = hash } -> do
when (pointSlot pt > slotNoAtTip) $ throwError $ MissingBlock pt
it <- openRegisteredStream (Just (slot, hash)) end
it <- openStream (Just (slot, hash)) end
-- Skip the first block, as the bound is exclusive
void $ lift $ iteratorNext db it
return it
StreamFromExclusive GenesisPoint ->
openRegisteredStream Nothing end
openStream Nothing end
StreamFromInclusive pt@BlockPoint { atSlot = slot, withHash = hash } -> do
when (pointSlot pt > slotNoAtTip) $ throwError $ MissingBlock pt
openRegisteredStream (Just (slot, hash)) end
openStream (Just (slot, hash)) end
StreamFromInclusive GenesisPoint ->
throwM NoGenesisBlock
where
openRegisteredStream
openStream
:: Maybe (SlotNo, HeaderHash blk)
-> Maybe (SlotNo, HeaderHash blk)
-> ExceptT (UnknownRange blk)
m
(ImmDB.Iterator (HeaderHash blk) m b)
openRegisteredStream start end = ExceptT $
openStream start end = ExceptT $
bimap toUnknownRange (fmap snd . stopAt to) <$>
-- 'stopAt' needs to know the hash of each streamed block, so we \"Get\"
-- it in addition to @b@, but we drop it afterwards.
registeredStream db registry ((,) <$> GetHash <*> blockComponent) start end
openIterator db registry ((,) <$> GetHash <*> blockComponent) start end
where
toUnknownRange :: ImmDB.WrongBoundError (HeaderHash blk) -> UnknownRange blk
toUnknownRange e
Expand Down Expand Up @@ -570,7 +553,7 @@ streamAfter
(ImmDB.WrongBoundError (HeaderHash blk))
(ImmDB.Iterator (HeaderHash blk) m b))
streamAfter db registry blockComponent low =
registeredStream db registry blockComponent low' Nothing >>= \case
openIterator db registry blockComponent low' Nothing >>= \case
Left err -> return $ Left err
Right itr -> do
case low of
Expand Down
5 changes: 4 additions & 1 deletion ouroboros-consensus/src/Ouroboros/Storage/ImmutableDB/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import Data.ByteString.Builder (Builder)
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)

import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)

import Ouroboros.Storage.Common
import Ouroboros.Storage.ImmutableDB.Types
import Ouroboros.Storage.Util.ErrorHandling (ErrorHandling)
Expand Down Expand Up @@ -197,7 +199,8 @@ data ImmutableDB hash m = ImmutableDB
-- prematurely closed with 'iteratorClose'.
, stream
:: forall b. HasCallStack
=> BlockComponent (ImmutableDB hash m) b
=> ResourceRegistry m
-> BlockComponent (ImmutableDB hash m) b
-> Maybe (SlotNo, hash)
-> Maybe (SlotNo, hash)
-> m (Either (WrongBoundError hash)
Expand Down
Loading