-- | Operations for running IO operations asynchronously.

-- These are the same as in the 'async' package. We do not use
-- 'async' to avoid its dependencies.

{- License for the 'async' package
Copyright (c) 2012, Simon Marlow

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above
      copyright notice, this list of conditions and the following
      disclaimer in the documentation and/or other materials provided
      with the distribution.

    * Neither the name of Simon Marlow nor the names of other
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-}

{-# LANGUAGE CPP, MagicHash, UnboxedTuples #-}

module Control.Concurrent.Async (
  async, withAsync, wait, asyncThreadId, cancel, concurrently
  ) where

import Control.Concurrent.STM
import Control.Exception
  ( BlockedIndefinitelyOnMVar(..)
  , BlockedIndefinitelyOnSTM(..)
  , Exception
  , SomeException
  , asyncExceptionFromException
  , asyncExceptionToException
  , catch
  , fromException
  , onException
  , toException
  , try
  )
import Control.Concurrent
import Control.Monad
import Data.IORef
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO hiding (onException)

-- Compatibility layer for the context-carrying exception API.
-- With base >= 4.21 the names are imported from Control.Exception;
-- otherwise equivalents with the same names are defined locally.
#if MIN_VERSION_base(4,21,0)
import Control.Exception (ExceptionWithContext, tryWithContext, catchNoPropagate, rethrowIO)
#else
-- Without context support the wrapper is just the wrapped exception type.
type ExceptionWithContext x = x

-- Fallback: plain catch-everything handler (see catchAll).
catchNoPropagate :: IO a -> (ExceptionWithContext SomeException -> IO a) -> IO a
catchNoPropagate = catchAll

-- Fallback: ordinary try, specialised to SomeException.
tryWithContext :: IO a -> IO (Either (ExceptionWithContext SomeException) a)
tryWithContext = try

-- Fallback: ordinary throwIO.
rethrowIO :: ExceptionWithContext SomeException -> IO a
rethrowIO = throwIO
#endif

-- | An asynchronous action spawned by 'async' or 'withAsync'.
-- Asynchronous actions are executed in a separate thread, and
-- operations are provided for waiting for asynchronous actions to
-- complete and obtaining their results (see e.g. 'wait').
--
data Async a = Async
  { asyncThreadId :: {-# UNPACK #-} !ThreadId
                  -- ^ Returns the t'ThreadId' of the thread running
                  -- the given t'Async'.
  , _asyncWait    :: STM (Either (ExceptionWithContext SomeException) a)
                  -- ^ Transaction that reads the outcome stored by the
                  -- forked thread: @Left@ with the exception it raised,
                  -- or @Right@ with its result.
  }

-- | Spawn an asynchronous action in a separate thread.
--
-- The thread is created with 'rawForkIO'; the returned t'Async' can be
-- passed to 'wait' or 'cancel'.
async :: IO a -> IO (Async a)
async = inline asyncUsing rawForkIO

-- | Spawn an asynchronous action using the supplied fork function.
--
-- The first argument creates the thread; the second is the action to
-- run in it.  The outcome of the action (exception or result) is
-- stored in a fresh 'TMVar' that the returned t'Async' reads from.
asyncUsing :: (IO () -> IO ThreadId)
           -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
   var <- newEmptyTMVarIO
   -- t <- forkFinally action (\r -> atomically $ putTMVar var r)
   -- slightly faster:
   -- The fork happens under 'mask'; only the user action itself runs
   -- with the caller's masking state restored, so the result is always
   -- written to 'var' once the action finishes or throws.
   t <- mask $ \restore ->
          doFork $ try (restore action) >>= atomically . putTMVar var
   return (Async t (readTMVar var))

-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function.  When the function returns
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
--
-- > withAsync action inner = mask $ \restore -> do
-- >   a <- async (restore action)
-- >   restore (inner a) `finally` uninterruptibleCancel a
--
-- This is a useful variant of 'async' that ensures an @Async@ is
-- never left running unintentionally.
--
-- Note: a reference to the child thread is kept alive until the call
-- to 'withAsync' returns, so nesting many 'withAsync' calls requires
-- linear memory.
--
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO

-- | Like 'withAsync', but the thread is created with the supplied fork
-- function.
--
-- Runs the first action in a new thread and passes its handle to the
-- continuation.  The child is cancelled with 'uninterruptibleCancel'
-- both when the continuation returns and when it throws; in the latter
-- case the exception is re-thrown afterwards.
withAsyncUsing :: (IO () -> IO ThreadId)
               -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow.  We can do better by
-- hand-coding it:
withAsyncUsing doFork = \action inner -> do
  var <- newEmptyTMVarIO
  mask $ \restore -> do
    -- The child stores its outcome (exception or result) in 'var'.
    t <- doFork $ tryWithContext (restore action) >>= atomically . putTMVar var
    let a = Async t (readTMVar var)
    -- Failure path: cancel the child, then re-throw the original exception.
    r <- restore (inner a) `catchNoPropagate` \e -> do
      uninterruptibleCancel a
      rethrowIO (e :: ExceptionWithContext SomeException)
    -- Success path: the child must not outlive the continuation either.
    uninterruptibleCancel a
    return r

-- | Wait for an asynchronous action to complete, and return its
-- value.  If the asynchronous action threw an exception, then the
-- exception is re-thrown by 'wait'.
--
-- > wait = atomically . waitSTM
--
{-# INLINE wait #-}
wait :: Async a -> IO a
wait = tryAgain . atomically . waitSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the wait is interrupted by 'BlockedIndefinitelyOnSTM', run it
    -- once more instead of propagating that exception.
    tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnSTM -> f

-- | Wait for an asynchronous action to complete, and return either
-- @Left e@ if the action raised an exception @e@, or @Right a@ if it
-- returned a value @a@.
--
-- > waitCatch = atomically . waitCatchSTM
--
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either (ExceptionWithContext SomeException) a)
waitCatch = tryAgain . atomically . waitCatchSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the wait is interrupted by 'BlockedIndefinitelyOnSTM', run it
    -- once more instead of propagating that exception.
    tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnSTM -> f

-- | A version of 'wait' that can be used inside an STM transaction.
--
-- Reads the outcome via 'waitCatchSTM'; an exception outcome is thrown
-- with 'throwSTM', a successful one is returned.
--
waitSTM :: Async a -> STM a
waitSTM a = do
   r <- waitCatchSTM a
   either throwSTM return r

-- | A version of 'waitCatch' that can be used inside an STM transaction.
--
-- This simply returns the wait transaction stored in the t'Async'.
--
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either (ExceptionWithContext SomeException) a)
waitCatchSTM (Async _ w) = w

-- | Cancel an asynchronous action by throwing the @AsyncCancelled@
-- exception to it, and waiting for the t'Async' thread to quit.
-- Has no effect if the t'Async' has already completed.
--
-- > cancel a = throwTo (asyncThreadId a) AsyncCancelled <* waitCatch a
--
-- Note that 'cancel' will not terminate until the thread the t'Async'
-- refers to has terminated. This means that 'cancel' will block for
-- as long as said thread blocks when receiving an asynchronous exception.
--
-- For example, it could block if:
--
-- * It's executing a foreign call, and thus cannot receive the asynchronous
-- exception;
-- * It's executing some cleanup handler after having received the exception,
-- and the handler is blocking.
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel a@(Async t _) = throwTo t AsyncCancelled >> void (waitCatch a)

-- | The exception thrown by 'cancel' to terminate a thread.
data AsyncCancelled = AsyncCancelled
  deriving (Show, Eq)

-- | 'AsyncCancelled' is placed in the asynchronous-exception hierarchy:
-- both conversions go through the @asyncException*@ helpers from
-- "Control.Exception" rather than the class defaults.
instance Exception AsyncCancelled where
  fromException = asyncExceptionFromException
  toException = asyncExceptionToException

-- | Cancel an asynchronous action
--
-- This is a variant of 'cancel', but it is not interruptible: the
-- whole cancellation runs inside 'uninterruptibleMask_'.
{-# INLINE uninterruptibleCancel #-}
uninterruptibleCancel :: Async a -> IO ()
uninterruptibleCancel = uninterruptibleMask_ . cancel

-- | Run two @IO@ actions concurrently, and return both results.  If
-- either action throws an exception at any time, then the other
-- action is 'cancel'led, and the exception is re-thrown by
-- 'concurrently'.
--
-- > concurrently left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitBoth a b
concurrently :: IO a -> IO b -> IO (a,b)
concurrently left right = concurrently' left right (collect [])
  where
    -- Accumulate child outcomes (most recent first) until one result
    -- from each side has arrived, in either order.
    collect [Left a, Right b] _ = return (a,b)
    collect [Right b, Left a] _ = return (a,b)
    collect xs m = do
        e <- m
        case e of
            -- A child failed: propagate its exception to the caller.
            Left ex -> throwIO ex
            Right r -> collect (r:xs) m

-- Shared worker behind 'concurrently'.
--
-- Forks one thread per action; each thread reports its outcome through
-- a single shared 'MVar'.  The continuation receives an action that
-- fetches the next outcome: @Left@ for an exception from either child,
-- @Right (Left a)@ for the first action's result and @Right (Right b)@
-- for the second.  When the continuation finishes, normally or with an
-- exception, any children still running are sent 'AsyncCancelled' and
-- their outstanding outcomes are drained.
concurrently' :: IO a -> IO b
             -> (IO (Either SomeException (Either a b)) -> IO r)
             -> IO r
concurrently' left right collect = do
    done <- newEmptyMVar
    mask $ \restore -> do
        -- Note: uninterruptibleMask here is because we must not allow
        -- the putMVar in the exception handler to be interrupted,
        -- otherwise the parent thread will deadlock when it waits for
        -- the thread to terminate.
        lid <- forkIO $ uninterruptibleMask_ $
          restore (left >>= putMVar done . Right . Left)
            `catchAll` (putMVar done . Left)
        rid <- forkIO $ uninterruptibleMask_ $
          restore (right >>= putMVar done . Right . Right)
            `catchAll` (putMVar done . Left)

        -- Number of child outcomes not yet taken from 'done'.
        count <- newIORef (2 :: Int)
        let takeDone = do
                r <- takeMVar done      -- interruptible
                -- Decrement the counter so we know how many takes are left.
                -- Since only the parent thread is calling this, we can
                -- use non-atomic modifications.
                -- NB. do this *after* takeMVar, because takeMVar might be
                -- interrupted.
                -- The strict variant keeps the counter evaluated.
                modifyIORef' count (subtract 1)
                return r

        -- Re-run the action once if it is interrupted by
        -- 'BlockedIndefinitelyOnMVar'.
        let tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnMVar -> f

            stop = do
                -- kill right before left, to match the semantics of
                -- the version using withAsync. (#27)
                uninterruptibleMask_ $ do
                  count' <- readIORef count
                  -- we only need to use killThread if there are still
                  -- children alive.  Note: forkIO here is because the
                  -- child thread could be in an uninterruptible
                  -- putMVar.
                  when (count' > 0) $
                    void $ forkIO $ do
                      throwTo rid AsyncCancelled
                      throwTo lid AsyncCancelled
                  -- ensure the children are really dead
                  replicateM_ count' (tryAgain $ takeMVar done)

        r <- collect (tryAgain $ takeDone) `onException` stop
        stop
        return r

-- | 'Control.Exception.catch' specialised to 'SomeException', so the
-- handler receives every exception raised by the action.
catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = Control.Exception.catch

-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
--
-- Implemented directly on the 'fork#' primop: the action is unwrapped
-- with 'unIO', forked, and the raw thread id is boxed as a 'ThreadId'.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
   case (fork# (unIO action) s) of (# s1, tid #) -> (# s1, ThreadId tid #)