@@ -114,19 +114,19 @@ builder db@Database{..} keys = do
114114 val <- case fromMaybe (fromRight undefined idKey, Dirty Nothing ) status of
115115 (_, Clean r) -> pure r
116116 (_, Running force val _) -> do
117- liftIO $ modifyIORef toForce (force: )
117+ liftIO $ modifyIORef toForce (Wait force : )
118118 pure val
119119 (key, Dirty s) -> do
120120 act <- unliftAIO (refresh db key id s)
121121 let (force, val) = splitIO (join act)
122122 liftIO $ Ids. insert databaseValues id (key, Running force val s)
123- liftIO $ modifyIORef toForce (force: )
123+ liftIO $ modifyIORef toForce (Spawn force: )
124124 pure val
125125
126126 pure (id , val)
127127
128128 toForceList <- liftIO $ readIORef toForce
129- waitAll <- unliftAIO $ mapConcurrentlyAIO_ sequence_ $ increasingChunks toForceList
129+ waitAll <- unliftAIO $ mapConcurrentlyAIO_ id toForceList
130130 case toForceList of
131131 [] -> return $ Left results
132132 _ -> return $ Right $ do
@@ -204,7 +204,7 @@ updateReverseDeps
204204 -> [Id ] -- ^ Previous direct dependencies of Id
205205 -> IntSet -- ^ Current direct dependencies of Id
206206 -> IO ()
207- updateReverseDeps myId db prev new = uninterruptibleMask_ $ withLock (databaseReverseDepsLock db) $ do
207+ updateReverseDeps myId db prev new = withLock (databaseReverseDepsLock db) $ uninterruptibleMask_ $ do
208208 forM_ prev $ \ d ->
209209 unless (d `Set.member` new) $
210210 doOne (Set. delete myId) d
@@ -263,21 +263,23 @@ cleanupAsync ref = uninterruptibleMask_ $ do
263263 mapM_ (\ a -> throwTo (asyncThreadId a) AsyncCancelled ) asyncs
264264 mapM_ waitCatch asyncs
265265
266+ data Wait a
267+ = Wait { justWait :: ! a }
268+ | Spawn { justWait :: ! a }
269+ deriving Functor
266270
267- mapConcurrentlyAIO_ :: (a -> IO () ) -> [a ] -> AIO ()
271+ waitOrSpawn :: Wait (IO a ) -> IO (Either (IO a ) (Async a ))
272+ waitOrSpawn (Wait io) = pure $ Left io
273+ waitOrSpawn (Spawn io) = Right <$> async io
274+
275+ mapConcurrentlyAIO_ :: (a -> IO () ) -> [Wait a ] -> AIO ()
268276mapConcurrentlyAIO_ _ [] = pure ()
269- mapConcurrentlyAIO_ f [one] = liftIO $ f one
277+ mapConcurrentlyAIO_ f [one] = liftIO $ justWait $ fmap f one
270278mapConcurrentlyAIO_ f many = do
271279 ref <- AIO ask
272- liftIO $ uninterruptibleMask $ \ restore -> do
273- asyncs <- liftIO $ traverse async (map (restore . f) many)
280+ waits <- liftIO $ uninterruptibleMask $ \ restore -> do
281+ waits <- liftIO $ traverse waitOrSpawn (map (fmap (restore . f)) many)
282+ let asyncs = rights waits
274283 liftIO $ atomicModifyIORef'_ ref (asyncs ++ )
275- traverse_ wait asyncs
276-
277- -- >>> increasingChunks [1..20]
278- -- [[1,2],[3,4,5,6],[7,8,9,10,11,12,13,14],[15,16,17,18,19,20]]
279- increasingChunks :: [a ] -> [[a ]]
280- increasingChunks = go 2 where
281- go :: Int -> [a ] -> [[a ]]
282- go _ [] = []
283- go n xx = let (chunk, rest) = splitAt n xx in chunk : go (min 10 (n* 2 )) rest
284+ return waits
285+ liftIO $ traverse_ (either id wait) waits
0 commit comments