module Streaming.Sort
(
sort
, sortBy
, sortOn
, withFileSort
, withFileSortBy
, SortException (..)
, Config
, defaultConfig
, setConfig
, chunkSize
, maxFiles
, useDirectory
) where
import Streaming (Of(..), Stream)
import qualified Streaming as S
import Streaming.Binary (decoded)
import qualified Streaming.Prelude as S
import Streaming.With
import Data.Binary (Binary, encode)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Streaming as BS
import Control.Exception (Exception(..), IOException,
mapException)
import Control.Monad (join, void)
import Control.Monad.Catch (MonadMask, MonadThrow, finally,
throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Data.Bool (bool)
import Data.Coerce (Coercible, coerce)
import Data.Function (on)
import Data.Functor.Identity (Identity(Identity), runIdentity)
import Data.Int (Int64)
import qualified Data.List as L
import Data.Maybe (catMaybes)
import System.Directory (doesDirectoryExist, getPermissions,
removeFile, writable)
import System.IO (hClose, openBinaryTempFile)
-- | Sort a stream in memory using the elements' 'Ord' instance.
--
-- The whole stream is collected before anything is emitted; see 'sortBy'.
sort :: (Monad m, Ord a) => Stream (Of a) m r -> Stream (Of a) m r
sort str = sortBy compare str
-- | Sort a stream in memory with a custom comparison.
--
-- Every element is first collected into a list, which is then sorted with
-- the (stable) 'L.sortBy' and re-emitted. The input stream's return value
-- is returned unchanged after the last element.
sortBy :: (Monad m) => (a -> a -> Ordering) -> Stream (Of a) m r -> Stream (Of a) m r
sortBy cmp s = do
  collected <- lift (S.toList s)
  case collected of
    xs :> r -> S.each (L.sortBy cmp xs) >> return r
-- | Sort a stream in memory by a key derived from each element.
--
-- Each element is paired with its key before sorting. The key thunk is
-- shared inside the pair, so @f@ is evaluated at most once per element.
-- The pairs are then sorted by key with 'sortBy', and the keys are dropped.
sortOn :: (Ord b, Monad m) => (a -> b) -> Stream (Of a) m r -> Stream (Of a) m r
sortOn f str = S.map fst (sortBy byKey (S.map pairWithKey str))
  where
    pairWithKey x = (x, f x)
    byKey p q = compare (snd p) (snd q)
-- | Settings for the file-based sorts ('withFileSort' and 'withFileSortBy').
--
-- Start from 'defaultConfig' and adjust individual fields with 'setConfig'
-- and the 'chunkSize', 'maxFiles' and 'useDirectory' lenses.
data Config = Config
  { _chunkSize :: !Int
    -- ^ How many elements are sorted in memory and written to each
    -- temporary file.
  , _maxFiles :: !Int
    -- ^ How many temporary files are merged together in one step.
  , _useDirectory :: !(Maybe FilePath)
    -- ^ Directory in which to create the temporary working directory.
    -- When 'Nothing', or when 'withFileSortBy' judges the directory
    -- unusable, the system temporary directory is used instead.
  } deriving (Show)
-- | Default settings: 1000 elements per temporary file, merge up to 100
-- files at once, and use the system temporary directory.
defaultConfig :: Config
defaultConfig =
  Config
    1000    -- chunk size
    100     -- maximum files merged at once
    Nothing -- no preferred directory
-- | Overwrite the field of a 'Config' that a lens (such as 'chunkSize',
-- 'maxFiles' or 'useDirectory') focuses on.
--
-- > setConfig chunkSize 5000 defaultConfig
setConfig :: (forall f. (Functor f) => (a -> f a) -> Config -> f Config)
          -> a -> Config -> Config
setConfig lens a = runIdentity #. lens overwrite
  where
    -- Ignore the current value and substitute the new one.
    overwrite _ = Identity a
-- | Post-compose a function that is representationally a no-op.
--
-- The left argument is ignored entirely. The right-hand function is simply
-- 'coerce'd to the result type, which is only sound because @Coercible c b@
-- holds. This presumably mirrors the @(#.)@ operator from the @lens@ /
-- @profunctors@ packages, used to avoid the eta-expansion that @f . g@
-- would introduce.
--
-- NOTE(review): the inner @:: b@ refers to the @b@ bound by the explicit
-- @forall@ in the outer expression annotation. That only works with
-- ScopedTypeVariables enabled (not visible in this file; presumably set in
-- the cabal file).
(#.) :: (Coercible c b) => (b -> c) -> (a -> b) -> (a -> c)
(#.) _ = coerce (\x -> x :: b) :: forall a b. Coercible b a => a -> b
-- | Lens onto the number of elements sorted in memory and written to each
-- temporary file.
chunkSize :: (Functor f) => (Int -> f Int) -> Config -> f Config
chunkSize inj cfg = fmap setSize (inj (_chunkSize cfg))
  where
    setSize v = cfg { _chunkSize = v }
-- | Lens onto the number of temporary files merged together in one step.
maxFiles :: (Functor f) => (Int -> f Int) -> Config -> f Config
maxFiles inj cfg = fmap setLimit (inj (_maxFiles cfg))
  where
    setLimit v = cfg { _maxFiles = v }
-- | Lens onto the optional directory in which the temporary working
-- directory is created.
useDirectory :: (Functor f) => (Maybe FilePath -> f (Maybe FilePath)) -> Config -> f Config
useDirectory inj cfg = fmap setDir (inj (_useDirectory cfg))
  where
    setDir v = cfg { _useDirectory = v }
-- | File-backed sort using the elements' 'Ord' instance.
--
-- Equivalent to 'withFileSortBy' with 'compare'.
withFileSort :: (Ord a, Binary a, MonadMask m, MonadIO m, MonadThrow n, MonadIO n)
             => Config -> Stream (Of a) m v
             -> (Stream (Of a) n () -> m r) -> m r
withFileSort cfg str k = withFileSortBy cfg compare str k
-- | Sort a stream using temporary files and pass the sorted stream to the
-- continuation @k@.
--
-- The input is cut into chunks of @chunkSize@ elements. Each chunk is
-- sorted in memory and written to its own temporary file. The files are
-- then merged @maxFiles@ at a time until a single merge produces the final
-- stream.
--
-- All files live in a temporary directory. It is created inside
-- @useDirectory@ when that directory exists and is writable; otherwise
-- (including when the directory does not exist) it is created in the
-- system temporary directory.
--
-- The sorted stream reads from temporary files that are deleted once @k@
-- finishes, so it must be consumed within @k@.
withFileSortBy :: (Binary a, MonadMask m, MonadIO m, MonadThrow n, MonadIO n)
               => Config -> (a -> a -> Ordering) -> Stream (Of a) m v
               -> (Stream (Of a) n () -> m r) -> m r
-- NOTE(review): 'mapException' from "Control.Exception" is a pure function.
-- It only remaps exceptions raised while evaluating the action value to
-- WHNF, not those raised when the action runs. I/O failures during the sort
-- therefore likely surface as plain 'IOException' rather than 'SortIO'.
-- Confirm the intended contract before switching to a monadic handler,
-- since that would change the exception type callers observe.
withFileSortBy cfg cmp str k = mapException SortIO . createDir $ \dir ->
  mergeAllFiles (_maxFiles cfg) dir cmp (initStream dir) k
  where
    -- Run the continuation inside a fresh temporary directory, created in the
    -- requested directory when it is usable, else in the system temp dir.
    createDir k' = liftIO (traverse checkDir (_useDirectory cfg))
                   >>= (`getTmpDir` k') . join

    -- Keep the requested directory only if it exists and is writable.
    -- 'getPermissions' fails on a missing path, so permissions are only
    -- queried once the directory is known to exist. Without this, a missing
    -- directory would raise instead of falling back to the system temp dir.
    checkDir dir = do
      exists <- doesDirectoryExist dir
      canWrite <- if exists
                    then writable <$> getPermissions dir
                    else return False
      return (bool Nothing (Just dir) canWrite)

    getTmpDir mdir = maybe withSystemTempDirectory withTempDirectory mdir "streaming-sort"

    -- Sorted chunk files for the input, written into the working directory.
    initStream dir = initialSort (_chunkSize cfg) dir cmp str
-- | First phase of the file sort.
--
-- Cut the input into chunks of @chnkSize@ elements, sort each chunk in
-- memory with 'sortBy', and write every sorted chunk to its own new file in
-- @dir@. The resulting file paths are yielded in order, and the input's
-- return value is preserved.
initialSort :: (Binary a, MonadMask m, MonadIO m)
            => Int -> FilePath -> (a -> a -> Ordering)
            -> Stream (Of a) m r
            -> Stream (Of FilePath) m r
initialSort chnkSize dir cmp str =
  S.mapped (writeSortedData dir) $ S.maps (sortBy cmp) $ S.chunksOf chnkSize str
-- | Second phase of the file sort: repeatedly merge sorted temporary files
-- until they can be merged in one go, then give the fully merged stream to
-- the continuation @k@.
--
-- The file paths are grouped @numFiles@ at a time.
--
-- * No files at all (empty input): @k@ receives an empty stream.
-- * Exactly one group: it is merged straight into the stream given to @k@.
-- * Several groups: each group is merged into a new file in @tmpDir@, and
--   the process repeats on the new files.
--
-- A group of a single file cannot reduce the number of files. A
-- @numFiles@ below 2 would therefore make the recursion loop forever
-- (creating new files each pass) as soon as there are two or more files.
-- The group size is clamped to at least 2 for this reason.
mergeAllFiles :: (Binary a, MonadMask m, MonadIO m, MonadThrow n, MonadIO n)
              => Int -> FilePath -> (a -> a -> Ordering)
              -> Stream (Of FilePath) m v
              -> (Stream (Of a) n () -> m r) -> m r
mergeAllFiles numFiles tmpDir cmp files k = go files
  where
    -- Never merge fewer than two files per group (see above).
    groupSize = max 2 numFiles

    -- Group the file paths into lists of up to 'groupSize' paths.
    go = checkEmpty . S.mapped S.toList . S.chunksOf groupSize

    -- No groups at all: the original input was empty.
    checkEmpty chunks = S.uncons chunks >>= maybe (k (return ()))
                                                  (uncurry checkSingleChunk)

    -- Exactly one group: merge it directly into the final stream.
    checkSingleChunk ch chunks =
      S.uncons chunks >>= maybe (withFilesSort cmp ch k)
                                (uncurry (withMultipleChunks ch))

    -- Several groups: merge each into a new file, then run another pass.
    withMultipleChunks ch1 ch2 chunks = go (S.mapM sortAndWrite allChunks)
      where
        allChunks = S.yield ch1 >> S.yield ch2 >> chunks
        sortAndWrite fls = withFilesSort cmp fls (fmap S.fst' . writeSortedData tmpDir)
-- | Encode a stream (via 'encodeStream') into a brand-new binary file in
-- @tmpDir@.
--
-- Returns the file's path paired with the stream's return value.
writeSortedData :: (Binary a, MonadMask m, MonadIO m)
                => FilePath -> Stream (Of a) m r -> m (Of FilePath r)
writeSortedData tmpDir str = do
  fl <- liftIO newTmpFile
  res <- writeBinaryFile fl (encodeStream str)
  return (fl :> res)
  where
    -- 'openBinaryTempFile' picks a unique name and creates the file. The
    -- handle is closed straight away because 'writeBinaryFile' reopens the
    -- file by path.
    newTmpFile = do
      (path, h) <- openBinaryTempFile tmpDir "streaming-sort-chunk"
      hClose h
      return path
-- | Open every file in @files@, decode each one as a stream of elements,
-- and pass @k@ a single stream that merges them with 'interleave'.
--
-- The files are expected to be individually sorted by @cmp@; they are
-- produced by 'writeSortedData' from sorted data. Each file is deleted once
-- @k@ has finished (see 'readThenDelete').
withFilesSort :: (Binary a, MonadMask m, MonadIO m, MonadThrow n, MonadIO n)
              => (a -> a -> Ordering) -> [FilePath]
              -> (Stream (Of a) n () -> m r) -> m r
withFilesSort cmp files k =
  mergeContinuations readThenDelete files $ \contents ->
    k (interleave cmp (map decodeStream contents))
-- | Run a continuation on the contents of a binary file, then delete the
-- file.
--
-- The file is removed even if the continuation throws.
readThenDelete :: (MonadMask m, MonadIO m, MonadIO n) => FilePath
               -> (BS.ByteString n () -> m r) -> m r
readThenDelete path useContents =
  finally (withBinaryFileContents path useContents) cleanup
  where
    cleanup = liftIO (removeFile path)
-- | Nest a continuation-passing acquisition (such as 'readThenDelete')
-- over every element of a list, then pass all acquired values to @cont@.
--
-- Each acquisition's scope encloses the ones after it, so every value
-- stays valid while @cont@ runs. The values reach @cont@ in the reverse
-- order of the input list.
mergeContinuations :: (Monad m) => (forall res. a -> (b -> m res) -> m res) -> [a] -> ([b] -> m r) -> m r
mergeContinuations toCont xs cont = foldr nest cont xs []
  where
    -- Acquire the value for @x@, push it onto the accumulator, and continue
    -- with the remaining elements.
    nest x inner acc = toCont x (\b -> inner (b : acc))
-- | Merge streams that are each already sorted by @cmp@ into a single
-- sorted stream.
--
-- The head of every non-empty input is taken up front. The resulting
-- (element, rest-of-stream) pairs are kept in a list ordered by @cmp@ on
-- the element. The smallest element is yielded, the next element of the
-- stream it came from is inserted back into the list, and this repeats.
-- Once only one stream is left, its remaining elements are passed through
-- directly. The inputs' return values are discarded.
interleave :: (Monad m) => (a -> a -> Ordering) -> [Stream (Of a) m r] -> Stream (Of a) m ()
interleave cmp streams = do
  heads <- lift (mapM S.uncons streams)
  merge (L.sortBy byHead (catMaybes heads))
  where
    byHead = cmp `on` fst

    merge pending = case pending of
      [] -> return ()
      [(x, rest)] -> S.yield x >> void rest
      (x, rest) : others -> do
        S.yield x
        next <- lift (S.uncons rest)
        merge (maybe others (\p -> L.insertBy byHead p others) next)
-- | Serialise every element with 'encode' and concatenate the encodings
-- into one byte stream.
encodeStream :: (Binary a, Monad m) => Stream (Of a) m r -> BS.ByteString m r
encodeStream str = fromChunksLazy (S.map encode str)
-- | Decode a byte stream back into a stream of values using 'decoded'.
--
-- If decoding fails, a 'SortDecode' exception is thrown in the base monad.
-- It carries the byte count reported by the decoder and the decoder's
-- error message.
decodeStream :: (Binary a, MonadThrow m) => BS.ByteString m r -> Stream (Of a) m r
decodeStream bs = decoded bs >>= \(_, consumed, outcome) ->
  case outcome of
    Left err -> lift (throwM (SortDecode consumed err))
    Right r -> return r
-- | Flatten a stream of lazy 'BL.ByteString's into a streaming byte string.
--
-- Each lazy value is split into its strict chunks, which are emitted in
-- order.
fromChunksLazy :: (Monad m) => Stream (Of BL.ByteString) m r -> BS.ByteString m r
fromChunksLazy lazyChunks = BS.fromChunks (S.concat (S.map BL.toChunks lazyChunks))
-- | Errors raised by the file-based sorting functions.
data SortException = SortIO IOException
                     -- ^ An I\/O failure. This constructor is applied only
                     -- via 'mapException' in 'withFileSortBy'.
                   | SortDecode Int64 String
                     -- ^ A temporary file could not be decoded. Carries the
                     -- byte count reported by the decoder (presumably how
                     -- far decoding got) and the decoder's error message.
  deriving (Show)
-- Uses the default 'Exception' methods.
instance Exception SortException