{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Torch.Data.Internal where
import Control.Concurrent.Async.Lifted (concurrently)
import qualified Control.Concurrent.STM as STM
import Control.Exception.Safe (bracket, finally)
import Control.Monad (when)
import Control.Monad.Base (MonadBase (..))
import Control.Monad.Cont (ContT (ContT))
import Control.Monad.Trans.Control
import Pipes
import Pipes.Concurrent hiding (atomically)
import qualified Pipes.Prelude as P
runWithBuffer ::
forall a m b.
(MonadBaseControl IO m) =>
Int ->
(Output a -> m ()) ->
ContT b m (ListT m a)
runWithBuffer :: forall a (m :: * -> *) b.
MonadBaseControl IO m =>
Int -> (Output a -> m ()) -> ContT b m (ListT m a)
runWithBuffer Int
bufferSize Output a -> m ()
batchHandler = forall {k} (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT forall a b. (a -> b) -> a -> b
$ \ListT m a -> m b
f ->
forall a b. (a, b) -> b
snd
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a l r.
MonadBaseControl IO m =>
Buffer a -> (Output a -> m l) -> (Input a -> m r) -> m (l, r)
withBufferLifted
(forall a. Int -> Buffer a
bounded Int
bufferSize)
(\Output a
batchOutput -> Output a -> m ()
batchHandler Output a
batchOutput)
(\Input a
input -> ListT m a -> m b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Producer a m () -> ListT m a
Select forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadBase IO m =>
Input a -> Producer' a m ()
fromInput' Input a
input)
liftedBracket :: MonadBaseControl IO m => m a -> (a -> m b) -> (a -> m c) -> m c
liftedBracket :: forall (m :: * -> *) a b c.
MonadBaseControl IO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
liftedBracket m a
acquire a -> m b
release a -> m c
action = forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
(RunInBase m b -> b (StM m a)) -> m a
control forall a b. (a -> b) -> a -> b
$ \RunInBase m IO
runInIO ->
forall (m :: * -> *) a b c.
MonadMask m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(RunInBase m IO
runInIO m a
acquire)
(\StM m a
saved -> RunInBase m IO
runInIO (forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m a
saved forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> m b
release))
(\StM m a
saved -> RunInBase m IO
runInIO (forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m a
saved forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> m c
action))
withBufferLifted ::
(MonadBaseControl IO m) =>
Buffer a ->
(Output a -> m l) ->
(Input a -> m r) ->
m (l, r)
withBufferLifted :: forall (m :: * -> *) a l r.
MonadBaseControl IO m =>
Buffer a -> (Output a -> m l) -> (Input a -> m r) -> m (l, r)
withBufferLifted Buffer a
buffer Output a -> m l
fOutput Input a -> m r
fInput =
forall (m :: * -> *) a b c.
MonadBaseControl IO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
liftedBracket
(forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall a. Buffer a -> IO (Output a, Input a, STM ())
spawn' Buffer a
buffer)
(\(Output a
_, Input a
_, STM ()
seal) -> forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically STM ()
seal)
( \(Output a
output, Input a
input, STM ()
seal) ->
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m (a, b)
concurrently
(Output a -> m l
fOutput Output a
output forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`liftedFinally` (forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically STM ()
seal))
(Input a -> m r
fInput Input a
input forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`liftedFinally` (forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically STM ()
seal))
)
fromInput' :: (MonadBase IO m) => Input a -> Producer' a m ()
fromInput' :: forall (m :: * -> *) a.
MonadBase IO m =>
Input a -> Producer' a m ()
fromInput' Input a
input = forall {x'} {x}. Proxy x' x () a m ()
loop
where
loop :: Proxy x' x () a m ()
loop = do
Maybe a
ma <- forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. Input a -> STM (Maybe a)
recv Input a
input
case Maybe a
ma of
Maybe a
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just a
a -> do
forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a
Proxy x' x () a m ()
loop
toOutput' :: (MonadBase IO m) => Output a -> Consumer' a m ()
toOutput' :: forall (m :: * -> *) a.
MonadBase IO m =>
Output a -> Consumer' a m ()
toOutput' Output a
output = forall {y'} {y}. Proxy () a y' y m ()
loop
where
loop :: Proxy () a y' y m ()
loop = do
a
a <- forall (m :: * -> *) a. Functor m => Consumer' a m a
await
Bool
alive <- forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. Output a -> a -> STM Bool
send Output a
output a
a
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
alive Proxy () a y' y m ()
loop
liftedFinally :: MonadBaseControl IO m => m a -> m b -> m a
liftedFinally :: forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
liftedFinally m a
a m b
sequel = forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
(RunInBase m b -> b (StM m a)) -> m a
control forall a b. (a -> b) -> a -> b
$ \RunInBase m IO
runInIO ->
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
finally
(RunInBase m IO
runInIO m a
a)
(RunInBase m IO
runInIO m b
sequel)
atomically :: MonadIO m => STM a -> m a
atomically :: forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
STM.atomically
instance (MonadBase IO m) => MonadBase IO (Proxy a' a b' b m) where
liftBase :: forall α. IO α -> Proxy a' a b' b m α
liftBase = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase