{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    worker
  ) where

import Control.Concurrent.STM
import Control.Exception (SomeException(..), AsyncException(..))
import qualified Control.Exception as E
import Data.IORef
import qualified Network.HTTP.Types as H
import qualified System.TimeManager as T

import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2
import Network.HTTP2.Priority
import Network.HTTP2.Server.API
import Network.HTTP2.Server.Context
import Network.HTTP2.Server.EncodeFrame
import Network.HTTP2.Server.Manager
import Network.HTTP2.Server.Queue
import Network.HTTP2.Server.Stream
import Network.HTTP2.Server.Types

----------------------------------------------------------------

pushStream :: Context
           -> Stream -- parent stream
           -> ValueTable -- request
           -> [PushPromise]
           -> IO OutputType
pushStream :: Context -> Stream -> ValueTable -> [PushPromise] -> IO OutputType
pushStream Context
_ Stream
_ ValueTable
_ [] = OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
ORspn
pushStream ctx :: Context
ctx@Context{TVar WindowSize
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef Settings
TQueue Control
TQueue Input
DynamicTable
PriorityTree Output
StreamTable
connectionWindow :: Context -> TVar WindowSize
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQ :: Context -> PriorityTree Output
inputQ :: Context -> TQueue Input
serverStreamId :: Context -> IORef WindowSize
clientStreamId :: Context -> IORef WindowSize
continued :: Context -> IORef (Maybe WindowSize)
priorityTreeSize :: Context -> IORef WindowSize
concurrency :: Context -> IORef WindowSize
streamTable :: Context -> StreamTable
firstSettings :: Context -> IORef Bool
http2settings :: Context -> IORef Settings
connectionWindow :: TVar WindowSize
decodeDynamicTable :: DynamicTable
encodeDynamicTable :: DynamicTable
controlQ :: TQueue Control
outputQ :: PriorityTree Output
inputQ :: TQueue Input
serverStreamId :: IORef WindowSize
clientStreamId :: IORef WindowSize
continued :: IORef (Maybe WindowSize)
priorityTreeSize :: IORef WindowSize
concurrency :: IORef WindowSize
streamTable :: StreamTable
firstSettings :: IORef Bool
http2settings :: IORef Settings
..} Stream
pstrm ValueTable
reqvt [PushPromise]
pps0
  | WindowSize
len WindowSize -> WindowSize -> Bool
forall a. Eq a => a -> a -> Bool
== WindowSize
0 = OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
ORspn
  | Bool
otherwise = do
        Bool
pushable <- Settings -> Bool
enablePush (Settings -> Bool) -> IO Settings -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef IORef Settings
http2settings
        if Bool
pushable then do
            TVar WindowSize
tvar <- WindowSize -> IO (TVar WindowSize)
forall a. a -> IO (TVar a)
newTVarIO WindowSize
0
            WindowSize
lim <- TVar WindowSize -> [PushPromise] -> WindowSize -> IO WindowSize
forall a.
Num a =>
TVar a -> [PushPromise] -> WindowSize -> IO WindowSize
push TVar WindowSize
tvar [PushPromise]
pps0 WindowSize
0
            if WindowSize
lim WindowSize -> WindowSize -> Bool
forall a. Eq a => a -> a -> Bool
== WindowSize
0 then
              OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
ORspn
             else
              OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return (OutputType -> IO OutputType) -> OutputType -> IO OutputType
forall a b. (a -> b) -> a -> b
$ IO () -> OutputType
OWait (WindowSize -> TVar WindowSize -> IO ()
forall a. Ord a => a -> TVar a -> IO ()
waiter WindowSize
lim TVar WindowSize
tvar)
          else
            OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
ORspn
  where
    !pid :: WindowSize
pid = Stream -> WindowSize
streamNumber Stream
pstrm
    !len :: WindowSize
len = [PushPromise] -> WindowSize
forall (t :: * -> *) a. Foldable t => t a -> WindowSize
length [PushPromise]
pps0
    increment :: TVar a -> IO ()
increment TVar a
tvar = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar a -> (a -> a) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar a
tvar (a -> a -> a
forall a. Num a => a -> a -> a
+a
1)
    waiter :: a -> TVar a -> IO ()
waiter a
lim TVar a
tvar = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        a
n <- TVar a -> STM a
forall a. TVar a -> STM a
readTVar TVar a
tvar
        Bool -> STM ()
check (a
n a -> a -> Bool
forall a. Ord a => a -> a -> Bool
>= a
lim)
    push :: TVar a -> [PushPromise] -> WindowSize -> IO WindowSize
push TVar a
_ [] !WindowSize
n = WindowSize -> IO WindowSize
forall (m :: * -> *) a. Monad m => a -> m a
return (WindowSize
n :: Int)
    push TVar a
tvar (PushPromise
pp:[PushPromise]
pps) !WindowSize
n = do
        WindowSize
ws <- Settings -> WindowSize
initialWindowSize (Settings -> WindowSize) -> IO Settings -> IO WindowSize
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef IORef Settings
http2settings
        let !w :: WindowSize
w = PushPromise -> WindowSize
promiseWeight PushPromise
pp
            !pri :: Priority
pri = Priority
defaultPriority { weight :: WindowSize
weight = WindowSize
w }
            !pre :: Precedence
pre = Priority -> Precedence
toPrecedence Priority
pri
        Stream
newstrm <- Context -> WindowSize -> Precedence -> IO Stream
newPushStream Context
ctx WindowSize
ws Precedence
pre
        let !sid :: WindowSize
sid = Stream -> WindowSize
streamNumber Stream
newstrm
        StreamTable -> WindowSize -> Stream -> IO ()
insert StreamTable
streamTable WindowSize
sid Stream
newstrm
        let !scheme :: HeaderValue
scheme = Maybe HeaderValue -> HeaderValue
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe HeaderValue -> HeaderValue)
-> Maybe HeaderValue -> HeaderValue
forall a b. (a -> b) -> a -> b
$ Token -> ValueTable -> Maybe HeaderValue
getHeaderValue Token
tokenScheme ValueTable
reqvt
            -- fixme: this value can be Nothing
            !auth :: HeaderValue
auth   = Maybe HeaderValue -> HeaderValue
forall a. HasCallStack => Maybe a -> a
fromJust (Token -> ValueTable -> Maybe HeaderValue
getHeaderValue Token
tokenHost ValueTable
reqvt
                            Maybe HeaderValue -> Maybe HeaderValue -> Maybe HeaderValue
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Token -> ValueTable -> Maybe HeaderValue
getHeaderValue Token
tokenAuthority ValueTable
reqvt)
            !path :: HeaderValue
path = PushPromise -> HeaderValue
promiseRequestPath PushPromise
pp
            !promiseRequest :: [(Token, HeaderValue)]
promiseRequest = [(Token
tokenMethod, HeaderValue
H.methodGet)
                               ,(Token
tokenScheme, HeaderValue
scheme)
                               ,(Token
tokenAuthority, HeaderValue
auth)
                               ,(Token
tokenPath, HeaderValue
path)]
            !ot :: OutputType
ot = [(Token, HeaderValue)] -> WindowSize -> OutputType
OPush [(Token, HeaderValue)]
promiseRequest WindowSize
pid
            !rsp :: Response
rsp = PushPromise -> Response
promiseResponse PushPromise
pp
            !out :: Output
out = Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
newstrm Response
rsp OutputType
ot Maybe (TBQueue RspStreaming)
forall a. Maybe a
Nothing (IO () -> Output) -> IO () -> Output
forall a b. (a -> b) -> a -> b
$ TVar a -> IO ()
forall a. Num a => TVar a -> IO ()
increment TVar a
tvar
        PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ Output
out
        TVar a -> [PushPromise] -> WindowSize -> IO WindowSize
push TVar a
tvar [PushPromise]
pps (WindowSize
n WindowSize -> WindowSize -> WindowSize
forall a. Num a => a -> a -> a
+ WindowSize
1)

-- | This function is passed to workers.
--   They also pass 'Response's from a server to this function.
--   This function enqueues commands for the HTTP/2 sender.
response :: Context -> Manager -> T.Handle -> ThreadContinue -> Stream -> Request -> Response -> [PushPromise] -> IO ()
response :: Context
-> Manager
-> Handle
-> ThreadContinue
-> Stream
-> Request
-> Response
-> [PushPromise]
-> IO ()
response ctx :: Context
ctx@Context{TVar WindowSize
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef Settings
TQueue Control
TQueue Input
DynamicTable
PriorityTree Output
StreamTable
connectionWindow :: TVar WindowSize
decodeDynamicTable :: DynamicTable
encodeDynamicTable :: DynamicTable
controlQ :: TQueue Control
outputQ :: PriorityTree Output
inputQ :: TQueue Input
serverStreamId :: IORef WindowSize
clientStreamId :: IORef WindowSize
continued :: IORef (Maybe WindowSize)
priorityTreeSize :: IORef WindowSize
concurrency :: IORef WindowSize
streamTable :: StreamTable
firstSettings :: IORef Bool
http2settings :: IORef Settings
connectionWindow :: Context -> TVar WindowSize
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQ :: Context -> PriorityTree Output
inputQ :: Context -> TQueue Input
serverStreamId :: Context -> IORef WindowSize
clientStreamId :: Context -> IORef WindowSize
continued :: Context -> IORef (Maybe WindowSize)
priorityTreeSize :: Context -> IORef WindowSize
concurrency :: Context -> IORef WindowSize
streamTable :: Context -> StreamTable
firstSettings :: Context -> IORef Bool
http2settings :: Context -> IORef Settings
..} Manager
mgr Handle
th ThreadContinue
tconf Stream
strm Request
req Response
rsp [PushPromise]
pps = case Response -> ResponseBody
responseBody Response
rsp of
  ResponseBody
RspNoBody -> do
      ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
      PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
strm Response
rsp OutputType
ORspn Maybe (TBQueue RspStreaming)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
  RspBuilder Builder
_ -> do
      OutputType
otyp <- Context -> Stream -> ValueTable -> [PushPromise] -> IO OutputType
pushStream Context
ctx Stream
strm ValueTable
reqvt [PushPromise]
pps
      ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
      PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
strm Response
rsp OutputType
otyp Maybe (TBQueue RspStreaming)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
  RspFile FileSpec
_ -> do
      OutputType
otyp <- Context -> Stream -> ValueTable -> [PushPromise] -> IO OutputType
pushStream Context
ctx Stream
strm ValueTable
reqvt [PushPromise]
pps
      ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
      PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
strm Response
rsp OutputType
otyp Maybe (TBQueue RspStreaming)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
  RspStreaming (Builder -> IO ()) -> IO () -> IO ()
strmbdy -> do
      OutputType
otyp <- Context -> Stream -> ValueTable -> [PushPromise] -> IO OutputType
pushStream Context
ctx Stream
strm ValueTable
reqvt [PushPromise]
pps
      -- We must not exit this WAI application.
      -- If the application exits, streaming would be also closed.
      -- So, this work occupies this thread.
      --
      -- We need to increase the number of workers.
      Manager -> IO ()
spawnAction Manager
mgr
      -- After this work, this thread stops to decease
      -- the number of workers.
      ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
False
      -- Since streaming body is loop, we cannot control it.
      -- So, let's serialize 'Builder' with a designated queue.
      TBQueue RspStreaming
tbq <- Natural -> IO (TBQueue RspStreaming)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
10 -- fixme: hard coding: 10
      PriorityTree Output -> Output -> IO ()
enqueueOutput PriorityTree Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream
-> Response
-> OutputType
-> Maybe (TBQueue RspStreaming)
-> IO ()
-> Output
Output Stream
strm Response
rsp OutputType
otyp (TBQueue RspStreaming -> Maybe (TBQueue RspStreaming)
forall a. a -> Maybe a
Just TBQueue RspStreaming
tbq) (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
      let push :: Builder -> IO ()
push Builder
b = do
            Handle -> IO ()
T.pause Handle
th
            STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue RspStreaming -> RspStreaming -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue RspStreaming
tbq (Builder -> RspStreaming
RSBuilder Builder
b)
            Handle -> IO ()
T.resume Handle
th
          flush :: IO ()
flush  = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue RspStreaming -> RspStreaming -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue RspStreaming
tbq RspStreaming
RSFlush
      (Builder -> IO ()) -> IO () -> IO ()
strmbdy Builder -> IO ()
push IO ()
flush
      STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue RspStreaming -> RspStreaming -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue RspStreaming
tbq RspStreaming
RSFinish
      Manager -> IO ()
deleteMyId Manager
mgr
  where
    ([(Token, HeaderValue)]
_,ValueTable
reqvt) = Request -> ([(Token, HeaderValue)], ValueTable)
requestHeaders Request
req

worker :: Context -> Manager -> Server -> Action
worker :: Context -> Manager -> Server -> IO ()
worker ctx :: Context
ctx@Context{TQueue Input
inputQ :: TQueue Input
inputQ :: Context -> TQueue Input
inputQ,TQueue Control
controlQ :: TQueue Control
controlQ :: Context -> TQueue Control
controlQ} Manager
mgr Server
server = do
    StreamInfo
sinfo <- IO StreamInfo
newStreamInfo
    ThreadContinue
tcont <- IO ThreadContinue
newThreadContinue
    Manager -> (Handle -> IO ()) -> IO ()
timeoutKillThread Manager
mgr ((Handle -> IO ()) -> IO ()) -> (Handle -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ StreamInfo -> ThreadContinue -> Handle -> IO ()
go StreamInfo
sinfo ThreadContinue
tcont
  where
    go :: StreamInfo -> ThreadContinue -> Handle -> IO ()
go StreamInfo
sinfo ThreadContinue
tcont Handle
th = do
        ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tcont Bool
True
        Either SomeException ()
ex <- IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
E.try (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ do
            Handle -> IO ()
T.pause Handle
th
            Input Stream
strm Request
req <- STM Input -> IO Input
forall a. STM a -> IO a
atomically (STM Input -> IO Input) -> STM Input -> IO Input
forall a b. (a -> b) -> a -> b
$ TQueue Input -> STM Input
forall a. TQueue a -> STM a
readTQueue TQueue Input
inputQ
            let req' :: Request
req' = Request -> Handle -> Request
pauseRequestBody Request
req Handle
th
            StreamInfo -> Stream -> IO ()
setStreamInfo StreamInfo
sinfo Stream
strm
            Handle -> IO ()
T.resume Handle
th
            Handle -> IO ()
T.tickle Handle
th
            let aux :: Aux
aux = Handle -> Aux
Aux Handle
th
            Server
server Request
req' Aux
aux ((Response -> [PushPromise] -> IO ()) -> IO ())
-> (Response -> [PushPromise] -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Context
-> Manager
-> Handle
-> ThreadContinue
-> Stream
-> Request
-> Response
-> [PushPromise]
-> IO ()
response Context
ctx Manager
mgr Handle
th ThreadContinue
tcont Stream
strm Request
req'
        Bool
cont1 <- case Either SomeException ()
ex of
            Right () -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            Left e :: SomeException
e@(SomeException e
_)
              -- killed by the local worker manager
              | Just AsyncException
ThreadKilled    <- SomeException -> Maybe AsyncException
forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
              -- killed by the local timeout manager
              | Just TimeoutThread
T.TimeoutThread <- SomeException -> Maybe TimeoutThread
forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> do
                  StreamInfo -> IO ()
cleanup StreamInfo
sinfo
                  Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
              | Bool
otherwise -> do
                  StreamInfo -> IO ()
cleanup StreamInfo
sinfo
                  Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Bool
cont2 <- ThreadContinue -> IO Bool
getThreadContinue ThreadContinue
tcont
        StreamInfo -> IO ()
clearStreamInfo StreamInfo
sinfo
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
cont1 Bool -> Bool -> Bool
&& Bool
cont2) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ StreamInfo -> ThreadContinue -> Handle -> IO ()
go StreamInfo
sinfo ThreadContinue
tcont Handle
th
    pauseRequestBody :: Request -> Handle -> Request
pauseRequestBody Request
req Handle
th = Request
req { requestBody :: IO HeaderValue
requestBody = IO HeaderValue
readBody' }
      where
        !readBody :: IO HeaderValue
readBody = Request -> IO HeaderValue
requestBody Request
req
        !readBody' :: IO HeaderValue
readBody' = do
            Handle -> IO ()
T.pause Handle
th
            HeaderValue
bs <- IO HeaderValue
readBody
            Handle -> IO ()
T.resume Handle
th
            HeaderValue -> IO HeaderValue
forall (m :: * -> *) a. Monad m => a -> m a
return HeaderValue
bs
    cleanup :: StreamInfo -> IO ()
cleanup StreamInfo
sinfo = do
        Maybe Stream
minp <- StreamInfo -> IO (Maybe Stream)
getStreamInfo StreamInfo
sinfo
        case Maybe Stream
minp of
            Maybe Stream
Nothing   -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Just Stream
strm -> do
                Context -> Stream -> ClosedCode -> IO ()
closed Context
ctx Stream
strm ClosedCode
Killed
                let !frame :: HeaderValue
frame = ErrorCodeId -> WindowSize -> HeaderValue
resetFrame ErrorCodeId
InternalError (Stream -> WindowSize
streamNumber Stream
strm)
                TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ HeaderValue -> Control
CFrame HeaderValue
frame

----------------------------------------------------------------

--   A reference is shared by a responder and its worker.
--   The reference refers a value of this type as a return value.
--   If 'True', the worker continue to serve requests.
--   Otherwise, the worker get finished.
newtype ThreadContinue = ThreadContinue (IORef Bool)

{-# INLINE newThreadContinue #-}
newThreadContinue :: IO ThreadContinue
newThreadContinue :: IO ThreadContinue
newThreadContinue = IORef Bool -> ThreadContinue
ThreadContinue (IORef Bool -> ThreadContinue)
-> IO (IORef Bool) -> IO ThreadContinue
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
True

{-# INLINE setThreadContinue #-}
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue IORef Bool
ref) Bool
x = IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
ref Bool
x

{-# INLINE getThreadContinue #-}
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue IORef Bool
ref) = IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
ref

----------------------------------------------------------------

-- | The type for cleaning up.
newtype StreamInfo = StreamInfo (IORef (Maybe Stream))

{-# INLINE newStreamInfo #-}
newStreamInfo :: IO StreamInfo
newStreamInfo :: IO StreamInfo
newStreamInfo = IORef (Maybe Stream) -> StreamInfo
StreamInfo (IORef (Maybe Stream) -> StreamInfo)
-> IO (IORef (Maybe Stream)) -> IO StreamInfo
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Stream -> IO (IORef (Maybe Stream))
forall a. a -> IO (IORef a)
newIORef Maybe Stream
forall a. Maybe a
Nothing

{-# INLINE clearStreamInfo #-}
clearStreamInfo :: StreamInfo -> IO ()
clearStreamInfo :: StreamInfo -> IO ()
clearStreamInfo (StreamInfo IORef (Maybe Stream)
ref) = IORef (Maybe Stream) -> Maybe Stream -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe Stream)
ref Maybe Stream
forall a. Maybe a
Nothing

{-# INLINE setStreamInfo #-}
setStreamInfo :: StreamInfo -> Stream -> IO ()
setStreamInfo :: StreamInfo -> Stream -> IO ()
setStreamInfo (StreamInfo IORef (Maybe Stream)
ref) Stream
inp = IORef (Maybe Stream) -> Maybe Stream -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe Stream)
ref (Maybe Stream -> IO ()) -> Maybe Stream -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream -> Maybe Stream
forall a. a -> Maybe a
Just Stream
inp

{-# INLINE getStreamInfo #-}
getStreamInfo :: StreamInfo -> IO (Maybe Stream)
getStreamInfo :: StreamInfo -> IO (Maybe Stream)
getStreamInfo (StreamInfo IORef (Maybe Stream)
ref) = IORef (Maybe Stream) -> IO (Maybe Stream)
forall a. IORef a -> IO a
readIORef IORef (Maybe Stream)
ref