1+ {-
2+ Module : Development.IDE.Core.WorkerThread
3+ Author : @soulomoon
4+
5+ Description : This module provides an API for managing worker threads in the IDE.
6+ see Note [Serializing runs in separate thread]
7+ -}
18module Development.IDE.Core.WorkerThread
2- (withWorkerQueue , awaitRunInThread )
9+ (withWorkerQueue , awaitRunInThread , withWorkerQueueOfOne , WorkerQueue , writeWorkerQueue )
310 where
411
512import Control.Concurrent.Async
613import Control.Concurrent.STM
714import Control.Concurrent.Strict (newBarrier , signalBarrier ,
815 waitBarrier )
16+ import Control.Exception (finally )
917import Control.Monad (forever )
1018import Control.Monad.Cont (ContT (ContT ))
19+ import Control.Monad.IO.Class (liftIO )
1120
1221{-
1322Note [Serializing runs in separate thread]
@@ -18,31 +27,61 @@ Like the db writes, session loading in session loader, shake session restarts.
1827
1928Originally we used various ways to implement this, but it was hard to maintain and error prone.
2029Moreover, we can not stop these threads uniformly when we are shutting down the server.
21-
22- `Development.IDE.Core.WorkerThread` module provides a simple api to implement this easily.
2330-}
2431
25- -- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker
32+ data WorkerQueue a = WorkerQueueOfOne (TMVar a ) | WorkerQueueOfMany (TQueue a )
33+
34+ writeWorkerQueue :: WorkerQueue a -> a -> STM ()
35+ writeWorkerQueue (WorkerQueueOfOne tvar) action = putTMVar tvar action
36+ writeWorkerQueue (WorkerQueueOfMany tqueue) action = writeTQueue tqueue action
37+
38+ newWorkerQueue :: STM (WorkerQueue a )
39+ newWorkerQueue = WorkerQueueOfMany <$> newTQueue
40+
41+ newWorkerQueueOfOne :: STM (WorkerQueue a )
42+ newWorkerQueueOfOne = WorkerQueueOfOne <$> newEmptyTMVar
43+
44+
45+ -- | 'withWorkerQueue' creates a new 'WorkerQueue', and launches a worker
2646-- thread which polls the queue for requests and runs the given worker
2747-- function on them.
28- withWorkerQueue :: (t -> IO a ) -> ContT () IO (TQueue t )
29- withWorkerQueue workerAction = ContT $ \ mainAction -> do
30- q <- newTQueueIO
48+ withWorkerQueue :: (t -> IO a ) -> ContT () IO (WorkerQueue t )
49+ withWorkerQueue workerAction = do
50+ q <- liftIO $ atomically newWorkerQueue
51+ runWorkerQueue q workerAction
52+
53+ -- | 'withWorkerQueueOfOne' creates a new 'WorkerQueue' that only allows one action to be queued at a time.
54+ -- and one action can only be queued after the previous action has been done.
55+ -- this is useful when we want to cancel the action waiting in the queue, if it's thread is cancelled.
56+ -- e.g. session loading in session loader. When a shake session is restarted, we want to cancel the previous pending session loading.
57+ withWorkerQueueOfOne :: (t -> IO a ) -> ContT () IO (WorkerQueue t )
58+ withWorkerQueueOfOne workerAction = do
59+ q <- liftIO $ atomically newWorkerQueueOfOne
60+ runWorkerQueue q workerAction
61+
62+ runWorkerQueue :: WorkerQueue t -> (t -> IO a ) -> ContT () IO (WorkerQueue t )
63+ runWorkerQueue q workerAction = ContT $ \ mainAction -> do
3164 withAsync (writerThread q) $ \ _ -> mainAction q
3265 where
3366 writerThread q =
3467 forever $ do
35- l <- atomically $ readTQueue q
36- workerAction l
68+ case q of
69+ -- only remove the action from the queue after it has been run if it is a one-shot queue
70+ WorkerQueueOfOne tvar -> do
71+ l <- atomically $ readTMVar tvar
72+ workerAction l `finally` atomically (takeTMVar tvar)
73+ WorkerQueueOfMany q -> do
74+ l <- atomically $ readTQueue q
75+ workerAction l
3776
3877-- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread,
3978-- and then blocks until the result is computed.
40- awaitRunInThread :: TQueue (IO () ) -> IO result -> IO result
79+ awaitRunInThread :: WorkerQueue (IO () ) -> IO result -> IO result
4180awaitRunInThread q act = do
4281 -- Take an action from TQueue, run it and
4382 -- use barrier to wait for the result
4483 barrier <- newBarrier
45- atomically $ writeTQueue q $ do
84+ atomically $ writeWorkerQueue q $ do
4685 res <- act
4786 signalBarrier barrier res
4887 waitBarrier barrier
0 commit comments