{-# LANGUAGE DeriveDataTypeable , ExistentialQuantification, OverloadedStrings,FlexibleInstances, UndecidableInstances
,ScopedTypeVariables, StandaloneDeriving, RecordWildCards, FlexibleContexts, CPP
,GeneralizedNewtypeDeriving #-}
module Transient.Move.Internals where
import Prelude hiding(drop,length)
import Transient.Internals
import Transient.Parse
import Transient.Logged
import Transient.Indeterminism
import Transient.Mailboxes
import Data.Typeable
import Control.Applicative
import System.Random
import Data.String
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BS
import System.Time
import Data.ByteString.Builder
#ifndef ghcjs_HOST_OS
import Network
import Network.URI
import qualified Network.Socket as NS
import qualified Network.BSD as BSD
import qualified Network.WebSockets as NWS
import qualified Network.WebSockets.Connection as WS
import Network.WebSockets.Stream hiding(parse)
import qualified Data.ByteString as B(ByteString)
import qualified Data.ByteString.Lazy.Internal as BLC
import qualified Data.ByteString.Lazy as BL
import Network.Socket.ByteString as SBS(sendMany,sendAll,recv)
import qualified Network.Socket.ByteString.Lazy as SBSL
import Data.CaseInsensitive(mk,CI)
import Data.Char
import Data.Aeson
import qualified Data.ByteString.Base64.Lazy as B64
#else
import JavaScript.Web.WebSocket
import qualified JavaScript.Web.MessageEvent as JM
import GHCJS.Prim (JSVal)
import GHCJS.Marshal(fromJSValUnchecked)
import qualified Data.JSString as JS
import JavaScript.Web.MessageEvent.Internal
import GHCJS.Foreign.Callback.Internal (Callback(..))
import qualified GHCJS.Foreign.Callback as CB
#endif
import Control.Monad.State
import Control.Monad.Fail
import Control.Exception hiding (onException,try)
import Data.Maybe
import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Control.Concurrent.MVar
import Data.Monoid
import qualified Data.Map as M
import Data.List (partition,union,(\\),length, nubBy,isPrefixOf)
import Data.IORef
import Control.Concurrent
import System.Mem.StableName
import Unsafe.Coerce
import System.Environment
-- | Pack a 'String' into a lazy Char8 'BS.ByteString' (short alias of 'BS.pack').
pk str = BS.pack str

-- | Unpack a lazy Char8 'BS.ByteString' into a 'String' (short alias of 'BS.unpack').
up bytes = BS.unpack bytes
#ifdef ghcjs_HOST_OS
-- | Host name of a node, as a plain 'String' (GHCJS build only).
type HostName = String

-- | Port identifier for the GHCJS build: a wrapper around the numeric port.
newtype PortID = PortNumber Int
  deriving (Read, Show, Eq, Typeable)
#endif
-- | Description of a node: where it is, the pool of connections to it
-- (when there is one) and the services attached to it.
data Node = Node
  { nodeHost     :: HostName           -- ^ host name of the node
  , nodePort     :: Int                -- ^ port of the node
  , connection   :: Maybe (MVar Pool)  -- ^ connection pool for the node, if any
  , nodeServices :: [Service]          -- ^ services attached to the node
  }
  deriving (Typeable)
-- | 'Node' is 'Loggable' through the default methods of the class; no method is overridden here.
instance Loggable Node
-- | Nodes are ordered by host name first and port second; the connection
-- pool and the services do not take part in the comparison.
instance Ord Node where
  compare a b = compare (key a) (key b)
    where
      key n = (nodeHost n, nodePort n)
-- | The 'Cloud' monad: a newtype around 'TransIO'. All the listed instances
-- are derived from the underlying 'TransIO' instances.
--
-- The @Semigroup@ instance is only derived when the CPP macro reports
-- base >= 4.11. The test is split in two directives: the outer one keeps
-- the file preprocessable when the macro is not defined at all, the inner
-- one performs the real version check (a bare @ifdef@ on the macro would
-- only test that it is defined, whatever the version).
newtype Cloud a = Cloud {runCloud' :: TransIO a}
  deriving
    ( AdditionalOperators
    , Functor
#ifdef MIN_VERSION_base
#if MIN_VERSION_base(4,11,0)
    , Semigroup
#endif
#endif
    , Monoid
    , Applicative
    , Alternative
    , MonadFail
    , Monad
    , Num
    , Fractional
    , MonadState EventF
    )
-- | The part of a proxy URL that precedes the \"\@\" (see 'getHTTProxyParams').
type UPassword= BS.ByteString
-- | Host name kept as a lazy bytestring.
type Host= BS.ByteString
-- | Proxy settings as parsed by 'getHTTProxyParams': credentials, host and port.
type ProxyData= (UPassword,Host,Int)
-- NOINLINE is required for a global created with unsafePerformIO: without it
-- the definition may be inlined and several distinct IORefs created.
{-# NOINLINE rHTTPProxy #-}
-- | Global cache of the proxy settings: 'Nothing' until 'getHTTProxyParams'
-- fills it with the pair (settings for http, settings for https).
rHTTPProxy :: IORef (Maybe (Maybe ProxyData, Maybe ProxyData))
rHTTPProxy = unsafePerformIO $ newIORef Nothing
-- | Return the proxy settings read from the environment: the ones of the
-- @https_proxy@ variable when the argument is 'True', the ones of
-- @http_proxy@ otherwise. Both variables are looked up and parsed the first
-- time only; the result is cached in 'rHTTPProxy'.
getHTTProxyParams t = do
    cached <- liftIO $ readIORef rHTTPProxy
    case cached of
      Just (httpProxy, httpsProxy) -> return $ if t then httpsProxy else httpProxy
      Nothing -> do
        both <- (,) <$> getp "http" <*> getp "https"
        liftIO $ writeIORef rHTTPProxy $ Just both
        -- the cache is filled now: the recursive call takes the first branch
        getHTTProxyParams t
  where
    -- Read and parse the variable "<scheme>_proxy"; 'Nothing' when it is not set.
    getp scheme = do
      let var = scheme ++ "_proxy"
      p <- liftIO $ lookupEnv var
      tr ("proxy", p)
      case p of
        Nothing -> return Nothing
        Just hp -> do
          pr <- withParseString (BS.pack hp) $ do
            -- skip an optional "scheme://" prefix
            tDropUntilToken (BS.pack "//") <|> return ()
            (,,) <$> getUPass <*> tTakeWhile' (/= ':') <*> int
          return $ Just pr

    -- Optional credentials: whatever precedes "@", or the empty string.
    getUPass = tTakeUntilToken "@" <|> return ""
-- | Run a 'Cloud' computation in 'TransIO'. The 'Closure' found in the state
-- beforehand (@Closure 0@ when there is none) is set again with '<***' once
-- the computation has run.
runCloud :: Cloud a -> TransIO a
runCloud (Cloud tx) = do
  saved <- getState <|> return (Closure 0)
  tx <*** setState saved
#ifndef ghcjs_HOST_OS
{-# NOINLINE tlsHooks #-}
-- | Global table of TLS entry points. In field order: whether TLS is
-- included, send data, receive data, server-side handshake, client-side
-- handshake and close.
--
-- The defaults stored here are for a program without TLS: the flag is
-- 'False', the send and receive hooks raise an error when called, the
-- client handshake and the close hook do nothing, and the server hook
-- answers with an HTTP 525 error when the input starts with byte 0x16
-- (presumably a TLS handshake record -- TODO confirm).
tlsHooks :: IORef
  ( Bool
  , SData -> BS.ByteString -> IO ()
  , SData -> IO B.ByteString
  , NS.Socket -> BS.ByteString -> TransIO ()
  , String -> NS.Socket -> BS.ByteString -> TransIO ()
  , SData -> IO ()
  )
tlsHooks = unsafePerformIO $ newIORef
    ( False
    , notneeded
    , notneeded
    , \_ i -> tlsNotSupported i
    , \_ _ _ -> return ()
    , \_ -> return ()
    )
  where
    notneeded = error "TLS hook function called"

    tlsNotSupported input =
      if not (BL.null input) && BL.head input == 0x16
        then do
          conn <- getSData
          -- Every header line ends with CRLF. The original message had a
          -- bare LF after "Content-Length: 0", which is not a valid HTTP
          -- line terminator.
          sendRaw conn $ BS.pack "HTTP/1.0 525 SSL Handshake Failed\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
        else return ()
-- The six TLS hooks as top level names, obtained by reading 'tlsHooks'
-- through 'unsafePerformIO'.
( isTLSIncluded
  , sendTLSData
  , recvTLSData
  , maybeTLSServerHandshake
  , maybeClientTLSHandshake
  , tlsClose
  ) = unsafePerformIO $ readIORef tlsHooks
#endif
-- | Lift a 'TransIO' computation into 'Cloud', wrapping it with 'logged'.
local :: Loggable a => TransIO a -> Cloud a
local mx = Cloud (logged mx)
-- | Run a 'Cloud' computation in 'IO' by handing the wrapped 'TransIO' to 'keep'.
runCloudIO :: Typeable a => Cloud a -> IO (Maybe a)
runCloudIO cloud = keep (runCloud' cloud)

-- | Same as 'runCloudIO', but the wrapped 'TransIO' is handed to @keep'@.
runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a)
runCloudIO' cloud = keep' (runCloud' cloud)
-- | Lift a 'TransIO' computation into 'Cloud' as it is, without the
-- 'logged' wrapper that 'local' adds.
onAll :: TransIO a -> Cloud a
onAll mx = Cloud mx
-- | Wrap a 'TransIO' computation so that it is executed through
-- 'unsafePerformIO', on the state captured at this point, only when the
-- returned value is demanded. The result is extracted with 'fromJust', so
-- demanding it fails if the computation yields 'Nothing'.
lazy :: TransIO a -> Cloud a
lazy mx = onAll $ do
  st <- get
  return . fromJust . unsafePerformIO $ fst <$> runStateT (runTrans mx) st
-- | Run the computation with 'lazy', then 'fixClosure', and return the
-- result of the computation.
fixRemote mx = do
  result <- lazy mx
  fixClosure
  return result
-- | Execute an empty logged asynchronous step on the remote side (see 'atRemote').
fixClosure = atRemote (local (async (return ())))
-- | 'logged' for 'Cloud' computations. The 'Closure' (default @Closure 0@)
-- and the optional 'LocalFixData' found in the state before the call are
-- put back with '<***' after the logged computation.
loggedc :: Loggable a => Cloud a -> Cloud a
loggedc (Cloud mx) = Cloud $ do
  closRemote <- getState <|> return (Closure 0)
  (mfix :: Maybe LocalFixData) <- getData
  logged mx <*** do
    setData closRemote
    maybe (return ()) setState mfix
-- | Like 'loggedc', except that only the optional 'LocalFixData' is put
-- back afterwards; the 'Closure' is left as the computation set it.
loggedc' :: Loggable a => Cloud a -> Cloud a
loggedc' (Cloud mx) = Cloud $ do
  (mfix :: Maybe LocalFixData) <- getData
  logged mx <*** maybe (return ()) setState mfix
-- | Run an 'IO' action as a 'local' (logged) 'Cloud' computation.
lliftIO :: Loggable a => IO a -> Cloud a
lliftIO io = local (liftIO io)

-- | Synonym of 'lliftIO'.
localIO :: Loggable a => IO a -> Cloud a
localIO io = lliftIO io
-- | Open a 'wormhole' to the node and 'teleport' through it.
beamTo :: Node -> Cloud ()
beamTo node = wormhole node teleport

-- | 'beamTo' the node, with @return ()@ as the alternative branch.
forkTo :: Node -> Cloud ()
forkTo node = beamTo node <|> return ()
-- | Run a 'Cloud' computation on the given node and return its result:
-- the computation is wrapped with 'atRemote' inside a connection opened
-- with @wormhole'@.
callTo :: Loggable a => Node -> Cloud a -> Cloud a
callTo node remoteProc = wormhole' node (atRemote remoteProc)
#ifndef ghcjs_HOST_OS
-- | Variant of 'callTo' written with two 'beamTo' jumps: go to the target
-- node, run the computation, then go back to the first node of the local
-- node list.
callTo' :: (Show a, Read a, Typeable a) => Node -> Cloud a -> Cloud a
callTo' node remoteProc = do
  home <- local $ do
    nodes <- getNodes
    return (Prelude.head nodes)
  beamTo node
  result <- remoteProc
  beamTo home
  return result
#endif
-- | Within a connection, run the computation between two calls to
-- 'teleport' and return its result. After the first 'teleport' the
-- execution mode is kept when it is 'Parallel' and set to 'Serial'
-- otherwise; after the computation it is set to 'Remote'.
atRemote :: Loggable a => Cloud a -> Cloud a
atRemote proc = loggedc' $ do
    teleport
    modify $ \s -> s{execMode = keepParallel (execMode s)}
    result <- loggedc $ proc <** modify (\s -> s{execMode = Remote})
    teleport
    return result
  where
    keepParallel mode
      | mode == Parallel = Parallel
      | otherwise = Serial
-- | Synonym of 'callTo'.
runAt :: Loggable a => Node -> Cloud a -> Cloud a
runAt node proc = callTo node proc
-- | Run the computation so that a previously registered instance of the
-- same computation in this connection is killed first. Instances are
-- identified by the hash of the 'StableName' of the computation and are
-- kept in the @closChildren@ map of the current connection.
--
-- Fails with an error when there is no connection in the state.
single :: TransIO a -> TransIO a
single f = do
  cutExceptions
  Connection{closChildren = rmap} <- getSData <|> error "single: only works within a connection"
  running <- liftIO $ readIORef rmap
  key <- liftIO $ f `seq` (hashStableName <$> makeStableName f)
  -- kill the branch registered by an earlier execution, if any
  maybe (return ()) (liftIO . killBranch') (M.lookup key running)
  tv <- get
  f <** do
    key' <- liftIO $ hashStableName <$> makeStableName f
    liftIO $ modifyIORef rmap $ M.insert key' tv
-- | Run the computation only when no instance of it has been registered
-- yet in the current connection; otherwise the result is 'empty'.
-- Instances are identified by the hash of the 'StableName' of the
-- computation, stored in the @closChildren@ map of the connection.
--
-- Fails with an error when there is no connection in the state.
unique :: TransIO a -> TransIO a
unique f = do
  Connection{closChildren = rmap} <- getSData <|> error "unique: only works within a connection. Use wormhole"
  registered <- liftIO $ readIORef rmap
  key <- liftIO $ f `seq` (hashStableName <$> makeStableName f)
  if M.member key registered
    then empty
    else do
      tv <- get
      liftIO $ modifyIORef rmap $ M.insert key tv
      f
-- | Like @wormhole'@, with an exception handler installed first: on a
-- @ConnectionError "no connection"@ raised for this very node, a relay is
-- searched with @findRelay@ and the execution continues.
wormhole node comp = do
    onAll $ onException $ \(e@(ConnectionError "no connection" failed)) ->
      if failed == node
        then do
          runCloud' $ findRelay node
          continue
        else return ()
    wormhole' node comp
  where
    -- Explore the network until a site is found whose node list contains the
    -- target with a connection; the first node of that list is taken as the
    -- relay and recorded in a "relay" service of the target node.
    findRelay target = do
      relaynode <- exploreNetUntil $ do
        nodes <- local getNodes
        case filter (== target) nodes of
          (known : _) | isJust (connection known) -> return $ Prelude.head nodes
          _ -> empty
      local $ addNodes [target{nodeServices = [[("relay", show (nodeHost (relaynode :: Node), nodePort relaynode))]]}]
-- | Flag kept in the state by @wormhole'@ (initially 'False') and read by
-- 'teleport' to choose which part of the log has to be sent.
newtype DialogInWormholeInitiated= DialogInWormholeInitiated Bool
-- | Run a 'Cloud' computation with a connection to the given node set in
-- the state.
--
-- When the log is not in recover mode, a connection is obtained with
-- 'mconnect' and the computation runs with that connection, @Closure 0@ and
-- a fresh 'DialogInWormholeInitiated' flag. When the log is in recover mode,
-- the connection already present in the state is reused (an error is raised
-- if there is none).
wormhole' :: Loggable a => Node -> Cloud a -> Cloud a
wormhole' node (Cloud comp) = local $ Transient $ do
-- values present before entering; they are put back afterwards
moldconn <- getData :: StateIO (Maybe Connection)
mclosure <- getData :: StateIO (Maybe Closure)
mdialog <- getData :: StateIO (Maybe ( Ref DialogInWormholeInitiated))
labelState $ "wormhole" <> BC.pack (show node)
log <- getLog
if not $ recover log
then runTrans $ (do
conn <- mconnect node
liftIO $ writeIORef (remoteNode conn) $ Just node
-- the synchronous flag is inherited from the previous connection (False if none)
setData conn{synchronous= maybe False id $ fmap synchronous moldconn, calling= True}
setState $ Closure 0
newRState $ DialogInWormholeInitiated False
comp )
-- put back whatever connection, closure and dialog flag existed before
<*** do
when (isJust moldconn) . setData $ fromJust moldconn
when (isJust mclosure) . setData $ fromJust mclosure
when (isJust mdialog) . setData $ fromJust mdialog
else do
-- recover mode: the connection must already be in the state
let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn
setData $ conn{calling= False}
runTrans $ comp
<*** do when (isJust mclosure) . setData $ fromJust mclosure
-- | Exception carrying a node, a closure identifier and a textual
-- description of the original error (see 'reportBack').
data CloudException = CloudException Node IdClosure String
  deriving (Typeable, Show, Read)

instance Exception CloudException
-- | Set the @synchronous@ flag of the connection stored in the state.
-- The default value handed to @modifyData'@ is an error, so the call
-- requires a connection to be present.
setSynchronous :: Bool -> TransIO ()
setSynchronous sync =
  modifyData' (\con -> con{synchronous = sync}) (error "setSynchronous: no communication data")
    >> return ()
-- | Run a 'Cloud' computation with the @synchronous@ flag of the current
-- connection set to 'True' and with @threads 0@. The flag is then set with
-- '<***' to the value that the @modifyData'@ call returned.
syncStream :: Cloud a -> Cloud a
syncStream proc = do
    previous <- local $ do
      Connection{synchronous = flag} <- modifyData' (\con -> con{synchronous = True}) noConn
      return flag
    Cloud $ threads 0 $ runCloud' proc <*** modifyData' (\con -> con{synchronous = previous}) noConn
  where
    noConn = error "syncStream: no communication data"
-- | Send the current log through the connection in the state, so that the
-- other end can continue the computation, and stop the local one
-- (the action returns 'Nothing' after sending).
--
-- When the log is in recover mode nothing is sent and the computation
-- simply continues. Raises an error when there is no connection in the
-- state.
teleport :: Cloud ()
teleport = do
-- keep Remote mode if already set, otherwise switch to Parallel
modify $ \s -> s{execMode=if execMode s == Remote then Remote else Parallel}
local $ do
conn@Connection{connData=contype, synchronous=synchronous, localClosures= localClosures} <- getData
`onNothing` error "teleport: No connection defined: use wormhole"
Transient $ do
labelState "teleport"
cont <- get
log <- getLog
if not $ recover log
then do
ty <- liftIO $ readIORef contype
case ty of
-- connection to this same node: nothing is sent; the node recorded as
-- remote becomes the value of myNode
Just Self -> runTrans $ do
modify $ \s -> s{execMode= Parallel}
abduce
liftIO $ do
remote <- readIORef $ remoteNode conn
writeIORef (myNode conn) $ fromMaybe (error "teleport: no connection?") remote
_ -> do
-- choose the remote closure to address and the portion of the log to send;
-- a missing flag counts as "dialog already initiated"
DialogInWormholeInitiated initiated <- getRData `onNothing` return(DialogInWormholeInitiated True)
(closRemote',tosend) <- if initiated
then do
Closure closRemote <- getData `onNothing` return (Closure 0 )
tr ("REMOTE CLOSURE",closRemote)
return (closRemote, buildLog log)
else do
mfix <- getData
tr ("mfix", mfix)
-- walk the chain of LocalFixData: without any fix the full log goes to closure 0
let droplog Nothing= return (0, fulLog log)
droplog (Just localfix)= do
-- record this connection in the fix; 'sent' tells whether it was already recorded
sent <- liftIO $ atomicModifyIORef' (fixedConnections localfix) $ \list -> do
let n= idConn conn
if n `Prelude.elem` list
then (list, True)
else (n:list,False)
tr ("LOCALFIXXXXXXXXXX",localfix)
-- full log without its first lengthFix bytes
let dropped= lazyByteString $ BS.drop (fromIntegral $ lengthFix localfix) $ toLazyByteString $ fulLog log
if sent then return (closure localfix, dropped)
else if isService localfix then return (0, dropped)
else droplog $ prevFix localfix
droplog mfix
-- register (or refresh) the local closure under the hash of the current log
let closLocal= hashClosure log
map <- liftIO $ readMVar localClosures
let mr = M.lookup closLocal map
pair <- case mr of
Just (chs,clos,mvar,_) -> do
-- in synchronous mode, wait on the MVar of the registered closure
when synchronous $ liftIO $ takeMVar mvar
return (children $ cont,clos,mvar,cont)
_ -> liftIO $ do mv <- newEmptyMVar; return ( children $ fromJust $ parent cont,closRemote',mv,cont)
liftIO $ modifyMVar_ localClosures $ \map -> return $ M.insert closLocal pair map
runTrans $ msend conn $ SMore $ ClosureData closRemote' closLocal tosend
-- stop the local computation after sending
return Nothing
else return $ Just ()
-- | 'localFixServ' with both flags (@isService@ and @isGlobal@) set to 'False'.
localFix= localFixServ False False
-- | Identifier of a connection (see the @idConn@ field of 'Connection').
type ConnectionId = Int

-- | Flag stored next to the fixes of a connection in 'globalFix'.
type HasClosed = Bool

-- NOINLINE is required for a global created with unsafePerformIO: without it
-- the definition may be inlined and several distinct IORefs created.
{-# NOINLINE globalFix #-}
-- | Global registry used by 'localFixServ' when the fix is global: for each
-- connection, a flag and the list of (closure, connections recorded for
-- that closure).
globalFix :: IORef (M.Map ConnectionId (HasClosed, [(IdClosure, IORef [ConnectionId])]))
globalFix = unsafePerformIO $ newIORef M.empty
-- | Data of a fix point set by 'localFixServ' and consumed by 'teleport'.
data LocalFixData = LocalFixData
  { isService        :: Bool                 -- ^ flag given to 'localFixServ'
  , lengthFix        :: Int                  -- ^ length in bytes of the full log when the fix was set
  , closure          :: Int                  -- ^ hash of the closure where the fix was set
  , fixedConnections :: IORef [ConnectionId] -- ^ connections already recorded for this fix
  , prevFix          :: Maybe LocalFixData   -- ^ enclosing fix, if any
  }
  deriving Show
-- | Show the current content of the reference, read with 'unsafePerformIO'.
instance Show a => Show (IORef a) where
  show = show . unsafePerformIO . readIORef
-- | Set a fix point at the current position of the log.
--
-- In recover mode the current continuation is registered in the
-- @localClosures@ of the connection under the hash of the current log.
-- Otherwise a 'LocalFixData' describing this point is stored in the state;
-- 'teleport' uses it to decide how much of the log has to be sent.
--
-- The first argument is stored in the @isService@ field. The second
-- selects whether the list of recorded connections is shared through
-- 'globalFix' ('True') or private to this fix ('False').
localFixServ isService isGlobal= Cloud $ noTrans $ do
log <- getLog
Connection{..} <- getData `onNothing` error "teleport: No connection set: use initNode"
if recover log
then do
cont <- get
mv <- liftIO newEmptyMVar
liftIO $ modifyMVar_ localClosures $ \map -> return $ M.insert (hashClosure log) ( children $ fromJust $ parent cont,0,mv,cont) map
else do
mprevFix <- getData
-- private reference, or the one registered in globalFix for this connection
-- and closure (created and registered when it does not exist yet)
ref <- liftIO $ if not $ isGlobal then newIORef [] else do
map <- readIORef globalFix
return $ do
(_,l) <- M.lookup idConn map
lookup (hashClosure log) l
`onNothing` do
ref <- newIORef []
modifyIORef globalFix $ \map ->
let (closed,l)= fromMaybe (False,[]) $ M.lookup idConn map
in M.insert idConn (closed,(hashClosure log, ref):l) map
return ref
-- the previous fix is only chained when the reference already records some connection
mmprevFix <- liftIO $ readIORef ref >>= \l -> return $ if Prelude.null l then Nothing else mprevFix
let newfix =LocalFixData{ isService = isService
, lengthFix = fromIntegral $ BS.length $ toLazyByteString $ fulLog log
, closure = hashClosure log
, fixedConnections = ref
, prevFix = mmprevFix}
setState newfix
!> ("SET LOCALFIX", newfix )
-- | Install an exception handler that sends any exception, wrapped in a
-- 'CloudException' with the local node and the remote closure, through the
-- connection in the state as an 'SError' message.
--
-- The handler raises an error when the state has no connection or no closure.
reportBack :: TransIO ()
reportBack = onException $ \(e :: SomeException) -> do
  conn <- getData `onNothing` error "reportBack: No connection defined: use wormhole"
  Closure closRemote <- getData `onNothing` error "teleport: no closRemote"
  node <- getMyNode
  let wrapped = CloudException node closRemote (show e)
      msg = SError . toException . ErrorCall . show . show $ wrapped
  msend conn msg !> "MSEND"
-- | Read a value from the state inside 'local' (falling back to the given
-- default), set it again with 'onAll' and return it.
copyData def = do
  value <- local getSData <|> return def
  onAll $ setData value
  return value
-- | Run the computation on the nodes returned by @getEqualNodes@, combining
-- the results with '<|>'.
clustered :: Loggable a => Cloud a -> Cloud a
clustered = callNodes (<|>) empty

-- | Run the computation on the nodes returned by @getEqualNodes@, combining
-- the results with '<>'.
mclustered :: (Monoid a, Loggable a) => Cloud a -> Cloud a
mclustered = callNodes (<>) mempty
-- | Run the computation on every node returned by @getEqualNodes@ and fold
-- the results with the operator, starting from the initial value.
callNodes op init proc =
  loggedc' $ local getEqualNodes >>= \nodes -> callNodes' nodes op init proc

-- | Same as 'callNodes' for an explicit list of nodes: a right fold of the
-- operator over @runAt node proc@ for each node.
callNodes' nodes op init proc =
  loggedc' $ Prelude.foldr (\node acc -> runAt node proc `op` acc) init nodes
#ifndef ghcjs_HOST_OS
-- | 'sendRaw' with reconnection: when the connection data is 'Nothing', a
-- new connection to the recorded remote node is obtained with @mconnect'@
-- (an error is raised if no remote node is recorded). When an exception is
-- handled by 'whileException', the connection data is set to 'Nothing'.
sendRawRecover con r =
  ( do
      current <- liftIO $ readIORef $ connData con
      con' <- case current of
        Just _ -> return con
        Nothing -> do
          tr "CLOSED CON"
          remote <- liftIO $ readIORef $ remoteNode con
          case remote of
            Nothing -> error "connection closed by caller"
            Just node -> do
              fresh <- mconnect' node
              return fresh
      sendRaw con' r
  )
    `whileException` \(SomeException _) ->
      liftIO $ writeIORef (connData con) Nothing
-- | Send a lazy bytestring as it is through the connection, holding the
-- @isBlocked@ 'MVar' of the connection during the write. After sending,
-- the 'MVar' receives the seconds field of the current clock time.
--
-- Raises an error for any connection kind other than web socket, plain
-- socket or TLS.
sendRaw con r = do
  let lock = isBlocked con
  kind <- liftIO $ readIORef $ connData con
  liftIO $ modifyMVar_ lock $ \_ -> do
    tr ("sendRaw", r)
    case kind of
      Just (Node2Web sconn) -> liftIO $ WS.sendTextData sconn r
      Just (Node2Node _ sock _) -> SBS.sendMany sock (BL.toChunks r)
      Just (TLSNode2Node ctx) -> sendTLSData ctx r
      _ -> error "No connection stablished"
    TOD time _ <- getClockTime
    return $ Just time
#else
-- | GHCJS version: send the data through the web socket of the connection.
-- Raises an error when the connection is not a web socket to a node.
sendRaw con r = do
  kind <- liftIO $ readIORef $ connData con
  case kind of
    Just (Web2Node sconn) -> JavaScript.Web.WebSocket.send r sconn
    _ -> error "No connection stablished"
#endif
-- | Message exchanged between nodes: two closure identifiers (built by
-- 'teleport' as the remote closure followed by the local one) and the
-- serialized log.
data NodeMSG = ClosureData IdClosure IdClosure Builder
  deriving (Read, Show)
-- | Wire format: @<closure>/<closure>/<rest>@, both closures in decimal.
instance Loggable NodeMSG where
  serialize (ClosureData clos clos' build) =
    mconcat [intDec clos, "/", intDec clos', "/", build]

  deserialize = ClosureData <$> (int <* tChar '/') <*> (int <* tChar '/') <*> remaining
    where
      -- whatever is left in the parse string, as a Builder
      remaining = lazyByteString <$> giveParseString
-- | Show a 'Builder' as the text it produces (no quoting is added).
instance Show Builder where
  show = BS.unpack . toLazyByteString

-- | Read a 'Builder' from a 'String' literal: the whole input is parsed
-- with 'read' and no rest is left.
-- NOTE(review): this expects a quoted string while 'show' above emits the
-- raw text, so the two do not look like inverses -- confirm the intent.
instance Read Builder where
  readsPrec _ str = [(lazyByteString (BS.pack (read str)), "")]
-- | Wire format: the constructor name followed, for the constructors with
-- a payload, by @/@ and the serialized payload.
instance Loggable a => Loggable (StreamData a) where
  serialize msg = case msg of
    SMore x -> byteString "SMore/" <> serialize x
    SLast x -> byteString "SLast/" <> serialize x
    SDone -> byteString "SDone"
    SError e -> byteString "SError/" <> serialize e

  deserialize =
    (symbol "SMore/" >> (SMore <$> deserialize))
      <|> (symbol "SLast/" >> (SLast <$> deserialize))
      <|> (symbol "SDone" >> return SDone)
      <|> (symbol "SError/" >> (SError <$> deserialize))
-- | Send a message through a connection.
--
-- When the connection data is 'Nothing', a connection to the recorded
-- remote node is obtained with 'mconnect' first (an error is raised if no
-- remote node is recorded). For node-to-node connections (plain or TLS)
-- the serialized message is preceded by its length in decimal; for HTTP
-- and HTTPS connections it is followed by CRLF; for web sockets it is sent
-- as a text frame. Socket and TLS writes hold the @isBlocked@ 'MVar' of
-- the connection, which receives the seconds field of the clock time
-- afterwards.
--
-- Raises an error for a connection to the same node and when there is no
-- usable connection.
msend :: Connection -> StreamData NodeMSG -> TransIO ()
#ifndef ghcjs_HOST_OS
msend con r = do
  tr ("MSEND --------->------>", r)
  c <- liftIO $ readIORef $ connData con
  con' <- case c of
    Nothing -> do
      tr "CLOSED CON"
      n <- liftIO $ readIORef $ remoteNode con
      case n of
        Nothing -> error "connection closed by caller"
        Just node -> do
          fresh <- mconnect node
          return fresh
    Just _ -> return con
  let blocked = isBlocked con'
  c' <- liftIO $ readIORef $ connData con'
  let bs = toLazyByteString $ serialize r
      -- value stored in the MVar after a write
      stamp = do
        TOD time _ <- getClockTime
        return $ Just time
  case c' of
    Just (TLSNode2Node ctx) -> liftIO $ modifyMVar_ blocked $ const $ do
      tr "TLSSSSSSSSSSS SEND"
      sendTLSData ctx $ toLazyByteString $ int64Dec $ BS.length bs
      sendTLSData ctx bs
      stamp
    Just (Node2Node _ sock _) -> liftIO $ modifyMVar_ blocked $ const $ do
      tr "NODE2NODE SEND"
      -- sendAll, not send: send may write only part of the buffer, and a
      -- truncated length prefix would corrupt the framing of the stream
      SBSL.sendAll sock $ toLazyByteString $ int64Dec $ BS.length bs
      SBSL.sendAll sock bs
      stamp
    Just (HTTP2Node _ sock _) -> liftIO $ modifyMVar_ blocked $ const $ do
      tr "HTTP2NODE SEND"
      SBSL.sendAll sock $ bs <> "\r\n"
      stamp
    Just (HTTPS2Node ctx) -> liftIO $ modifyMVar_ blocked $ const $ do
      tr "HTTPS2NODE SEND"
      sendTLSData ctx $ bs <> "\r\n"
      stamp
    Just (Node2Web sconn) -> do
      tr "NODE2WEB"
      liftIO $ do
        tr "ANTES SEND"
        WS.sendTextData sconn bs
        tr "AFTER SEND"
    Just Self -> error "connection to the same node shouldn't happen, file a bug please"
    _ -> error "msend out of connection context: use wormhole to connect"
#else
-- GHCJS version: serialize the message and send it as text through the web
-- socket of the connection. Raises an error when the connection is not a
-- web socket to a node.
msend con r = do
  tr ("MSEND --------->------>", r)
  kind <- liftIO $ readIORef $ connData con
  case kind of
    Just (Web2Node sconn) -> liftIO $ do
      tr "MSEND BROWSER"
      let payload = JS.pack (BS.unpack (toLazyByteString (serialize r)))
      JavaScript.Web.WebSocket.send payload sconn
      tr "AFTER MSEND"
    _ -> error "msend out of connection context: use wormhole to connect"
#endif
#ifdef ghcjs_HOST_OS
-- | GHCJS version: read messages from the web socket of the connection.
-- Raises an error when the connection data is 'Nothing'.
mread con = do
  labelState "mread"
  kind <- liftIO $ readIORef $ connData con
  case kind of
    Just (Web2Node ws) -> wsRead ws
    Nothing -> error "connection not opened"
-- | React to the messages of a web socket: the text of each message is set
-- as the parse string and deserialized. Blob and array buffer messages
-- raise an error.
wsRead :: Loggable a => WebSocket -> TransIO a
wsRead ws = do
  event <- react (hsonmessage ws) (return ())
  tr "received"
  case JM.getData event of
    JM.StringData text -> do
      setParseString (BS.pack (JS.unpack text))
      tr ("Browser webSocket read", text) !> "<------<----<----<------"
      deserialize
    JM.BlobData _ -> error " blob"
    JM.ArrayBufferData _ -> error "arrBuffer"
-- | Create a web socket for the URL and return it from the handler of its
-- open event (installed with 'react').
wsOpen :: JS.JSString -> TransIO WebSocket
wsOpen url = do
  sock <- liftIO (js_createDefault url)
  react (hsopen sock) (return ())
  return sock
-- | Host name of the current page location.
foreign import javascript safe
"window.location.hostname"
js_hostname :: JSVal
-- | Path name of the current page location.
foreign import javascript safe
"window.location.pathname"
js_pathname :: JSVal
-- | Protocol of the current page location.
foreign import javascript safe
"window.location.protocol"
js_protocol :: JSVal
-- | Port of the current page: the text between the second colon of the URL
-- and the next slash, or 80 when the URL has no second colon.
foreign import javascript safe
"(function(){var res=window.location.href.split(':')[2];if (res === undefined){return 80} else return res.split('/')[0];})()"
js_port :: JSVal
-- | Assign the @onmessage@ handler of a web socket.
foreign import javascript safe
"$1.onmessage =$2;"
js_onmessage :: WebSocket -> JSVal -> IO ()
-- | GHCJS version: the node built from the host name and port of the
-- current page location.
getWebServerNode :: TransIO Node
getWebServerNode = liftIO $ do
  host <- fromJSValUnchecked js_hostname
  portNumber <- fromJSValUnchecked js_port :: IO Int
  createNode host (fromIntegral portNumber)
-- | Install a Haskell handler for the message events of a web socket.
hsonmessage :: WebSocket -> (MessageEvent -> IO ()) -> IO ()
hsonmessage ws handler =
  makeCallback1 MessageEvent handler >>= js_onmessage ws
-- | Assign the @onopen@ handler of a web socket.
foreign import javascript safe
"$1.onopen =$2;"
js_open :: WebSocket -> JSVal -> IO ()
-- | Value of the @readyState@ property of a web socket.
foreign import javascript safe
"$1.readyState"
js_readystate :: WebSocket -> Int
-- | Wrapper for the JavaScript value received by the open handler (see 'hsopen').
newtype OpenEvent = OpenEvent JSVal deriving Typeable
-- | Install a Haskell handler for the open event of a web socket.
hsopen :: WebSocket -> (OpenEvent -> IO ()) -> IO ()
hsopen ws handler =
  makeCallback1 OpenEvent handler >>= js_open ws
-- | Build a one-argument JavaScript callback: the received value is
-- wrapped with the first function and passed to the Haskell handler.
-- The callback is created with @CB.syncCallback1 CB.ContinueAsync@.
makeCallback1 :: (JSVal -> a) -> (a -> IO ()) -> IO JSVal
makeCallback1 wrap handler = do
  Callback raw <- CB.syncCallback1 CB.ContinueAsync (\val -> handler (wrap val))
  return raw

-- | Build a JavaScript callback without arguments from an 'IO' action,
-- created with @CB.syncCallback CB.ContinueAsync@.
makeCallback action = do
  Callback raw <- CB.syncCallback CB.ContinueAsync action
  return raw
-- | Create a JavaScript @WebSocket@ for the given URL.
foreign import javascript safe
"new WebSocket($1)" js_createDefault :: JS.JSString -> IO WebSocket
#else
-- | Read messages from a node-to-node connection (plain or TLS) with
-- 'parallelReadHandler'. No other connection kind is handled here.
mread conn = do
  kind <- liftIO $ readIORef $ connData conn
  case kind of
    Just Node2Node{} -> parallelReadHandler conn
    Just TLSNode2Node{} -> parallelReadHandler conn
-- | 'NWS.receiveData' on the second argument; the first one is ignored.
receiveData' _ conn = NWS.receiveData conn

-- | The alternative @p <|> p <|> p ...@, repeated without end.
many' p = attempt
  where
    attempt = p <|> attempt
-- | Read length-prefixed packets from the parse context, repeatedly, and
-- deserialize each one. An 'IOError' ends the handler with 'empty'.
--
-- For each packet: the length is parsed, that many bytes are taken, the
-- @isBlocked@ 'MVar' of the connection receives the seconds field of the
-- clock time, and the packet is deserialized after 'abduce'. Input that
-- does not start with a number raises an error, unless the buffer is
-- empty, in which case the result is 'empty'.
parallelReadHandler :: Loggable a => Connection -> TransIO (StreamData a)
parallelReadHandler conn = do
    onException $ \(e :: IOError) -> empty
    many' extractPacket
  where
    extractPacket = do
      len <- integer <|> malformed
      packet <- tTake (fromIntegral len)
      tr ("MREAD <-------<-------", packet)
      TOD now _ <- liftIO getClockTime
      liftIO $ modifyMVar_ (isBlocked conn) $ \_ -> return (Just now)
      abduce
      setParseString packet
      deserialize

    malformed = do
      s <- getParseBuffer
      if BS.null s
        then empty
        else error $ show $ ("malformed data received: expected Int, received: ", BS.take 10 s)
-- | The first node of the list returned by @getNodes@.
getWebServerNode :: TransIO Node
getWebServerNode = do
  nodes <- getNodes
  return (Prelude.head nodes)
#endif
-- | Close a connection: the connection data is atomically replaced by
-- 'Nothing' and the previous value decides what has to be closed (TLS
-- context, socket or web socket). TLS and socket closes hold the
-- @isBlocked@ 'MVar' of the connection.
-- NOTE(review): the indentation of this chunk is lost, so it cannot be
-- told from here whether 'cleanConnectionData' runs after the case for
-- every connection kind or only in the catch-all branch -- confirm.
mclose :: MonadIO m => Connection -> m ()
#ifndef ghcjs_HOST_OS
mclose con= do
-- take the old connection data and leave Nothing in its place
c <- liftIO $ atomicModifyIORef (connData con) $ \c -> (Nothing,c)
case c of
Just (TLSNode2Node ctx) -> liftIO $ withMVar (isBlocked con) $ const $ liftIO $ tlsClose ctx
Just (Node2Node _ sock _ ) -> liftIO $ withMVar (isBlocked con) $ const $ liftIO $ NS.close sock !> "SOCKET CLOSE"
Just (Node2Web sconn ) -> liftIO $ WS.sendClose sconn ("closemsg" :: BS.ByteString) !> "WEBSOCkET CLOSE"
_ -> return()
cleanConnectionData con
#else
-- GHCJS version: the connection data is atomically replaced by Nothing and
-- the web socket that was stored in it is closed. No other previous value
-- is handled.
mclose con = do
  previous <- liftIO $ atomicModifyIORef (connData con) $ \old -> (Nothing, old)
  case previous of
    Just (Web2Node ws) -> liftIO $ JavaScript.Web.WebSocket.close Nothing Nothing ws
#endif
#ifndef ghcjs_HOST_OS
-- NOINLINE is required for a global created with unsafePerformIO: without it
-- the definition may be inlined and several distinct IORefs created.
{-# NOINLINE rcookie #-}
-- | Global cookie sent when connecting to other nodes (see the cookie
-- check in 'mconnect'); its initial value is @cookie1@.
rcookie = unsafePerformIO $ newIORef $ BS.pack "cookie1"
#endif
-- NOINLINE is required for a global created with unsafePerformIO: without it
-- the definition may be inlined and every use would get its own lock.
{-# NOINLINE conSection #-}
-- | Global lock used by 'exclusiveCon'.
conSection = unsafePerformIO $ newMVar ()
-- | Run the computation while holding the global lock 'conSection'.
-- NOTE(review): the lock is put back only on the path where the
-- computation returns a value; verify what happens when it throws or
-- produces no result.
exclusiveCon mx = do
  liftIO $ takeMVar conSection
  result <- mx
  liftIO $ putMVar conSection ()
  return result
-- | Obtain a connection to a node under the global lock of 'exclusiveCon'
-- and store it in the state.
--
-- When the node is already in the node list and the first connection of
-- its pool still has connection data, that connection is reused; otherwise
-- a new one is created with 'mconnect1'. A registered node with an empty
-- pool is removed from the list before reconnecting.
mconnect' :: Node -> TransIO Connection
mconnect' node' = exclusiveCon $ do
  conn <- do
    node <- fixNode node'
    nodes <- getNodes
    case filter (== node) nodes of
      [] -> mconnect1 node
      (Node _ _ pool _ : _) -> do
        plist <- liftIO $ readMVar $ fromJust pool
        case plist of
          (handle : _) -> do
            c <- liftIO $ readIORef $ connData handle
            if isNothing c
              then mconnect1 node
              else return handle
                     !> ("REUSED!", nodeHost node, nodePort node)
          _ -> do
            delNodes [node]
            fresh <- mconnect1 node
            tr "after mconnect1"
            return fresh
  setState conn
  return conn
#ifndef ghcjs_HOST_OS
-- | Create a new connection to a node (native version).
--
-- The strategies are tried in this order, each one as an alternative of
-- the previous: connection to the node itself, node-to-node socket, web
-- socket, relay. When none succeeds, a @ConnectionError "no connection"@
-- is thrown. On success the connection is stored in the state, recorded
-- as the only element of the pool of the node, and the node is added to
-- the node list.
mconnect1 (node@(Node host port _ services ))= do
return () !> ("MCONNECT1",host,port,isTLSIncluded)
-- values of the "type" entry of every service of the node
let types=mapMaybe (lookup "type") services
-- "HTTP" never uses TLS, "HTTPS" requires it (error when TLS is not
-- included); otherwise TLS is used whenever it is included
needTLS <- if "HTTP" `elem` types then return False
else if "HTTPS" `elem` types then
if not isTLSIncluded then error "no 'initTLS'. This is necessary for https connections. Please include it: main= do{ initTLS; keep ...."
else return True
else return isTLSIncluded
tr ("NEED TLS",needTLS)
(conn,parseContext) <- checkSelf node <|>
timeout 10000000 (connectNode2Node host port needTLS) <|>
timeout 1000000 (connectWebSockets host port needTLS) <|>
timeout 1000000 (checkRelay needTLS) <|>
(throw $ ConnectionError "no connection" node)
setState conn
modify $ \s -> s{execMode=Serial,parseContext= parseContext}
liftIO $ writeIORef (remoteNode conn) $ Just node
-- the new connection replaces whatever the pool of the node contained
liftIO $ modifyMVar_ (fromJust $ connection node) . const $ return [conn]
addNodes [node]
return conn
where
-- Succeeds only when the target is this same node and its pool is not
-- empty; the connection returned is marked as Self.
checkSelf node= do
tr "CHECKSELF"
node' <- getMyNodeMaybe
guard $ isJust (connection node')
v <- liftIO $ readMVar (fromJust $ connection node')
tr "IN CHECKSELF"
if node /= node' || null v
then empty
else do
conn<- case connection node of
Nothing -> error "checkSelf error"
Just ref -> do
rnode <- liftIO $ newIORef node'
cdata <- liftIO $ newIORef $ Just Self
conn <- defConnection >>= \c -> return c{myNode= rnode, connData= cdata}
liftIO $ withMVar ref $ const $ return [conn]
return conn
return (conn, noParseContext)
-- Take at most one result of proc within the time limit given to collect'
-- (presumably microseconds -- TODO confirm). No result gives empty; a
-- Nothing result is reported as a "Bad cookie" connection error.
timeout t proc= do
r <- collect' 1 t proc
case r of
[] -> empty !> "TIMEOUT EMPTY"
mr:_ -> case mr of
Nothing -> throw $ ConnectionError "Bad cookie" node
Just r -> return r
-- Connect through the relay recorded in the "relay" service entry of the
-- node, if there is one.
checkRelay needTLS= do
case lookup "relay" $ map head (nodeServices node) of
Nothing -> empty
Just relayinfo -> do
let (h,p)= read relayinfo
connectWebSockets1 h p ("/relay/" ++ h ++ "/" ++ show p ++ "/") needTLS
-- Open a socket to host:port, store the resulting connection in the state
-- and perform the client TLS handshake when TLS is included and needed.
connectSockTLS host port needTLS= do
return () !> "connectSockTLS"
let size=8192
c@Connection{myNode=my,connData=rcdata} <- getSData <|> defConnection
tr "BEFORE HANDSHAKE"
sock <- liftIO $ connectTo' size host $ PortNumber $ fromIntegral port
let cdata= (Node2Node u sock (error $ "addr: outgoing connection"))
cdata' <- liftIO $ readIORef rcdata
pcontext <- makeParseContext $ SBSL.recv sock 4096
-- a connection without data in the state is reused; otherwise a new
-- connection record is created
conn' <- if isNothing cdata'
then do
liftIO $ writeIORef rcdata $ Just cdata
liftIO $ writeIORef (istream c) pcontext
return c !> "RECONNECT"
else do
c <- defConnection
rcdata' <- liftIO $ newIORef $ Just cdata
liftIO $ writeIORef (istream c) pcontext
return c{myNode=my,connData= rcdata'} !> "CONNECT"
setData conn'
modify $ \s ->s{execMode=Serial,parseContext=pcontext}
when (isTLSIncluded && needTLS) $ maybeClientTLSHandshake host sock mempty
-- Node-to-node connection, direct or through the HTTP proxy configured in
-- the environment (see getHTTProxyParams).
connectNode2Node host port needTLS= do
tr "NODE 2 NODE"
mproxy <- getHTTProxyParams needTLS
let (upass,h',p) = case (mproxy) of
Just p -> p
_ -> ("",BS.pack host,port)
h= BS.unpack h'
-- direct connection for local hosts and when no proxy applies
if (isLocal host || h == host && p == port) then
connectSockTLS h p needTLS
else do
-- CONNECT request sent to the proxy
let connect =
"CONNECT "<> pk host <> ":" <> pk (show port) <> " HTTP/1.1\r\n" <>
"Host: "<> pk host <> ":" <> BS.pack (show port) <> "\r\n" <>
"User-Agent: transient\r\n" <>
(if BS.null upass then "" else "Proxy-Authorization: Basic " <> (B64.encode upass)<> "\r\n") <>
"Proxy-Connection: Keep-Alive\r\n" <>
"\r\n"
tr connect
connectSockTLS h p False
conn <- getSData <|> error "mconnect: no connection data"
sendRaw conn $ connect
first@(vers,code,_) <- getFirstLineResp
tr ("PROXY RESPONSE=",first)
-- any status code not starting with 2 ends in an error showing headers and body
guard (BC.head code== '2')
<|> do
headers <- getHeaders
Raw body <- parseBody headers
error $ show (headers,body)
when (isTLSIncluded && needTLS) $ do
Just(Node2Node{socket=sock}) <- liftIO $ readIORef $ connData conn
maybeClientTLSHandshake h sock mempty
conn <- getSData <|> error "mconnect: no connection data"
parseContext <- gets parseContext
return $ Just(conn,parseContext)
-- Web socket connection to the root path.
connectWebSockets host port needTLS= connectWebSockets1 host port "/" needTLS
-- Web socket connection to the given path. The cookie is sent in a header
-- and the connection is accepted only when the first message received is
-- "OK"; otherwise a close is sent and the result is Nothing.
connectWebSockets1 host port verb needTLS= do
tr "WEBSOCKETS"
connectSockTLS host port needTLS
never <- liftIO $ newEmptyMVar :: TransIO (MVar ())
conn <- getSData <|> error "connectWebSockets: no connection"
stream <- liftIO $ makeWSStreamFromConn conn
co <- liftIO $ readIORef rcookie
let hostport= host++(':': show port)
headers= [("cookie", "cookie=" <> BS.toStrict co)]
onException $ \(NWS.CloseRequest code msg) -> do
conn <- getSData
cleanConnectionData conn
empty
-- the client application blocks forever on an MVar that is never filled
wscon <- react (NWS.runClientWithStream stream hostport verb
WS.defaultConnectionOptions headers)
(takeMVar never)
msg <- liftIO $ WS.receiveData wscon
tr "WS RECEIVED"
case msg of
("OK" :: BS.ByteString) -> do
tr "return connectWebSockets"
cdata <- liftIO $ newIORef $ Just $ (Node2Web wscon)
return $ Just (conn{connData= cdata}, noParseContext)
_ -> do tr "RECEIVED CLOSE"; liftIO $ WS.sendClose wscon ("" ::BS.ByteString); return Nothing
-- | Heuristic test for a host that should be reached directly rather than
-- through a proxy. A host is considered local when
--
-- * it is @localhost@, or
-- * it starts with one of a fixed list of address prefixes, or
-- * it starts with an alphanumeric character and contains no dot.
--
-- The empty host name is not local. The original applied 'head' to the
-- host and crashed on it.
isLocal :: String -> Bool
isLocal "" = False
isLocal host@(firstChar : _) =
  host == "localhost"
    || any (`isPrefixOf` host) localPrefixes
    || (isAlphaNum firstChar && '.' `notElem` host)
  where
    localPrefixes =
      ["0.0", "10.", "100", "127", "169", "172", "192", "198", "203"]
-- | Build a 'ParseContext' on top of an action that returns the next chunk
-- of input. The context starts with an empty buffer and a flag set to
-- 'False'; the flag becomes 'True' when the source returns an empty chunk
-- or throws. From then on the context answers 'SDone' without calling the
-- source. An exception is printed after the text @Parse: @.
makeParseContext rec = liftIO $ do
  done <- newIORef False
  let nextChunk = do
        chunk <- rec
        if BS.null chunk
          then liftIO $ do
            writeIORef done True
            return SDone
          else return $ SMore chunk
      receive = liftIO $ do
        finished <- readIORef done
        if finished
          then return SDone
          else nextChunk `catch` \(SomeException e) -> do
            liftIO $ writeIORef done True
            putStr "Parse: "
            print e
            return SDone
  return $ ParseContext receive mempty done
#else
-- | GHCJS version. The connection in the state is reused (an error is
-- raised when there is none). For the node named @webnode@ the connection
-- is marked as 'Self'; for any other node a web socket is opened and the
-- connection, with a fresh @closChildren@ map, is added to the pool of the
-- node.
mconnect1 (node@(Node host port (Just pool) _)) = do
  conn <- getSData <|> error "connect: listen not set for this node"
  if nodeHost node == "webnode"
    then do
      liftIO $ writeIORef (connData conn) $ Just Self
      return conn
    else do
      ws <- connectToWS host $ PortNumber $ fromIntegral port
      liftIO $ writeIORef (connData conn) $ Just (Web2Node ws)
      children <- liftIO $ newIORef M.empty
      let conn' = conn{closChildren = children}
      liftIO $ modifyMVar_ pool $ \plist -> return (conn' : plist)
      return conn'
#endif
-- | Shorthand for 'undefined', used as a placeholder for fields that are never read.
u= undefined
-- | Exception for a failed connection: a message and the node involved.
data ConnectionError = ConnectionError String Node
  deriving (Show, Read)

instance Exception ConnectionError
-- | Obtain a connection to a node.
--
-- When the node is already in the node list and the first connection of
-- its pool still has connection data, that connection is reused.
-- Otherwise a new connection is created with 'mconnect1' and, unless it is
-- a connection to this same node, it is handed to @watchConnection@ (after
-- the cookie check for node-to-node connections). A registered node with
-- an empty pool is removed from the list before reconnecting.
mconnect node' = do
    node <- fixNode node'
    nodes <- getNodes
    case filter (== node) nodes of
      [] -> mconnect2 node
      -- The first matching entry is used, as the primed variant of this
      -- function does. The original matched a singleton list only, so more
      -- than one matching entry ended in a pattern match failure.
      (Node _ _ pool _ : _) -> do
        plist <- liftIO $ readMVar $ fromJust pool
        case plist of
          (handle : _) -> do
            c <- liftIO $ readIORef $ connData handle
            if isNothing c
              then mconnect2 node
              else return handle
          _ -> do
            delNodes [node]
            mconnect2 node
  where
    -- Create the connection and start watching it.
    mconnect2 node = do
      conn <- mconnect1 node
      cd <- liftIO $ readIORef $ connData conn
      case cd of
#ifndef ghcjs_HOST_OS
        Just Self -> return ()
        Just (TLSNode2Node _) -> do
          checkCookie conn
          watchConnection conn node
        Just (Node2Node _ _ _) -> do
          checkCookie conn
          watchConnection conn node
#endif
        _ -> watchConnection conn node
      return conn
#ifndef ghcjs_HOST_OS
-- Send the cookie, together with the host and port of the local node, in a
-- CLOS request and read the answer. Any answer other than OK closes the
-- socket or TLS context of the connection and the result is empty.
checkCookie conn = do
  cookie <- liftIO $ readIORef rcookie
  mynode <- getMyNode
  let request =
        "CLOS " <> cookie
          <> " b \r\nHost: " <> BS.pack (nodeHost mynode)
          <> "\r\nPort: " <> BS.pack (show $ nodePort mynode)
          <> "\r\n\r\n"
  sendRaw conn request
  answer <- liftIO $ readFrom conn
  case answer of
    "OK" -> return ()
    _ -> do
      cdata <- liftIO $ readIORef (connData conn)
      case cdata of
        Just (Node2Node _ s _) -> liftIO $ NS.close s
        Just (TLSNode2Node c) -> liftIO $ tlsClose c
      empty
#endif
-- Add the connection to the global connection list and post it to the
-- mailbox, with a fresh closChildren map and together with the current
-- parse context and the node. Ends with a delay of 100000 (threadDelay).
watchConnection conn node = do
  liftIO $ atomicModifyIORef connectionList $ \conns -> (conn : conns, ())
  context <- gets parseContext :: TransIO ParseContext
  children <- liftIO $ newIORef M.empty
  let watched = conn{closChildren = children}
  putMailbox ((watched, context, node) :: (Connection, ParseContext, Node))
  liftIO $ threadDelay 100000
#ifndef ghcjs_HOST_OS
-- | Set the @Linger@ option of the socket to 0 and close it.
close1 sock =
  NS.setSocketOption sock NS.Linger 0 >> NS.close sock
-- | Open a TCP (IPv4) socket to the host and port, with the receive and
-- send buffers set to the given size. The socket is closed if an exception
-- is raised before the connection is established.
connectTo' bufSize hostname (PortNumber port) = do
    proto <- BSD.getProtocolNumber "tcp"
    bracketOnError (NS.socket NS.AF_INET NS.Stream proto) NS.close setup
  where
    setup sock = do
      NS.setSocketOption sock NS.RecvBuffer bufSize
      NS.setSocketOption sock NS.SendBuffer bufSize
      entry <- BSD.getHostByName hostname
      NS.connect sock (NS.SockAddrInet port (BSD.hostAddress entry))
      return sock
#else
-- | Open a web socket to the host and port, using the path of the current
-- page and @ws://@ or @wss://@ according to the protocol of the page
-- (@http:@ or @https:@; no other protocol is handled).
connectToWS h (PortNumber p) = do
  protocol <- liftIO $ fromJSValUnchecked js_protocol
  pathname <- liftIO $ fromJSValUnchecked js_pathname
  tr ("PAHT", pathname)
  let scheme = case (protocol :: JS.JSString) of
        "http:" -> "ws://"
        "https:" -> "wss://"
  wsOpen $ JS.pack $ scheme ++ h ++ ":" ++ show p ++ pathname
#endif
-- | Lock of a connection. The send and receive functions of this module
-- store in it the seconds field of the clock time of the last operation.
type Blocked= MVar (Maybe Integer)
-- | Size of a buffer (see the @bufferSize@ field of 'Connection').
type BuffSize = Int
-- | Transport behind a 'Connection'. The native build has socket, TLS, web
-- socket and HTTP variants; the GHCJS build only has the web socket to a
-- node. @Self@ exists in both.
data ConnectionData
#ifndef ghcjs_HOST_OS
  = Node2Node
      { port     :: PortID
      , socket   :: Socket
      , sockAddr :: NS.SockAddr
      }
  | TLSNode2Node {tlscontext :: SData}
  | HTTPS2Node {tlscontext :: SData}
  | Node2Web {webSocket :: WS.Connection}
  | HTTP2Node
      { port     :: PortID
      , socket   :: Socket
      , sockAddr :: NS.SockAddr
      }
  | Self
#else
  = Self
  | Web2Node {webSocket :: WebSocket}
#endif
-- | State of a connection between two nodes.
data Connection = Connection
  { idConn        :: Int
    -- ^ identifier of the connection
  , myNode        :: IORef Node
    -- ^ the local node
  , remoteNode    :: IORef (Maybe Node)
    -- ^ the node at the other end, when known
  , connData      :: IORef (Maybe ConnectionData)
    -- ^ transport; 'Nothing' when the connection is closed
  , istream       :: IORef ParseContext
    -- ^ parse context of the incoming data
  , bufferSize    :: BuffSize
    -- ^ size of the buffers
  , isBlocked     :: Blocked
    -- ^ lock held while writing to or closing the transport
  , calling       :: Bool
    -- ^ set to 'True' by @wormhole'@ when it creates the connection and to
    -- 'False' when it reuses one in recover mode
  , synchronous   :: Bool
    -- ^ synchronous mode flag (see 'syncStream')
  , localClosures :: MVar (M.Map IdClosure (MVar [EventF], IdClosure, MVar (), EventF))
    -- ^ closures registered for this connection, by closure identifier
  , closChildren  :: IORef (M.Map Int EventF)
    -- ^ computations registered by 'single' and 'unique'
  }
  deriving Typeable
-- NOINLINE is required for a global created with unsafePerformIO: without it
-- the definition may be inlined and several distinct IORefs created.
{-# NOINLINE connectionList #-}
-- | Global list of the connections of this process.
connectionList :: IORef [Connection]
connectionList = unsafePerformIO $ newIORef []
-- | Placeholder parse context: each of its three fields raises an error
-- when evaluated.
noParseContext = ParseContext unset unset unset
  where
    unset = error "parseContext not set"

-- | A fresh connection with default values: a new global identifier, a
-- local node @localhost:0@, no remote node, no transport, the placeholder
-- parse context, a buffer size of 8192, both flags 'False' and empty maps.
defConnection :: TransIO Connection
defConnection = do
  idc <- genGlobalId
  liftIO $ do
    my <- createNode "localhost" 0 >>= newIORef
    blocked <- newMVar Nothing
    closures <- newMVar M.empty
    remote <- newIORef Nothing
    children <- newIORef M.empty
    noconn <- newIORef Nothing
    stream <- newIORef noParseContext
    return
      Connection
        { idConn = idc
        , myNode = my
        , remoteNode = remote
        , connData = noconn
        , istream = stream
        , bufferSize = 8192
        , isBlocked = blocked
        , calling = False
        , synchronous = False
        , localClosures = closures
        , closChildren = children
        }
#ifndef ghcjs_HOST_OS
-- | Store @size@ as the buffer size of the connection in the state, starting
-- from a default connection when none is present.
setBuffSize :: Int -> TransIO ()
setBuffSize size =
    (getData `onNothing` (defConnection !> "DEFF3")) >>= \current ->
        setData current{bufferSize = size}
-- | Buffer size of the connection in the state, or 8192 when there is none.
getBuffSize =
    (bufferSize <$> getSData) <|> return 8192
-- | Start serving on the port of the given node (native build).
--
-- In order: installs a handler that turns 'ConnectionError' into @empty@,
-- forks 'connectionTimeouts' and 'loopClosures', resets the log, takes the
-- connection from the state (or a default one), marks it as 'Self', stores it
-- as the only member of the node's pool, records the node via 'addNodes',
-- initialises the 'JobGroup' and the exception back point, then waits for
-- messages from either 'listenNew' or 'listenResponses' and runs each one
-- through 'execLog'.
listen :: Node -> Cloud ()
listen (node@(Node _ port _ _ )) = onAll $ do
labelState "listen"
onException $ \(ConnectionError msg node) -> empty
fork connectionTimeouts
fork loopClosures
setData $ Log{recover=False, buildLog= mempty, fulLog= mempty, lengthFull= 0, hashClosure= 0}
conn' <- getSData <|> defConnection
chs <- liftIO $ newIORef M.empty
cdata <- liftIO $ newIORef $ Just Self
let conn= conn'{connData=cdata,closChildren=chs}
pool <- liftIO $ newMVar [conn]
let node'= node{connection=Just pool}
liftIO $ writeIORef (myNode conn) node'
setData conn
-- The pool was just created holding [conn]; this overwrites it with the same value.
liftIO $ modifyMVar_ (fromJust $ connection node') $ const $ return [conn]
addNodes [node']
setRState(JobGroup M.empty)
ex <- exceptionPoint :: TransIO (BackPoint SomeException)
setData ex
mlog <- listenNew (fromIntegral port) conn <|> listenResponses :: TransIO (StreamData NodeMSG)
execLog mlog
tr "END LISTEN"
-- | Accept incoming connections on @port@ and turn each request into a
-- 'StreamData' message for 'execLog'.
--
-- For every accepted socket a copy of @conn'@ is made with a fresh id, the
-- socket contents become the parse input, and the first line and headers are
-- parsed. The request is then dispatched on the method and URI:
--
-- * method @CLOS@: node-to-node handshake. The peer node is built from the
--   @Host@ and @Port@ headers, the cookie in the URI position is compared with
--   the local one, and on success the connection is handed to @mread@.
-- * path starting with @api@: the method, the rest of the path and (depending
--   on @Content-Type@) the body are assembled into a log for closure 0.
-- * path starting with @relay@: see @proxy@ below.
-- * a path with no further segment, or starting with @file@: static page
--   serving followed by a websocket handshake.
-- * anything else: the path is read as @remoteClosure/thisClosure/...@ and the
--   connection is re-tagged as an HTTP one.
listenNew port conn'= do
sock <- liftIO $ listenOn $ PortNumber port
liftIO $ do
let bufSize= bufferSize conn'
NS.setSocketOption sock NS.RecvBuffer bufSize
NS.setSocketOption sock NS.SendBuffer bufSize
liftIO $ do putStr "Connected to port: "; print port
-- From here on `sock` is the accepted socket, shadowing the listening one.
(sock,addr) <- waitEvents $ NS.accept sock
chs <- liftIO $ newIORef M.empty
noNode <- liftIO $ newIORef Nothing
id1 <- genGlobalId
let conn= conn'{idConn=id1,closChildren=chs, remoteNode= noNode}
input <- liftIO $ SBSL.getContents sock
let nod = unsafePerformIO $ liftIO $ createNode "incoming connection" 0 in
modify $ \s -> s{execMode=Serial,parseContext= (ParseContext
(liftIO $ NS.close sock >> throw (ConnectionError "connection closed" nod))
input (unsafePerformIO $ newIORef False)
::ParseContext )}
cdata <- liftIO $ newIORef $ Just (Node2Node (PortNumber port) sock addr)
let conn'= conn{connData=cdata}
setState conn'
liftIO $ atomicModifyIORef connectionList $ \m -> (conn': m,())
maybeTLSServerHandshake sock input
firstLine@(method, uri, vers) <- getFirstLine
headers <- getHeaders
setState $ HTTPHeaders firstLine headers
case (method, uri) of
("CLOS", hisCookie) -> do
conn <- getSData
tr "CONNECTING"
let host = BC.unpack $ fromMaybe (error "no host in header")$ lookup "Host" headers
port = read $ BC.unpack $ fromMaybe (error "no port in header")$ lookup "Port" headers
remNode' <- liftIO $ createNode host port
rc <- liftIO $ newMVar [conn]
let remNode= remNode'{connection= Just rc}
liftIO $ writeIORef (remoteNode conn) $ Just remNode
tr ("ADDED NODE", remNode)
addNodes [remNode]
myCookie <- liftIO $ readIORef rcookie
if BS.toStrict myCookie /= hisCookie
then do
sendRaw conn "NOK"
mclose conn
error "connection attempt with bad cookie"
else do
sendRaw conn "OK"
mread conn
_ -> do
cutBody method headers <|> many' cutHTTPRequest
HTTPHeaders (method,uri,vers) headers <- getState <|> error "HTTP: no headers?"
let uri'= BC.tail $ uriPath uri !> uriPath uri
tr ("uri'", uri')
case BC.span (/= '/') uri' of
("api",_) -> do
let log= exec <> lazyByteString method <> byteString "/" <> byteString (BC.drop 4 uri')
maybeSetHost headers
tr ("HEADERS", headers)
str <- giveParseString <|> error "no api data"
if lookup "Transfer-Encoding" headers == Just "chunked" then error $ "chunked not supported" else do
len <- (read <$> BC.unpack
<$> (Transient $ return (lookup "Content-Length" headers)))
<|> return 0
log' <- case lookup "Content-Type" headers of
Just "application/json" -> do
let toDecode= BS.take len str
setParseString $ BS.take len str
return $ log <> "/" <> lazyByteString toDecode
Just "application/x-www-form-urlencoded" -> do
tr ("POST HEADERS=", BS.take len str)
setParseString $ BS.take len str
return $ log <> lazyByteString ( BS.take len str)
Just x -> do
tr ("POST HEADERS=", BS.take len str)
-- NOTE(review): `let` is recursive in Haskell, so the `str` on the right-hand
-- side is the new binding, not the request data read above. As written this
-- definition refers to itself; the body bytes were probably meant — confirm.
let str= BS.take len str
return $ log <> lazyByteString str
_ -> return $ log
setParseString $ toLazyByteString log'
return $ SMore $ ClosureData 0 0 log' !> ("APIIIII", log')
("relay",_) -> proxy sock method vers uri'
(h,rest) -> do
if BC.null rest || h== "file" then do
maybeSetHost headers
let uri= if BC.null h || BC.null rest then uri' else BC.tail rest
tr (method,uri)
servePages (method, uri, headers)
conn <- getSData
sconn <- makeWebsocketConnection conn uri headers
rem <- liftIO $ newIORef Nothing
chs <- liftIO $ newIORef M.empty
cls <- liftIO $ newMVar M.empty
cme <- liftIO $ newIORef M.empty
cdata <- liftIO $ newIORef $ Just (Node2Web sconn)
let conn'= conn{connData= cdata
, closChildren=chs,localClosures=cls, remoteNode=rem}
setState conn' !> "WEBSOCKETS CONNECTION"
co <- liftIO $ readIORef rcookie
let receivedCookie= lookup "cookie" headers
tr ("cookie", receivedCookie)
-- This local `rcookie` (the cookie sent by the client) shadows the global
-- `rcookie` IORef that was read into `co` just above.
rcookie <- case receivedCookie of
Just str-> Just <$> do
withParseString (BS.fromStrict str) $ do
tDropUntilToken "cookie="
tTakeWhile (not . isSpace)
Nothing -> return Nothing
tr ("RCOOKIE", rcookie)
-- A request that carries no cookie at all is accepted; only a present but
-- different cookie is rejected.
if rcookie /= Nothing && rcookie /= Just co
then do
node <- getMyNode
tr "SENDINg"
liftIO $ WS.sendClose sconn ("Bad Cookie" :: BS.ByteString) !> "SendClose Bad cookie"
empty
else do
liftIO $ WS.sendTextData sconn ("OK" :: BS.ByteString)
tr "WEBSOCKET"
s <- waitEvents $ receiveData' conn' sconn :: TransIO BS.ByteString
setParseString s
tr ("WEBSOCKET RECEIVED <-----------",s)
deserialize
<|> (return $ SMore (ClosureData 0 0 (exec <> lazyByteString s)))
else do
let uriparsed= BS.pack $ unEscapeString $ BC.unpack uri'
setParseString uriparsed !> ("uriparsed",uriparsed)
remoteClosure <- deserialize :: TransIO Int
tChar '/'
thisClosure <- deserialize :: TransIO Int
tChar '/'
conn <- getSData
-- NOTE(review): this case only covers Node2Node and TLSNode2Node; any other
-- stored value (including Nothing) has no matching alternative.
liftIO $ atomicModifyIORef' (connData conn) $ \cdata -> case cdata of
Just(Node2Node port sock addr) -> (Just $ HTTP2Node port sock addr,())
Just(TLSNode2Node ctx) -> (Just $ HTTPS2Node ctx,())
s <- giveParseString
cook <- liftIO $ readIORef rcookie
liftIO $ SBSL.sendAll sock $ "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\n"
return $ SMore $ ClosureData remoteClosure thisClosure $ lazyByteString s
where
-- Parse one more request head, store it as the current HTTPHeaders, then cut
-- its body.
cutHTTPRequest = do
first@(method,_,_) <- getFirstLine
headers <- getHeaders
setState $ HTTPHeaders first headers
cutBody method headers
-- For a POST with a Content-Length header, take that many characters and make
-- them the parse string; a POST without the header does nothing.
cutBody method headers= do
if method == "POST" then
case fmap (read . BC.unpack) $ lookup "Content-Length" headers of
Nothing -> return ()
Just len -> do
str <- tTake (fromIntegral len)
abduce
setParseString str
else abduce
-- Drop everything before the first '/'.
uriPath = BC.dropWhile (/= '/')
-- Split a String on '/', skipping empty segments.
split []= []
split ('/':r)= split r
split s=
let (h,t) = Prelude.span (/= '/') s
in h: split t
-- Relay: reads host and port from the path after the first six characters of
-- uri', connects there, forwards a rebuilt request line plus the raw headers,
-- then copies bytes in both directions until one side returns no data.
proxy sclient method vers uri' = do
let (host:port:_)= split $ BC.unpack $ BC.drop 6 uri'
tr ("RELAY TO",host, port)
sserver <- liftIO $ connectTo' 4096 host $ PortNumber $ fromIntegral $ read port
tr "CONNECTED"
rawHeaders <- getRawHeaders
tr ("RAWHEADERS",rawHeaders)
let uri= BS.fromStrict $ let d x= BC.tail $ BC.dropWhile (/= '/') x in d . d $ d uri'
let sent= method <> BS.pack " /"
<> uri
<> BS.cons ' ' vers
<> BS.pack "\r\n"
<> rawHeaders <> BS.pack "\r\n\r\n"
tr ("SENT",sent)
liftIO $ SBSL.send sserver sent
(send sclient sserver <|> send sserver sclient)
`catcht` \(e:: SomeException ) -> liftIO $ do
putStr "Proxy: " >> print e
NS.close sserver
NS.close sclient
empty
empty
where
send f t= async $ mapData f t
mapData from to = do
content <- recv from 4096
tr (" proxy received ", content)
if not $ BC.null content
then sendAll to content >> mapData from to
else finish
where
finish= NS.close from >> NS.close to
-- | While the 'rsetHost' flag is set, take the @Host@ header of the request,
-- split it at ':' into host and port (port 80 when absent) and overwrite the
-- host and port of the first entry of 'nodeList' with them. When a @Host@
-- header was found, the updated node is also stored as 'myNode' of the
-- current connection and the flag is cleared.
--
-- NOTE(review): uses Prelude.head / Prelude.tail on the node list, so this
-- presumably relies on the list being non-empty by the time a request
-- arrives — confirm.
maybeSetHost headers= do
setHost <- liftIO $ readIORef rsetHost
when setHost $ do
mnode <- liftIO $ do
let mhost= lookup "Host" headers
case mhost of
Nothing -> return Nothing
Just host -> atomically $ do
nodes <- readTVar nodeList
let (host1,port)= BC.span (/= ':') host
hostnode= (Prelude.head nodes){nodeHost= BC.unpack host1
,nodePort= if BC.null port then 80
else read $ BC.unpack $ BC.tail port}
writeTVar nodeList $ hostnode : Prelude.tail nodes
return $ Just hostnode
when (isJust mnode) $ do
conn <- getState
liftIO $ writeIORef (myNode conn) $fromJust mnode
liftIO $ writeIORef rsetHost False
-- | Global flag, initially True, read and cleared by 'maybeSetHost'.
{-# NOINLINE rsetHost #-}
rsetHost = unsafePerformIO (newIORef True)
-- | Refuse requests that arrived over plain HTTP or HTTPS: when the current
-- connection is tagged 'HTTPS2Node' or 'HTTP2Node', a "403 Forbidden"
-- response is written and the transport is closed. Other connection kinds
-- pass through unchanged.
--
-- NOTE(review): `empty` appears only after the HTTP2Node statements; with the
-- indentation lost it is not clear whether the HTTPS2Node branch was meant to
-- stop the computation as well — confirm.
noHTTP= onAll $ do
conn <- getState
cdata <- liftIO $ readIORef $ connData conn
case cdata of
Just (HTTPS2Node ctx) -> do
liftIO $ sendTLSData ctx $ "HTTP/1.1 403 Forbidden\r\nConnection: close\r\nContent-Length: 11\r\n\r\nForbidden\r\n"
liftIO $ tlsClose ctx
Just (HTTP2Node _ sock _) -> do
liftIO $ SBSL.sendAll sock $ "HTTP/1.1 403 Forbidden\r\nConnection: close\r\nContent-Length: 11\r\n\r\nForbidden\r\n"
liftIO $ NS.close sock
empty
_ -> return ()
#endif
-- | Wait on the mailbox for a (connection, parse context, node) triple,
-- install the connection and the parse context in the state, switch to
-- serial execution and start reading messages from that connection.
listenResponses :: Loggable a => TransIO (StreamData a)
listenResponses = do
    labelState "listen responses"
    (c, pctx, origin) <- getMailbox :: TransIO (Connection,ParseContext,Node)
    let banner = "listen from: " ++ show origin
    labelState (fromString banner)
    setData c
    tr ("CONNECTION RECEIVED", banner)
    modify $ \st -> st{execMode=Serial, parseContext=pctx}
    mread c
-- | Numeric identifier of a closure.
type IdClosure= Int
-- | Closure id kept in the state; 'execLog' stores the remote closure id of
-- the message being processed this way.
newtype Closure= Closure IdClosure deriving (Read,Show,Typeable)
-- | A closure id together with the node it belongs to.
type RemoteClosure= (Node, IdClosure)
-- | Remote closures registered by 'stopRemoteJob', keyed by job identifier.
newtype JobGroup= JobGroup (M.Map BC.ByteString RemoteClosure) deriving Typeable
-- | Register the current remote closure under @ident@ in the 'JobGroup',
-- after first cancelling whatever was previously registered under the same
-- identifier (via 'resetRemote').
--
-- Fails with an error when no 'Closure' or 'Connection' is present in the
-- state.
stopRemoteJob :: BC.ByteString -> Cloud ()
-- 'Closure' uses the default 'Loggable' methods.
instance Loggable Closure
stopRemoteJob ident = do
resetRemote ident
Closure closr <- local $ getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole"
tr ("CLOSRRRRRRRR", closr)
fixClosure
local $ do
Closure closr <- getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole"
conn <- getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole"
remote <- liftIO $ readIORef $ remoteNode conn
return (remote,closr) !> ("REMOTE",remote)
JobGroup map <- getRState <|> return (JobGroup M.empty)
-- NOTE(review): fromJust assumes the connection already knows its remote node.
setRState $ JobGroup $ M.insert ident (fromJust remote,closr) map
-- | If a remote closure is registered under @ident@ in the 'JobGroup', run on
-- its node: remove the closure from the connection's 'localClosures' and kill
-- the branch of its continuation. Does nothing when @ident@ is not
-- registered; raises an error on the remote side when the closure id is no
-- longer in the table.
resetRemote :: BC.ByteString -> Cloud ()
resetRemote ident = do
mj <- local $ do
JobGroup map <- getRState <|> return (JobGroup M.empty)
return $ M.lookup ident map
when (isJust mj) $ do
let (remote,closr)= fromJust mj
runAt remote $ local $ do
conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set"
-- Delete and look up in a single step, so the entry is returned exactly once.
mcont <- liftIO $ modifyMVar localClosures $ \map -> return ( M.delete closr map, M.lookup closr map)
case mcont of
Nothing -> error $ "closure not found: " ++ show closr
Just (_,_,_,cont) -> do
topState >>= showThreads
liftIO $ killBranch' cont
return ()
-- | Execute one message received from the network.
--
-- * 'SError': the exception text is read back as a 'CloudException' and
--   delivered to the local closure it names, which is also removed.
-- * 'SDone': triggers a backtrack with @ErrorCall "SDone"@.
-- * 'SMore' / 'SLast': the log is delivered to local closure @closl@;
--   'SLast' additionally removes that closure from the table.
--
-- Delivery ('process'): closure id 0 means "start from the beginning", so the
-- log is installed in recover mode and the current computation continues.
-- Any other id is looked up in 'localClosures' and its stored continuation is
-- run with the received log appended to the full log.
execLog :: StreamData NodeMSG -> TransIO ()
execLog mlog =Transient $ do
tr "EXECLOG"
case mlog of
SError e -> do
return() !> ("SERROR",e)
-- NOTE(review): both cases below have a single alternative; an exception that
-- is not an ErrorCall has no matching branch, and `read` fails on text that
-- is not a serialized CloudException.
case fromException e of
Just (ErrorCall str) -> do
case read str of
(e@(CloudException _ closl err)) -> do
process closl (error "closr: should not be used") (Left e) True
SDone -> runTrans(back $ ErrorCall "SDone") >> return Nothing
SMore (ClosureData closl closr log) -> process closl closr (Right log) False
SLast (ClosureData closl closr log) -> process closl closr (Right log) True
where
process :: IdClosure -> IdClosure -> (Either CloudException Builder) -> Bool -> StateIO (Maybe ())
process closl closr mlog deleteClosure= do
conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set"
if closl== 0 then do
case mlog of
Left except -> do
setData emptyLog
tr "Exception received from network 1"
runTrans $ throwt except
empty
Right log -> do
tr ("CLOSURE 0",log)
setData Log{recover= True, buildLog= mempty, fulLog= log, lengthFull= 0, hashClosure= 0}
setState $ Closure closr
setRState $ DialogInWormholeInitiated True
return $ Just ()
else do
-- Look the closure up and, for a final message, delete it in the same step.
mcont <- liftIO $ modifyMVar localClosures
$ \map -> return (if deleteClosure then
M.delete closl map
else map, M.lookup closl map)
case mcont of
Nothing -> do
node <- liftIO $ readIORef (remoteNode conn) `onNothing` error "mconnect: no remote node?"
let e = "request received for non existent closure. Perhaps the connection was closed by timeout and reopened"
let err= CloudException node closl $ show e
throw err
Just (chs,closLocal, mv,cont) -> do
-- NOTE(review): with the indentation lost it is unclear what this `when`
-- body was meant to contain besides `empty` — confirm against the original.
when deleteClosure $ do
empty
liftIO $ tryPutMVar mv ()
void $ liftIO $ runStateT (case mlog of
Right log -> do
Log{fulLog=fulLog, hashClosure=hashClosure} <- getLog
let nlog= fulLog <> log
setData $ Log{recover= True, buildLog= mempty, fulLog= nlog, lengthFull=error "lengthFull TODO", hashClosure= hashClosure}
setState $ Closure closr
setRState $ DialogInWormholeInitiated True
setParseString $ toLazyByteString log
runContinuation cont ()
Left except -> do
setData emptyLog
tr ("Exception received from the network", except)
runTrans $ throwt except) cont
return Nothing
#ifdef ghcjs_HOST_OS
-- | Browser build of 'listen': record the node, create a default connection
-- whose 'myNode' is this node, add it to 'connectionList', then execute the
-- messages delivered by 'listenResponses'.
listen node = onAll $ do
    addNodes [node]
    setRState (JobGroup M.empty)
    events <- liftIO $ newIORef M.empty
    nodeRef <- liftIO $ newIORef node
    base <- defConnection
    let conn = base{myNode = nodeRef}
    liftIO $ atomicModifyIORef connectionList $ \existing -> (conn : existing, ())
    setData conn
    listenResponses >>= execLog
#endif
-- | The connections available for reaching a node.
type Pool= [Connection]
-- | Key of a service attribute.
type SKey= String
-- | Value of a service attribute.
type SValue= String
-- | A service, described as a list of key/value attributes.
type Service= [(SKey, SValue)]
-- | Look @key@ up in each association list in turn and return the first
-- value found, or Nothing when no list contains the key.
lookup2 key doubleList =
    listToMaybe (mapMaybe (lookup key) doubleList)
-- | Collect, in order, the value bound to @key@ in every association list
-- that contains it.
filter2 key = mapMaybe (lookup key)
#ifndef ghcjs_HOST_OS
-- | Read the next chunk of data from a connection as a strict ByteString.
-- TLS connections delegate to 'recvTLSData'. Plain node-to-node sockets are
-- read in a loop that stops at the first @recv@ returning fewer than
-- 'bufSize' bytes. Any other connection kind is an error.
readFrom con= do
cd <- readIORef $ connData con
case cd of
Just(TLSNode2Node ctx) -> recvTLSData ctx
Just(Node2Node _ sock _) -> BS.toStrict <$> loop sock
_ -> error "readFrom error"
where
-- NOTE(review): 4098 rather than a power of two — confirm it is intentional.
bufSize= 4098
loop sock = loop1
where
loop1 :: IO BL.ByteString
-- Each further recv is deferred with unsafeInterleaveIO until the lazy
-- ByteString is demanded; BS.toStrict above demands all of it.
loop1 = unsafeInterleaveIO $ do
s <- SBS.recv sock bufSize
if BC.length s < bufSize
then return $ BLC.Chunk s mempty
else BLC.Chunk s `liftM` loop1
-- | Wrap a connection as a websockets stream: reading uses 'readFrom' (an
-- empty chunk is reported as end of stream) and writing uses 'sendRaw'
-- (a Nothing write request is ignored).
makeWSStreamFromConn conn = do
    tr "WEBSOCKETS request"
    makeStream receive (maybe (return ()) (sendRaw conn))
  where
    receive = do
        chunk <- readFrom conn
        return $ if BC.null chunk then Nothing else Just chunk
-- | Accept a websocket handshake on an existing connection, using the
-- already parsed request @uri@ and @headers@, and start a ping thread with
-- a 30 second interval on the resulting websocket connection.
makeWebsocketConnection conn uri headers = liftIO $ do
    wsStream <- makeWSStreamFromConn conn
    let pending = WS.PendingConnection
            { WS.pendingOptions  = WS.defaultConnectionOptions
            , WS.pendingRequest  = NWS.RequestHead uri headers False
            , WS.pendingOnAccept = const (return ())
            , WS.pendingStream   = wsStream
            }
    accepted <- WS.acceptRequest pending
    WS.forkPingThread accepted 30
    return accepted
-- | Serve a static file for a plain HTTP request.
--
-- Websocket upgrade requests (those carrying a @Sec-WebSocket-Key@ header)
-- are left untouched so the caller can continue with the handshake. For any
-- other request the file named by @uri@ ("index.html" when empty) is read
-- from @./static/out.jsexe/@ and sent with the session cookie, or a 404 is
-- sent when it cannot be read; the computation then stops with @empty@.
--
-- Changes from the previous version:
--
-- * the 404 status line is now terminated with CRLF like every other line,
--   instead of a bare LF;
-- * a request path containing a @..@ segment is answered with 404 without
--   touching the file system, so requests cannot escape the static directory.
servePages (method,uri, headers) = do
    conn <- getSData <|> error " servePageMode: no connection"
    if isWebSocketsReq headers
      then return ()
      else do
        let file = if BC.null uri then "index.html" else uri
            -- the path comes straight from the request line: refuse traversal
            escapesRoot = ".." `elem` BC.split '/' file
        mcontent <-
            if escapesRoot
              then return Nothing
              else liftIO $ (Just <$> BL.readFile ("./static/out.jsexe/" ++ BC.unpack file))
                              `catch` (\(e :: SomeException) -> return Nothing)
        case mcontent of
          Just content -> do
            cook <- liftIO $ readIORef rcookie
            liftIO $ sendRaw conn $
              "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\nContent-Length: "
              <> BS.pack (show $ BL.length content) <> "\r\n"
              <> "Set-Cookie:" <> "cookie=" <> cook
              <> "\r\n\r\n" <> content
          Nothing -> liftIO $ sendRaw conn $ BS.pack $
              "HTTP/1.0 404 Not Found\r\nContent-Length: 13\r\nConnection: close\r\n\r\nNot Found 404"
        empty
-- | Answer an HTTP request with the raw response produced by @w@.
--
-- Only runs while the log is in recover mode (otherwise it is @empty@).
-- After sending, the connection is closed when the request was HTTP/1.0, the
-- response starts with "HTTP/1.0", the request asked for
-- @Connection: close@, or the response itself carries @Connection: close@.
api :: TransIO BS.ByteString -> Cloud ()
api w = Cloud $ do
    log <- getLog
    if not $ recover log then empty else do
        HTTPHeaders (_,_,vers) hdrs <- getState <|> error "api: no HTTP headers???"
        conn <- getState <|> error "api: Need a connection opened with initNode, listen, simpleWebApp"
        let send = sendRaw conn
            closeRequested = lookup "Connection" hdrs == Just "close"
        r <- w
        tr ("response",r)
        send r
        tr (vers, hdrs)
        let mustClose = vers == http10
                     || BS.isPrefixOf http10 r
                     || closeRequested
                     || closeInResponse r
        when mustClose $ liftIO $ mclose conn
  where
    -- True when the text following the first "Connection:" in the response,
    -- after skipping spaces, starts with "close".
    closeInResponse r =
        BS.isPrefixOf "close" $ BS.dropWhile (== ' ') $ afterFirst "Connection:" r

    -- Remainder of the string after the first occurrence of the token, or the
    -- empty string when the token does not occur.
    afterFirst token str
        | BS.null str              = str
        | BS.isPrefixOf token str  = BS.drop (BS.length token) str
        | otherwise                = afterFirst token (BS.tail str)
-- | Version token compared against request and response lines to detect
-- HTTP/1.0.
http10= "HTTP/1.0"
-- | True when the header list contains a @Sec-WebSocket-Key@ entry.
isWebSocketsReq = isJust . lookup (mk "Sec-WebSocket-Key")
-- | The HTTP methods that can be named as a value.
data HTTPMethod= GET | POST deriving (Read,Show,Typeable,Eq)
-- 'HTTPMethod' uses the default 'Loggable' methods.
instance Loggable HTTPMethod
-- | Parse the three tokens of a request line: method, URI (converted to a
-- strict ByteString) and version.
getFirstLine = liftA3 (,,) parseString (BS.toStrict <$> parseString) parseString
-- | Skip leading spaces, then return everything up to the first blank line
-- (CRLF CRLF) unparsed; the blank line itself is consumed.
getRawHeaders = dropSpaces >> withGetParseString (return . untilBlankLine mempty)
  where
    untilBlankLine acc rest =
        if BS.isPrefixOf "\r\n\r\n" rest
            then (acc, BS.drop 4 rest)
            else untilBlankLine (BS.snoc acc (BS.head rest)) (BS.tail rest)
-- | Parameters of a url-encoded body: raw name and unescaped value.
type PostParams = [(BS.ByteString, String)]

-- | Parse a url-encoded body into its name/value pairs. Names run up to '=',
-- values run up to '&' and are unescaped.
parsePostUrlEncoded :: TransIO PostParams
parsePostUrlEncoded = dropSpaces >> many (liftA2 (,) key val)
  where
    key = tTakeWhile' ( /= '=') !> "param"
    val = unEscapeString . BS.unpack <$> tTakeWhile' (/= '&')
-- | Parse header lines as (case-insensitive name, value) pairs until the
-- blank line that ends the header section.
getHeaders = manyTill header (string "\r\n\r\n")
  where
    header = liftA2 (,) (mk <$> name) value

    isEol c = c == '\r' || c == '\n'

    -- Header name: everything up to ':' or end of line; the character that
    -- stopped the scan is consumed. Fails on an empty name.
    name = do
        dropSpaces
        key <- tTakeWhile (\c -> not (c == ':' || isEol c))
        if BS.null key || key == "\r"
            then empty
            else anyChar >> return (BS.toStrict key)

    -- Header value: the rest of the line after leading spaces.
    value = BS.toStrict <$> (dropSpaces >> tTakeWhile (not . isEol))
#endif
#ifdef ghcjs_HOST_OS
-- | True in the GHCJS (browser) build.
isBrowserInstance= True
-- | 'api' is not available in the browser build: it always yields @empty@.
api _= empty
#else
-- | False in the native (server) build.
isBrowserInstance= False
#endif
-- | Allocate a new, empty connection pool.
{-# NOINLINE emptyPool #-}
emptyPool :: MonadIO m => m (MVar Pool)
emptyPool = liftIO (newMVar [])
-- | Build a node description with the given host, port and services and no
-- connection pool yet.
createNodeServ :: HostName -> Int -> [Service] -> IO Node
createNodeServ host portNumber services =
    return Node
        { nodeHost     = host
        , nodePort     = portNumber
        , connection   = Nothing
        , nodeServices = services
        }
-- | Build a node description with no services.
createNode :: HostName -> Int -> IO Node
createNode host portNumber = createNodeServ host portNumber []
-- | Build a node that stands for a browser: host "webnode", a random port
-- number, an empty pool and a single "webnode" service.
createWebNode :: IO Node
createWebNode = do
    freshPool <- emptyPool
    randomPort <- randomIO
    return Node
        { nodeHost     = "webnode"
        , nodePort     = randomPort
        , connection   = Just freshPool
        , nodeServices = [[("webnode","")]]
        }
-- | Nodes are equal when host and port coincide; pool and services are
-- ignored.
instance Eq Node where
    Node hostA portA _ _ == Node hostB portB _ _ =
        (hostA, portA) == (hostB, portB)
-- | A node is shown as the triple (host, port, services); the connection
-- pool is omitted.
instance Show Node where
    show (Node host portNumber _ services) =
        show (host, portNumber, services)
-- | Inverse of the 'Show' instance: reads a (host, port, services) triple and
-- builds a node without a connection pool.
--
-- Every parse returned for the triple is mapped, so a parse list of any
-- length is handled; the previous @case@ only covered zero or one result.
instance Read Node where
    readsPrec n s =
        [ (Node h p Nothing ss, rest) | ((h, p, ss), rest) <- readsPrec n s ]
-- | Process-wide list of known nodes. Several functions in this module treat
-- the first entry specially.
--
-- The NOINLINE pragma is required: without it GHC may inline the
-- 'unsafePerformIO' call and create a separate 'TVar' at each use site.
{-# NOINLINE nodeList #-}
nodeList :: TVar [Node]
nodeList = unsafePerformIO $ newTVarIO []
-- Standalone-derived ordering for 'PortID'.
deriving instance Ord PortID
-- | Raise the standard "node not set" error, prefixed with the name of the
-- calling function.
errorMyNode f =
    error (f ++ ": Node not set. initialize it with connect, listen, initNode...")
-- | The local node, read from the connection in the state. Raises the
-- 'errorMyNode' error when no connection is present.
getMyNode :: TransIO Node
getMyNode = do
    conn <- getSData <|> errorMyNode "getMyNode" :: TransIO Connection
    liftIO (readIORef (myNode conn))
-- | Like 'getMyNode', but without the error fallback when no connection is
-- present in the state.
getMyNodeMaybe = do
    conn <- getSData
    liftIO (readIORef (myNode conn))
-- | Current contents of the global node list.
getNodes :: MonadIO m => m [Node]
getNodes = liftIO (readTVarIO nodeList)
-- | Nodes of the same kind as the first node of the list: when the first node
-- has no services, all nodes without services; otherwise all nodes whose
-- first service equals the first service of the first node.
--
-- An empty node list yields an empty result; previously it failed on
-- @head@ of an empty list.
getEqualNodes = do
    nodes <- getNodes
    return $ case nodes of
        []        -> []
        (first:_) -> case nodeServices first of
            []      -> filter (null . nodeServices) nodes
            (srv:_) -> filter (sharesFirstService srv) nodes
  where
    sharesFirstService srv n = case nodeServices n of
        (s:_) -> s == srv
        []    -> False
-- | The known nodes whose host is "webnode".
getWebNodes :: MonadIO m => m [Node]
getWebNodes = do
    known <- getNodes
    return [ n | n <- known, nodeHost n == "webnode" ]
-- | For every known node, the services that satisfy the predicate (one list
-- per node, in node order).
matchNodes f = Prelude.map (filter f . nodeServices) <$> getNodes
-- | Add nodes to the global list. Each node first gets a connection pool if
-- it lacks one. A node not yet known is appended; a node already known is
-- moved to the end with its services merged with the new ones (one entry
-- per distinct first attribute). More than one existing match is an error.
addNodes :: [Node] -> TransIO ()
addNodes nodes = liftIO $ mapM fixNode nodes >>= atomically . mapM_ insert
  where
    insert node = do
        known <- readTVar nodeList
        case filter (== node) known of
            [] -> do
                tr "NUEVO NODO"
                writeTVar nodeList (known ++ [node])
            [old] -> do
                let sameKey a b = head a == head b
                    merged = nubBy sameKey (nodeServices node ++ nodeServices old)
                writeTVar nodeList ((known \\ [node]) ++ [old{nodeServices = merged}])
            _ -> error ("duplicated node: " ++ show node)
-- | Remove the given nodes from the global list.
delNodes nodes = liftIO . atomically $ modifyTVar nodeList (\\ nodes)
-- | Make sure a node has a connection pool, creating an empty one when it
-- has none.
fixNode n
    | isJust (connection n) = return n
    | otherwise = do
        freshPool <- emptyPool
        return n{connection = Just freshPool}
-- | Replace the global node list with the given nodes, each one given a
-- connection pool if it lacks one.
setNodes nodes = liftIO $
    mapM fixNode nodes >>= atomically . writeTVar nodeList
-- | Rotate the global node list by one position (the first node moves to the
-- end) and return the new list.
--
-- An empty list is left empty. Previously the rotation used @head@ and
-- @tail@ unconditionally, which stored a failing thunk in 'nodeList' when the
-- list was empty.
shuffleNodes :: MonadIO m => m [Node]
shuffleNodes = liftIO . atomically $ do
    nodes <- readTVar nodeList
    let rotated = case nodes of
            []           -> []
            (first:rest) -> rest ++ [first]
    writeTVar nodeList rotated
    return rotated
-- | Register the local node in the node list of the remote side, attaching
-- the remote side's current connection as its pool.
addThisNodeToRemote =
    local getMyNode >>= \me ->
        atRemote $ local $
            setConnectionIn me >>= \reachable -> addNodes [reachable]
-- | Return the node with a new pool that contains only the connection found
-- in the state. Fails with an error when there is no connection.
setConnectionIn node = do
    current <- getState <|> error "addThisNodeToRemote: connection not found"
    pool <- liftIO (newMVar [current])
    return node{connection = Just pool}
connect :: Node -> Node -> Cloud ()
#ifndef ghcjs_HOST_OS
-- | Start listening as @node@ and, in parallel with the listener, join the
-- cluster that @remotenode@ belongs to.
connect node remotenode =
    (listen node <|> return ()) >> connect' remotenode
-- | Join the cluster of @remotenode@ (the local node must already be set up).
--
-- The local node list is sent to the remote node, which records the caller on
-- its connection, announces the new nodes to the nodes it knows (through
-- @mclustered@), rewrites the host and port of the first entry of its own
-- list to those of @remotenode@, and answers with the node list it had
-- before. The caller then distributes @remotenode@ plus the tail of that
-- answer to the nodes it knew before the exchange.
connect' :: Node -> Cloud ()
connect' remotenode= loggedc $ do
nodes <- local getNodes
localIO $ putStr "connecting to: " >> print remotenode
newNodes <- runAt remotenode $ interchange nodes
-- The head of the answer is the remote node itself as it knows itself; it is
-- replaced by the description the caller used to reach it.
let toAdd=remotenode:Prelude.tail newNodes
callNodes' nodes (<>) mempty $ local $ do
liftIO $ putStr "New nodes: " >> print toAdd !> "NEWNODES"
addNodes toAdd
where
-- Runs on the remote node.
interchange nodes=
do
newNodes <- local $ do
conn@Connection{remoteNode=rnode} <- getSData <|>
error ("connect': need to be connected to a node: use wormhole/connect/listen")
let newNodes= nodes
-- The first received node is taken to be the caller; the current connection
-- becomes its pool.
callingNode<- fixNode $ Prelude.head newNodes
liftIO $ writeIORef rnode $ Just callingNode
liftIO $ modifyMVar_ (fromJust $ connection callingNode) $ const $ return [conn]
return newNodes
oldNodes <- local $ getNodes
mclustered . local $ do
liftIO $ putStrLn "New nodes: " >> print newNodes
addNodes newNodes
localIO $ atomically $ do
nodes <- readTVar nodeList
let nodes'= (Prelude.head nodes){nodeHost=nodeHost remotenode
,nodePort=nodePort remotenode}:Prelude.tail nodes
writeTVar nodeList nodes'
return oldNodes
#else
-- In the browser build neither 'connect' nor 'connect'' is available: both
-- always yield @empty@.
connect _ _= empty
connect' _ = empty
#endif
#ifndef ghcjs_HOST_OS
-- | 'Loggable' instance for aeson 'Value'. Serialization uses aeson's
-- 'encode'. Deserialization first cuts one JSON element out of the input
-- with the small scanner below and then decodes it with 'eitherDecode';
-- a decoding failure makes the parser fail with @empty@.
instance {-# Overlapping #-} Loggable Value where
serialize= return . lazyByteString =<< encode
deserialize = decodeIt
where
-- One JSON element: an object, an array, or anything else up to a closing
-- bracket or brace.
jsElem :: TransIO BS.ByteString
jsElem= dropSpaces >> (jsonObject <|> array <|> atom)
atom= elemString
array= (brackets $ return "[" <> return "{}" <> chainSepBy mappend (return "," <> jsElem) (tChar ',')) <> return "]"
jsonObject= (braces $ return "{" <> chainMany mappend jsElem) <> return "}"
elemString= do
dropSpaces
tTakeWhile (\c -> c /= '}' && c /= ']' )
decodeIt= do
s <- jsElem
tr ("decode",s)
case eitherDecode s !> "DECODE" of
Right x -> return x
Left err -> empty
-- | First line (three tokens) and headers of an HTTP message. The same type
-- is used for requests and, in 'rawHTTP', for responses.
-- | Parsed head of an HTTP message: the three tokens of the first line
-- (the middle one strict) and the headers keyed by case-insensitive name.
data HTTPHeaders= HTTPHeaders (BS.ByteString, B.ByteString, BS.ByteString) [(CI BC.ByteString,BC.ByteString)] deriving Show
-- | Send the raw HTTP request text @restmsg@ to @node@ and parse the response
-- body as a value of type @a@.
--
-- The connection's 'isBlocked' MVar is taken for the duration of the exchange
-- and released with the current time afterwards. A response whose status
-- code does not start with '2' raises an error showing headers and body. The
-- connection is closed when the response is HTTP/1.0 or carries
-- @Connection: close@. The 'Connection' that was in the state before the
-- call is restored at the end (or removed when there was none).
rawHTTP :: Loggable a => Node -> String -> TransIO a
rawHTTP node restmsg = do
abduce
tr ("***********************rawHTTP",nodeHost node)
mcon <- getData :: TransIO (Maybe Connection)
c <- do
c <- mconnect' node
tr "after mconnect'"
sendRawRecover c $ BS.pack restmsg
c <- getState <|> error "rawHTTP: no connection?"
let blocked= isBlocked c
tr "before blocked"
liftIO $ takeMVar blocked
tr "after blocked"
ctx <- liftIO $ readIORef $ istream c
liftIO $ writeIORef (done ctx) False
modify $ \s -> s{parseContext= ctx}
return c
-- `while` (defined below) runs the action, then the check; when the check
-- returns False the action is run one more time, without a second check.
`while` \c ->do
is <- isTLS c
px <- getHTTProxyParams is
tr ("PX=", px)
-- Unless proxy parameters are present, probe the connection by reading one
-- character and pushing it back; if that fails the connection is released,
-- closed and reported as unusable.
(if isJust px then return True else do c <- anyChar ; tPutStr $ BS.singleton c; tr "anyChar"; return True) <|> do
TOD t _ <- liftIO $ getClockTime
liftIO $ putMVar (isBlocked c) $ Just t
liftIO (writeIORef (connData c) Nothing)
mclose c
tr "CONNECTION EXHAUSTED,RETRYING WITH A NEW CONNECTION"
return False
modify $ \s -> s{execMode=Serial}
let blocked= isBlocked c
tr "after send"
first@(vers,code,_) <- getFirstLineResp <|> do
r <- notParsed
error $ "No HTTP header received:\n"++ up r
tr ("FIRST line",first)
headers <- getHeaders
let hdrs= HTTPHeaders first headers
setState hdrs
guard (BC.head code== '2')
<|> do Raw body <- parseBody headers
error $ show (hdrs,body)
result <- parseBody headers
when (vers == http10 ||
lookup "Connection" headers == Just "close" )
$ do
TOD t _ <- liftIO $ getClockTime
liftIO $ putMVar blocked $ Just t
liftIO $ mclose c
liftIO $ takeMVar blocked
return()
-- Save the parse position for the next request on this connection, then
-- release the connection with the current time.
ctx <- gets parseContext
liftIO $ writeIORef (istream c) ctx
TOD t _ <- liftIO $ getClockTime
liftIO $ putMVar blocked $ Just t
if (isJust mcon) then setData (fromJust mcon) else delData c
return result
where
isTLS c= liftIO $ do
cdata <- readIORef $ connData c
case cdata of
Just(TLSNode2Node _) -> return True
_ -> return False
while act fix= do r <- act; b <- fix r; if b then return r else act
-- Body parsing: chunked bodies are streamed through `dechunk`; otherwise
-- Content-Length characters are taken; with neither header, all remaining
-- input is used.
parseBody headers= case lookup "Transfer-Encoding" headers of
Just "chunked" -> dechunk |- deserialize
_ -> case fmap (read . BC.unpack) $ lookup "Content-Length" headers of
Just length -> do
msg <- tTake length
tr ("GOT", length)
withParseString msg deserialize
_ -> do
str <- notParsed
BS.length str `seq` withParseString str deserialize
-- Status line: skip ahead to "HTTP", then version, code (strict) and the
-- rest of the line.
getFirstLineResp= do
(,,) <$> httpVers <*> (BS.toStrict <$> getCode) <*> getMessage
where
httpVers= tTakeUntil (BS.isPrefixOf "HTTP" ) >> parseString
getCode= parseString
getMessage= tTakeUntilToken ("\r\n")
-- One chunk of a chunked body: a hexadecimal size followed by that many
-- characters; size 0, or any parse failure, ends the stream with SDone.
dechunk= do
n<- numChars
if n== 0 then do string "\r\n"; return SDone else do
r <- tTake $ fromIntegral n !> ("numChars",n)
trycrlf
tr "SMORE1"
return $ SMore r
<|> return SDone !> "SDone in dechunk"
where
trycrlf= try (string "\r\n" >> return()) <|> return ()
numChars= do l <- hex ; tDrop 2 >> return l
#endif
-- | Run @action@ on this node and, recursively, on the nodes reachable from
-- it, combining the results with @op@ (with @init@ as the starting value
-- handed to 'callNodes'').
--
-- The exploration runs on the server side ('atServer'). Each step excludes
-- the current node and the nodes already scheduled from the next round. Only
-- the first result that arrives is returned: later ones find the IORef
-- already filled and end in @empty@.
foldNet :: Loggable a => (Cloud a -> Cloud a -> Cloud a) -> Cloud a -> Cloud a -> Cloud a
foldNet op init action = atServer $ do
ref <- onAll $ liftIO $ newIORef Nothing
r <- exploreNetExclude []
-- Swap the result in and look at what was there before.
v <-localIO $ atomicModifyIORef ref $ \v -> (Just r, v)
case v of
Nothing -> return r
Just _ -> empty
where
exploreNetExclude nodesexclude = loggedc $ do
local $ tr "EXPLORENETTTTTTTTTTTT"
action `op` otherNodes
where
otherNodes= do
node <- local getMyNode
nodes <- local getNodes'
tr ("NODES to explore",nodes)
let nodesToExplore= Prelude.tail nodes \\ (node:nodesexclude)
callNodes' nodesToExplore op init $
exploreNetExclude (union (node:nodesexclude) nodes)
-- Only nodes of the same kind as this one are explored (see 'getEqualNodes').
getNodes'= getEqualNodes
-- | Run an action across the network with 'foldNet', combining the results
-- monoidally.
exploreNet :: (Loggable a,Monoid a) => Cloud a -> Cloud a
exploreNet action = foldNet mappend mempty action
-- | Run an action across the network with 'foldNet', combining the
-- alternatives with '<|>'.
exploreNetUntil :: (Loggable a) => Cloud a -> Cloud a
exploreNetUntil action = foldNet (<|>) empty action
-- | Run the computation only when executing in the browser; @empty@ on the
-- server.
onBrowser :: Cloud a -> Cloud a
onBrowser x =
    local (return isBrowserInstance) >>= \inBrowser ->
        if inBrowser then x else empty

-- | Run the computation only when executing on the server; @empty@ in the
-- browser.
onServer :: Cloud a -> Cloud a
onServer x =
    local (return isBrowserInstance) >>= \inBrowser ->
        if inBrowser then empty else x

-- | Run the computation in the browser: directly when already there,
-- otherwise on the remote side of the connection.
atBrowser :: Loggable a => Cloud a -> Cloud a
atBrowser x =
    local (return isBrowserInstance) >>= \inBrowser ->
        if inBrowser then x else atRemote x

-- | Run the computation on the server: directly when already there,
-- otherwise on the remote side of the connection.
atServer :: Loggable a => Cloud a -> Cloud a
atServer x = do
    inBrowser <- local $ return isBrowserInstance
    tr ("AT SERVER",inBrowser)
    if inBrowser then atRemote x else x
-- | Idle threshold used by 'connectionTimeouts'. It is compared with a
-- difference of clock-time seconds, so the unit is seconds.
delta= 60
-- | Background loop that expires idle connections.
--
-- Each round waits 10 seconds and then splits 'connectionList': connections
-- whose 'isBlocked' holds Nothing, or whose stored time is less than 'delta'
-- seconds old, stay in the list; the others are removed from it. For every
-- removed connection @mclose@ is called if 'calling' is set, and
-- 'cleanConnectionData' is invoked.
connectionTimeouts :: TransIO ()
connectionTimeouts= do
labelState "loop connections"
threads 0 $ waitEvents $ return ()
liftIO $ threadDelay 10000000
TOD time _ <- liftIO $ getClockTime
toClose <- liftIO $ atomicModifyIORef connectionList $ \ cons ->
Data.List.partition (\con ->
-- NOTE(review): readMVar is run through unsafePerformIO inside the pure
-- function given to atomicModifyIORef; it blocks while the MVar is taken.
let mc= unsafePerformIO $ readMVar $ isBlocked con
in isNothing mc ||
((time - fromJust mc) < delta) ) cons
forM_ toClose $ \c -> liftIO $ do
tr "close "
tr $ idConn c
when (calling c) $ mclose c
cleanConnectionData c
-- | Drop every closure registered on the connection. The connection's entry
-- in 'globalFix' is set to @(False,[])@ before the closures are cleared and
-- to @(True,[])@ afterwards.
cleanConnectionData c = liftIO $ do
    markFix False
    modifyMVar_ (localClosures c) (\_ -> return M.empty)
    markFix True
  where
    markFix flag = modifyIORef globalFix (M.insert (idConn c) (flag, []))
loopClosures= do
labelState "loop closures"
threads 0 $ do
waitEvents $ threadDelay 5000000
nodes <- getNodes
node <- choose $ tail nodes
guard (isJust $ connection node)
nc <- liftIO $ readMVar $ fromJust (connection node)
conn <- choose nc
lcs <- liftIO $ readMVar $ localClosures conn
(closLocal,(mv,clos,_,cont)) <- choose $ M.toList lcs
chs <- liftIO $ readMVar $ children $ fromJust $ parent cont
return()
guard (null chs)
tr ("REMOVING", closLocal)
liftIO $ modifyMVar (localClosures conn) $ \lcs -> return $ (M.delete closLocal lcs,())
msend conn $ SLast $ ClosureData clos closLocal mempty